Posted to commits@hudi.apache.org by vb...@apache.org on 2020/01/17 22:02:32 UTC

[incubator-hudi] branch master updated: [HUDI-238] Make Hudi support Scala 2.12 (#1226)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 292c1e2  [HUDI-238] Make Hudi support Scala 2.12 (#1226)
292c1e2 is described below

commit 292c1e2ff436a711cbbb53ad9b1f6232121d53ec
Author: wenningd <we...@gmail.com>
AuthorDate: Fri Jan 17 14:02:21 2020 -0800

    [HUDI-238] Make Hudi support Scala 2.12 (#1226)
    
    * [HUDI-238] Rename scala related artifactId & add maven profile to support Scala 2.12
---
 LICENSE                                            |   2 +
 dev/change-scala-version.sh                        |  66 ++++++++++++
 docker/demo/config/kafka-source.properties         |   4 +-
 docker/hoodie/hadoop/hive_base/pom.xml             |   4 +-
 docker/hoodie/hadoop/pom.xml                       |   2 +-
 hudi-cli/pom.xml                                   |  12 ++-
 hudi-client/pom.xml                                |   4 +-
 hudi-integ-test/pom.xml                            |   4 +-
 hudi-spark/pom.xml                                 |  19 ++--
 hudi-spark/run_hoodie_app.sh                       |   2 +-
 hudi-utilities/pom.xml                             |  39 +++++--
 .../hudi/utilities/sources/AvroKafkaSource.java    |  17 +--
 .../hudi/utilities/sources/JsonKafkaSource.java    |  13 ++-
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 120 ++++++++-------------
 .../hudi/utilities/sources/TestKafkaSource.java    |  22 ++--
 .../delta-streamer-config/kafka-source.properties  |   4 +-
 packaging/hudi-spark-bundle/pom.xml                |  12 +--
 packaging/hudi-utilities-bundle/pom.xml            |  18 ++--
 pom.xml                                            |  50 +++++++--
 19 files changed, 261 insertions(+), 153 deletions(-)
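
Taken together, the renamed *_2.11 artifacts, the new dev/change-scala-version.sh helper, and the scala-2.12 Maven profile let the build be switched between Scala binary versions. A minimal sketch of that workflow, run from the repository root; the exact Maven goals and flags below are illustrative and not mandated by this commit:

    # rewrite the _2.11 artifactId suffixes and <scala.binary.version> in every pom.xml
    ./dev/change-scala-version.sh 2.12

    # build against Scala 2.12; the profile's enforcer rule bans any *_2.11 dependency
    mvn clean package -DskipTests -Pscala-2.12

    # switch the poms back to the default Scala 2.11 build
    ./dev/change-scala-version.sh 2.11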

diff --git a/LICENSE b/LICENSE
index 341988e..6b2997f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -245,6 +245,8 @@ This product includes code from Apache Spark
 
 * org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package
 
+* dev/change-scala-version.sh copied from https://github.com/apache/spark/blob/branch-2.4/dev/change-scala-version.sh
+
 Copyright: 2014 and onwards The Apache Software Foundation
 Home page: http://spark.apache.org/
 License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
new file mode 100755
index 0000000..151581d
--- /dev/null
+++ b/dev/change-scala-version.sh
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+VALID_VERSIONS=( 2.11 2.12 )
+
+usage() {
+  echo "Usage: $(basename $0) [-h|--help] <version>
+where :
+  -h| --help Display this help text
+  valid version values : ${VALID_VERSIONS[*]}
+" 1>&2
+  exit 1
+}
+
+if [[ ($# -ne 1) || ( $1 == "--help") ||  $1 == "-h" ]]; then
+  usage
+fi
+
+TO_VERSION=$1
+
+check_scala_version() {
+  for i in ${VALID_VERSIONS[*]}; do [ $i = "$1" ] && return 0; done
+  echo "Invalid Scala version: $1. Valid versions: ${VALID_VERSIONS[*]}" 1>&2
+  exit 1
+}
+
+check_scala_version "$TO_VERSION"
+
+if [ $TO_VERSION = "2.11" ]; then
+  FROM_VERSION="2.12"
+else
+  FROM_VERSION="2.11"
+fi
+
+sed_i() {
+  sed -e "$1" "$2" > "$2.tmp" && mv "$2.tmp" "$2"
+}
+
+export -f sed_i
+
+BASEDIR=$(dirname $0)/..
+find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \
+  -exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \;
+
+# Also update <scala.binary.version> in parent POM
+# Match any scala binary version to ensure idempotency
+sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'$TO_VERSION'</' \
+  "$BASEDIR/pom.xml"
diff --git a/docker/demo/config/kafka-source.properties b/docker/demo/config/kafka-source.properties
index dbeba47..4d7f77c 100644
--- a/docker/demo/config/kafka-source.properties
+++ b/docker/demo/config/kafka-source.properties
@@ -26,5 +26,5 @@ hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.a
 # Kafka Source
 hoodie.deltastreamer.source.kafka.topic=stock_ticks
 #Kafka props
-metadata.broker.list=kafkabroker:9092
-auto.offset.reset=smallest
+bootstrap.servers=kafkabroker:9092
+auto.offset.reset=earliest
diff --git a/docker/hoodie/hadoop/hive_base/pom.xml b/docker/hoodie/hadoop/hive_base/pom.xml
index 497d9d4..5af64d0 100644
--- a/docker/hoodie/hadoop/hive_base/pom.xml
+++ b/docker/hoodie/hadoop/hive_base/pom.xml
@@ -57,9 +57,9 @@
               <tasks>
                 <copy file="${project.basedir}/../../../../packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-${project.version}.jar" tofile="target/hoodie-hadoop-mr-bundle.jar" />
                 <copy file="${project.basedir}/../../../../packaging/hudi-hive-bundle/target/hudi-hive-bundle-${project.version}.jar" tofile="target/hoodie-hive-bundle.jar" />
-                <copy file="${project.basedir}/../../../../packaging/hudi-spark-bundle/target/hudi-spark-bundle-${project.version}.jar" tofile="target/hoodie-spark-bundle.jar" />
+                <copy file="${project.basedir}/../../../../packaging/hudi-spark-bundle/target/hudi-spark-bundle_${scala.binary.version}-${project.version}.jar" tofile="target/hoodie-spark-bundle.jar" />
                 <copy
-                  file="${project.basedir}/../../../../packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-${project.version}.jar"
+                  file="${project.basedir}/../../../../packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_${scala.binary.version}-${project.version}.jar"
                   tofile="target/hoodie-utilities.jar"/>
               </tasks>
             </configuration>
diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml
index 55e5c65..84380f7 100644
--- a/docker/hoodie/hadoop/pom.xml
+++ b/docker/hoodie/hadoop/pom.xml
@@ -42,7 +42,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark-bundle</artifactId>
+      <artifactId>hudi-spark-bundle_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
   </dependencies>
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 47f7c7f..d8e1903 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -147,7 +147,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-utilities</artifactId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
 
@@ -160,16 +160,22 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.thoughtworks.paranamer</groupId>
+          <artifactId>paranamer</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Spark -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.11</artifactId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.11</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
 
     <dependency>
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 02842e9..63a6218 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -95,11 +95,11 @@
     <!-- Spark -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.11</artifactId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.11</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
 
     <!-- Dropwizard Metrics -->
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index af82818..6c74a12 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -54,7 +54,7 @@
     <!-- Hoodie -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark</artifactId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
 
@@ -83,7 +83,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark</artifactId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <classifier>tests</classifier>
       <type>test-jar</type>
diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index e83d96d..bf18735 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -23,7 +23,7 @@
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>hudi-spark</artifactId>
+  <artifactId>hudi-spark_2.11</artifactId>
   <packaging>jar</packaging>
 
   <properties>
@@ -190,13 +190,20 @@
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
-      <artifactId>jackson-module-scala_2.11</artifactId>
+      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
     </dependency>
 
     <!-- Avro -->
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
+      <exclusions>
+        <exclusion>
+          <!-- this paranamer version conflicts with the one pulled in by spark-core_2.12 -->
+          <groupId>com.thoughtworks.paranamer</groupId>
+          <artifactId>paranamer</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Parquet -->
@@ -208,17 +215,17 @@
     <!-- Spark -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.11</artifactId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.11</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
 
     <!-- Spark (Packages) -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-avro_2.11</artifactId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
       <scope>provided</scope>
     </dependency>
 
@@ -293,7 +300,7 @@
 
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.11</artifactId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
       <version>${scalatest.version}</version>
       <scope>test</scope>
     </dependency>
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh
index 17ba925..88ae209 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_app.sh
@@ -25,7 +25,7 @@ function error_exit {
 
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 #Ensure we pick the right jar even for hive11 builds
-HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.jar | grep -v source | head -1`
+HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1`
 
 if [ -z "$HADOOP_CONF_DIR" ]; then
   echo "setting hadoop conf dir"
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 53a3a1a..59aae72 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -23,7 +23,7 @@
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>hudi-utilities</artifactId>
+  <artifactId>hudi-utilities_2.11</artifactId>
   <packaging>jar</packaging>
 
   <properties>
@@ -109,7 +109,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark</artifactId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <exclusions>
         <exclusion>
@@ -128,19 +128,25 @@
     <!-- Fasterxml -->
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
-      <artifactId>jackson-module-scala_2.11</artifactId>
+      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
     </dependency>
 
     <!-- Parquet -->
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.thoughtworks.paranamer</groupId>
+          <artifactId>paranamer</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Spark -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.11</artifactId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
       <exclusions>
         <exclusion>
           <groupId>javax.servlet</groupId>
@@ -151,7 +157,7 @@
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.11</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <exclusions>
         <exclusion>
           <groupId>javax.servlet</groupId>
@@ -162,20 +168,26 @@
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-avro_2.11</artifactId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_2.11</artifactId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <classifier>tests</classifier>
+    </dependency>
 
     <!-- Dropwizard Metrics -->
     <dependency>
@@ -197,8 +209,8 @@
 
     <dependency>
       <groupId>com.twitter</groupId>
-      <artifactId>bijection-avro_2.11</artifactId>
-      <version>0.9.2</version>
+      <artifactId>bijection-avro_${scala.binary.version}</artifactId>
+      <version>0.9.3</version>
     </dependency>
 
     <!-- Kafka -->
@@ -223,6 +235,13 @@
       <version>3.0.0</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>${kafka.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <!-- Httpcomponents -->
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 18ebff4..b213988 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -24,16 +24,17 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
-import io.confluent.kafka.serializers.KafkaAvroDecoder;
-import kafka.serializer.StringDecoder;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+import org.apache.spark.streaming.kafka010.OffsetRange;
 
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -47,6 +48,8 @@ public class AvroKafkaSource extends AvroSource {
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
+    props.put("key.deserializer", StringDeserializer.class);
+    props.put("value.deserializer", KafkaAvroDeserializer.class);
     offsetGen = new KafkaOffsetGen(props);
   }
 
@@ -64,9 +67,7 @@ public class AvroKafkaSource extends AvroSource {
   }
 
   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
-    JavaRDD<GenericRecord> recordRDD =
-        KafkaUtils.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
-            offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj);
-    return recordRDD;
+    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+            LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index bd922ac..51a1ae1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -24,14 +24,15 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
-import kafka.serializer.StringDecoder;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+import org.apache.spark.streaming.kafka010.OffsetRange;
 
 /**
  * Read json kafka data.
@@ -45,6 +46,8 @@ public class JsonKafkaSource extends JsonSource {
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(properties, sparkContext, sparkSession, schemaProvider);
+    properties.put("key.deserializer", StringDeserializer.class);
+    properties.put("value.deserializer", StringDeserializer.class);
     offsetGen = new KafkaOffsetGen(properties);
   }
 
@@ -61,7 +64,7 @@ public class JsonKafkaSource extends JsonSource {
   }
 
   private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
-    return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class,
-        offsetGen.getKafkaParams(), offsetRanges).values();
+    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+            LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index c17a5cf..ed5e4e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -22,30 +22,24 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.streaming.kafka.KafkaCluster;
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.OffsetRange;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
-import scala.Predef;
-import scala.collection.JavaConverters;
-import scala.collection.immutable.Map;
-import scala.collection.immutable.Set;
-import scala.collection.mutable.ArrayBuffer;
-import scala.collection.mutable.StringBuilder;
-import scala.util.Either;
-
 /**
  * Source to read data from Kafka, incrementally.
  */
@@ -58,8 +52,8 @@ public class KafkaOffsetGen {
     /**
      * Reconstruct checkpoint from string.
      */
-    public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String checkpointStr) {
-      HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<>();
+    public static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) {
+      HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
       if (checkpointStr.length() == 0) {
         return offsetMap;
       }
@@ -67,8 +61,7 @@ public class KafkaOffsetGen {
       String topic = splits[0];
       for (int i = 1; i < splits.length; i++) {
         String[] subSplits = splits[i].split(":");
-        offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])),
-            new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1])));
+        offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1]));
       }
       return offsetMap;
     }
@@ -83,7 +76,7 @@ public class KafkaOffsetGen {
       // at least 1 partition will be present.
       sb.append(ranges[0].topic() + ",");
       sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
-          .collect(Collectors.joining(",")));
+              .collect(Collectors.joining(",")));
       return sb.toString();
     }
 
@@ -94,32 +87,32 @@ public class KafkaOffsetGen {
      * @param toOffsetMap offsets of where each partitions is currently at
      * @param numEvents maximum number of events to read.
      */
-    public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, LeaderOffset> fromOffsetMap,
-        HashMap<TopicAndPartition, LeaderOffset> toOffsetMap, long numEvents) {
+    public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap,
+                                                    Map<TopicPartition, Long> toOffsetMap, long numEvents) {
 
       Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
 
       // Create initial offset ranges for each 'to' partition, with from = to offsets.
       OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
       toOffsetMap.entrySet().stream().map(e -> {
-        TopicAndPartition tp = e.getKey();
-        long fromOffset = fromOffsetMap.getOrDefault(tp, new LeaderOffset("", -1, 0)).offset();
+        TopicPartition tp = e.getKey();
+        long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
         return OffsetRange.create(tp, fromOffset, fromOffset);
       }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
 
       long allocedEvents = 0;
-      java.util.Set<Integer> exhaustedPartitions = new HashSet<>();
+      Set<Integer> exhaustedPartitions = new HashSet<>();
       // keep going until we have events to allocate and partitions still not exhausted.
       while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
         long remainingEvents = numEvents - allocedEvents;
         long eventsPerPartition =
-            (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
+                (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
 
         // Allocate the remaining events to non-exhausted partitions, in round robin fashion
         for (int i = 0; i < ranges.length; i++) {
           OffsetRange range = ranges[i];
           if (!exhaustedPartitions.contains(range.partition())) {
-            long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset();
+            long toOffsetMax = toOffsetMap.get(range.topicPartition());
             long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
             if (toOffset == toOffsetMax) {
               exhaustedPartitions.add(range.partition());
@@ -130,7 +123,7 @@ public class KafkaOffsetGen {
               long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
               toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
             }
-            ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
+            ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
           }
         }
       }
@@ -144,28 +137,10 @@ public class KafkaOffsetGen {
   }
 
   /**
-   * Helpers to deal with tricky scala <=> java conversions. (oh my!)
-   */
-  static class ScalaHelpers {
-
-    public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> m) {
-      return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.conforms());
-    }
-
-    public static Set<String> toScalaSet(HashSet<String> s) {
-      return JavaConverters.asScalaSetConverter(s).asScala().toSet();
-    }
-
-    public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> m) {
-      return JavaConverters.mapAsJavaMapConverter(m).asJava();
-    }
-  }
-
-  /**
    * Kafka reset offset strategies.
    */
   enum KafkaResetOffsetStrategies {
-    LARGEST, SMALLEST
+    LATEST, EARLIEST
   }
 
   /**
@@ -175,20 +150,20 @@ public class KafkaOffsetGen {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
     private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
-    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
     public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
     public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
-  private final HashMap<String, String> kafkaParams;
+  private final HashMap<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
-    kafkaParams = new HashMap<String, String>();
+    kafkaParams = new HashMap<>();
     for (Object prop : props.keySet()) {
-      kafkaParams.put(prop.toString(), props.getString(prop.toString()));
+      kafkaParams.put(prop.toString(), props.get(prop.toString()));
     }
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
     topicName = props.getString(Config.KAFKA_TOPIC_NAME);
@@ -197,31 +172,25 @@ public class KafkaOffsetGen {
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
 
     // Obtain current metadata for the topic
-    KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
-    Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either =
-        cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName))));
-    if (either.isLeft()) {
-      // log errors. and bail out.
-      throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head());
-    }
-    Set<TopicAndPartition> topicPartitions = either.right().get();
+    KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
+    List<PartitionInfo> partitionInfoList;
+    partitionInfoList = consumer.partitionsFor(topicName);
+    Set<TopicPartition> topicPartitions = partitionInfoList.stream()
+            .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
     // Determine the offset ranges to read from
-    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
-    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
+    Map<TopicPartition, Long> fromOffsets;
     if (lastCheckpointStr.isPresent()) {
-      fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
+      fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
     } else {
       KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
-          .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
+              .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
       switch (autoResetValue) {
-        case SMALLEST:
-          fromOffsets =
-              new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+        case EARLIEST:
+          fromOffsets = consumer.beginningOffsets(topicPartitions);
           break;
-        case LARGEST:
-          fromOffsets =
-              new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
+        case LATEST:
+          fromOffsets = consumer.endOffsets(topicPartitions);
           break;
         default:
           throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
@@ -229,8 +198,7 @@ public class KafkaOffsetGen {
     }
 
     // Obtain the latest offsets.
-    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets =
-        new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
+    Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
 
     // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
     long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
@@ -245,15 +213,13 @@ public class KafkaOffsetGen {
 
   // check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
   // else return earliest offsets
-  private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(KafkaCluster cluster,
-      Option<String> lastCheckpointStr, Set<TopicAndPartition> topicPartitions) {
-    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
-        CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
-        new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+  private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
+                                                        Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+    Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
 
     boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
-        .anyMatch(offset -> offset.getValue().offset() < earliestOffsets.get(offset.getKey()).offset());
+            .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
     return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
@@ -261,7 +227,7 @@ public class KafkaOffsetGen {
     return topicName;
   }
 
-  public HashMap<String, String> getKafkaParams() {
+  public HashMap<String, Object> getKafkaParams() {
     return kafkaParams;
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index d5eb6c3..b58247f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -28,14 +28,14 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 
-import kafka.common.TopicAndPartition;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
-import org.apache.spark.streaming.kafka.KafkaTestUtils;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -44,6 +44,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 
@@ -84,13 +85,12 @@ public class TestKafkaSource extends UtilitiesTestBase {
   private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
-    props.setProperty("metadata.broker.list", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", "smallest");
-    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+    props.setProperty("auto.offset.reset", "earliest");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(Config.maxEventsFromKafkaSource));
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     return props;
   }
 
@@ -214,10 +214,10 @@ public class TestKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch6.getBatch());
   }
 
-  private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
-    HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
+  private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
+    HashMap<TopicPartition, Long> map = new HashMap<>();
     for (int i = 0; i < partitions.length; i++) {
-      map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i]));
+      map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]);
     }
     return map;
   }
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties b/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties
index 392720f..e256b8c 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties
@@ -25,6 +25,6 @@ hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/
 #hoodie.deltastreamer.source.kafka.topic=uber_trips
 hoodie.deltastreamer.source.kafka.topic=impressions
 #Kafka props
-metadata.broker.list=localhost:9092
-auto.offset.reset=smallest
+bootstrap.servers=localhost:9092
+auto.offset.reset=earliest
 schema.registry.url=http://localhost:8081
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index c56c789..a7e7664 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -23,7 +23,7 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
-  <artifactId>hudi-spark-bundle</artifactId>
+  <artifactId>hudi-spark-bundle_2.11</artifactId>
   <packaging>jar</packaging>
 
   <properties>
@@ -32,7 +32,7 @@
   </properties>
 
   <build>
-     <plugins>
+    <plugins>
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
@@ -66,7 +66,7 @@
                 <includes>
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-client</include>
-                  <include>org.apache.hudi:hudi-spark</include>
+                  <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
                   <include>org.apache.hudi:hudi-hive</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
                   <include>org.apache.hudi:hudi-timeline-service</include>
@@ -83,8 +83,8 @@
                   <include>org.antlr:stringtemplate</include>
                   <include>org.apache.parquet:parquet-avro</include>
 
-                  <include>com.twitter:bijection-avro_2.11</include>
-                  <include>com.twitter:bijection-core_2.11</include>
+                  <include>com.twitter:bijection-avro_${scala.binary.version}</include>
+                  <include>com.twitter:bijection-core_${scala.binary.version}</include>
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>io.dropwizard.metrics:metrics-graphite</include>
                   <include>com.yammer.metrics:metrics-core</include>
@@ -190,7 +190,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark</artifactId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
 
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 110ff0f..3346144 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -24,7 +24,7 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
-  <artifactId>hudi-utilities-bundle</artifactId>
+  <artifactId>hudi-utilities-bundle_2.11</artifactId>
   <packaging>jar</packaging>
 
   <properties>
@@ -67,8 +67,8 @@
                 <includes>
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-client</include>
-                  <include>org.apache.hudi:hudi-utilities</include>
-                  <include>org.apache.hudi:hudi-spark</include>
+                  <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
+                  <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
                   <include>org.apache.hudi:hudi-hive</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
                   <include>org.apache.hudi:hudi-timeline-service</include>
@@ -85,8 +85,8 @@
                   <include>org.antlr:stringtemplate</include>
                   <include>org.apache.parquet:parquet-avro</include>
 
-                  <include>com.twitter:bijection-avro_2.11</include>
-                  <include>com.twitter:bijection-core_2.11</include>
+                  <include>com.twitter:bijection-avro_${scala.binary.version}</include>
+                  <include>com.twitter:bijection-core_${scala.binary.version}</include>
                   <include>io.confluent:kafka-avro-serializer</include>
                   <include>io.confluent:common-config</include>
                   <include>io.confluent:common-utils</include>
@@ -94,8 +94,8 @@
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>io.dropwizard.metrics:metrics-graphite</include>
                   <include>com.yammer.metrics:metrics-core</include>
-                  <include>org.apache.spark:spark-streaming-kafka-0-8_2.11</include>
-                  <include>org.apache.kafka:kafka_2.11</include>
+                  <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include>
+                  <include>org.apache.kafka:kafka_${scala.binary.version}</include>
                   <include>com.101tec:zkclient</include>
                   <include>org.apache.kafka:kafka-clients</include>
 
@@ -200,12 +200,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-spark</artifactId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-utilities</artifactId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
 
diff --git a/pom.xml b/pom.xml
index 58355ac..3b3a95f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
 
     <java.version>1.8</java.version>
     <fasterxml.version>2.6.7</fasterxml.version>
+    <kafka.version>2.0.0</kafka.version>
     <glassfish.version>2.17</glassfish.version>
     <parquet.version>1.10.1</parquet.version>
     <junit.version>4.11</junit.version>
@@ -91,7 +92,7 @@
     <spark.version>2.4.4</spark.version>
     <avro.version>1.8.2</avro.version>
     <scala.version>2.11.8</scala.version>
-    <scala.libversion>2.11</scala.libversion>
+    <scala.binary.version>2.11</scala.binary.version>
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
     <scala-maven-plugin.version>3.3.1</scala-maven-plugin.version>
     <scalatest.version>3.0.1</scalatest.version>
@@ -423,8 +424,8 @@
       </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson.module</groupId>
-        <artifactId>jackson-module-scala_2.11</artifactId>
-        <version>${fasterxml.version}</version>
+        <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+        <version>${fasterxml.version}.1</version>
       </dependency>
 
       <!-- Glassfish -->
@@ -470,13 +471,13 @@
       <!-- Spark -->
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-core_2.11</artifactId>
+        <artifactId>spark-core_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-sql_2.11</artifactId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
@@ -484,7 +485,7 @@
       <!-- Spark (Packages) -->
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-avro_2.11</artifactId>
+        <artifactId>spark-avro_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
@@ -1040,6 +1041,43 @@
         </plugins>
       </build>
     </profile>
+    <!-- Exists for backwards compatibility; profile doesn't do anything -->
+    <profile>
+      <id>scala-2.11</id>
+    </profile>
+
+    <profile>
+      <id>scala-2.12</id>
+      <properties>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>enforce-versions</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <bannedDependencies>
+                      <excludes combine.children="append">
+                        <exclude>*:*_2.11</exclude>
+                      </excludes>
+                    </bannedDependencies>
+                  </rules>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
   <issueManagement>