You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/01/17 22:02:32 UTC
[incubator-hudi] branch master updated: [HUDI-238] Make Hudi
support Scala 2.12 (#1226)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 292c1e2 [HUDI-238] Make Hudi support Scala 2.12 (#1226)
292c1e2 is described below
commit 292c1e2ff436a711cbbb53ad9b1f6232121d53ec
Author: wenningd <we...@gmail.com>
AuthorDate: Fri Jan 17 14:02:21 2020 -0800
[HUDI-238] Make Hudi support Scala 2.12 (#1226)
* [HUDI-238] Rename scala related artifactId & add maven profile to support Scala 2.12
---
LICENSE | 2 +
dev/change-scala-version.sh | 66 ++++++++++++
docker/demo/config/kafka-source.properties | 4 +-
docker/hoodie/hadoop/hive_base/pom.xml | 4 +-
docker/hoodie/hadoop/pom.xml | 2 +-
hudi-cli/pom.xml | 12 ++-
hudi-client/pom.xml | 4 +-
hudi-integ-test/pom.xml | 4 +-
hudi-spark/pom.xml | 19 ++--
hudi-spark/run_hoodie_app.sh | 2 +-
hudi-utilities/pom.xml | 39 +++++--
.../hudi/utilities/sources/AvroKafkaSource.java | 17 +--
.../hudi/utilities/sources/JsonKafkaSource.java | 13 ++-
.../utilities/sources/helpers/KafkaOffsetGen.java | 120 ++++++++-------------
.../hudi/utilities/sources/TestKafkaSource.java | 22 ++--
.../delta-streamer-config/kafka-source.properties | 4 +-
packaging/hudi-spark-bundle/pom.xml | 12 +--
packaging/hudi-utilities-bundle/pom.xml | 18 ++--
pom.xml | 50 +++++++--
19 files changed, 261 insertions(+), 153 deletions(-)
diff --git a/LICENSE b/LICENSE
index 341988e..6b2997f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -245,6 +245,8 @@ This product includes code from Apache Spark
* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package
+* dev/change-scala-version.sh copied from https://github.com/apache/spark/blob/branch-2.4/dev/change-scala-version.sh
+
Copyright: 2014 and onwards The Apache Software Foundation
Home page: http://spark.apache.org/
License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
new file mode 100755
index 0000000..151581d
--- /dev/null
+++ b/dev/change-scala-version.sh
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+VALID_VERSIONS=( 2.11 2.12 )
+
+usage() {
+ echo "Usage: $(basename $0) [-h|--help] <version>
+where :
+ -h| --help Display this help text
+ valid version values : ${VALID_VERSIONS[*]}
+" 1>&2
+ exit 1
+}
+
+if [[ ($# -ne 1) || ( $1 == "--help") || $1 == "-h" ]]; then
+ usage
+fi
+
+TO_VERSION=$1
+
+check_scala_version() {
+ for i in ${VALID_VERSIONS[*]}; do [ $i = "$1" ] && return 0; done
+ echo "Invalid Scala version: $1. Valid versions: ${VALID_VERSIONS[*]}" 1>&2
+ exit 1
+}
+
+check_scala_version "$TO_VERSION"
+
+if [ $TO_VERSION = "2.11" ]; then
+ FROM_VERSION="2.12"
+else
+ FROM_VERSION="2.11"
+fi
+
+sed_i() {
+ sed -e "$1" "$2" > "$2.tmp" && mv "$2.tmp" "$2"
+}
+
+export -f sed_i
+
+BASEDIR=$(dirname $0)/..
+find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \
+ -exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \;
+
+# Also update <scala.binary.version> in parent POM
+# Match any scala binary version to ensure idempotency
+sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'$TO_VERSION'</' \
+ "$BASEDIR/pom.xml"
diff --git a/docker/demo/config/kafka-source.properties b/docker/demo/config/kafka-source.properties
index dbeba47..4d7f77c 100644
--- a/docker/demo/config/kafka-source.properties
+++ b/docker/demo/config/kafka-source.properties
@@ -26,5 +26,5 @@ hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.a
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=stock_ticks
#Kafka props
-metadata.broker.list=kafkabroker:9092
-auto.offset.reset=smallest
+bootstrap.servers=kafkabroker:9092
+auto.offset.reset=earliest
diff --git a/docker/hoodie/hadoop/hive_base/pom.xml b/docker/hoodie/hadoop/hive_base/pom.xml
index 497d9d4..5af64d0 100644
--- a/docker/hoodie/hadoop/hive_base/pom.xml
+++ b/docker/hoodie/hadoop/hive_base/pom.xml
@@ -57,9 +57,9 @@
<tasks>
<copy file="${project.basedir}/../../../../packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-${project.version}.jar" tofile="target/hoodie-hadoop-mr-bundle.jar" />
<copy file="${project.basedir}/../../../../packaging/hudi-hive-bundle/target/hudi-hive-bundle-${project.version}.jar" tofile="target/hoodie-hive-bundle.jar" />
- <copy file="${project.basedir}/../../../../packaging/hudi-spark-bundle/target/hudi-spark-bundle-${project.version}.jar" tofile="target/hoodie-spark-bundle.jar" />
+ <copy file="${project.basedir}/../../../../packaging/hudi-spark-bundle/target/hudi-spark-bundle_${scala.binary.version}-${project.version}.jar" tofile="target/hoodie-spark-bundle.jar" />
<copy
- file="${project.basedir}/../../../../packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-${project.version}.jar"
+ file="${project.basedir}/../../../../packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_${scala.binary.version}-${project.version}.jar"
tofile="target/hoodie-utilities.jar"/>
</tasks>
</configuration>
diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml
index 55e5c65..84380f7 100644
--- a/docker/hoodie/hadoop/pom.xml
+++ b/docker/hoodie/hadoop/pom.xml
@@ -42,7 +42,7 @@
<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark-bundle</artifactId>
+ <artifactId>hudi-spark-bundle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 47f7c7f..d8e1903 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -147,7 +147,7 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-utilities</artifactId>
+ <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
@@ -160,16 +160,22 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 02842e9..63a6218 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -95,11 +95,11 @@
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<!-- Dropwizard Metrics -->
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index af82818..6c74a12 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -54,7 +54,7 @@
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark</artifactId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
@@ -83,7 +83,7 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark</artifactId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index e83d96d..bf18735 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -23,7 +23,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>hudi-spark</artifactId>
+ <artifactId>hudi-spark_2.11</artifactId>
<packaging>jar</packaging>
<properties>
@@ -190,13 +190,20 @@
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
+ <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
+ <exclusions>
+ <exclusion>
+ <!-- this version to conflict to spark-core_2.12 -->
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Parquet -->
@@ -208,17 +215,17 @@
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<!-- Spark (Packages) -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-avro_2.11</artifactId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
@@ -293,7 +300,7 @@
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.11</artifactId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh
index 17ba925..88ae209 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_app.sh
@@ -25,7 +25,7 @@ function error_exit {
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
#Ensure we pick the right jar even for hive11 builds
-HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.jar | grep -v source | head -1`
+HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1`
if [ -z "$HADOOP_CONF_DIR" ]; then
echo "setting hadoop conf dir"
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 53a3a1a..59aae72 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -23,7 +23,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>hudi-utilities</artifactId>
+ <artifactId>hudi-utilities_2.11</artifactId>
<packaging>jar</packaging>
<properties>
@@ -109,7 +109,7 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark</artifactId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
@@ -128,19 +128,25 @@
<!-- Fasterxml -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
+ <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
@@ -151,7 +157,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
@@ -162,20 +168,26 @@
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-avro_2.11</artifactId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
+ <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
<!-- Dropwizard Metrics -->
<dependency>
@@ -197,8 +209,8 @@
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>bijection-avro_2.11</artifactId>
- <version>0.9.2</version>
+ <artifactId>bijection-avro_${scala.binary.version}</artifactId>
+ <version>0.9.3</version>
</dependency>
<!-- Kafka -->
@@ -223,6 +235,13 @@
<version>3.0.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Httpcomponents -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 18ebff4..b213988 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -24,16 +24,17 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
-import io.confluent.kafka.serializers.KafkaAvroDecoder;
-import kafka.serializer.StringDecoder;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+import org.apache.spark.streaming.kafka010.OffsetRange;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -47,6 +48,8 @@ public class AvroKafkaSource extends AvroSource {
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
+ props.put("key.deserializer", StringDeserializer.class);
+ props.put("value.deserializer", KafkaAvroDeserializer.class);
offsetGen = new KafkaOffsetGen(props);
}
@@ -64,9 +67,7 @@ public class AvroKafkaSource extends AvroSource {
}
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
- JavaRDD<GenericRecord> recordRDD =
- KafkaUtils.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
- offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj);
- return recordRDD;
+ return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index bd922ac..51a1ae1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -24,14 +24,15 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
-import kafka.serializer.StringDecoder;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+import org.apache.spark.streaming.kafka010.OffsetRange;
/**
* Read json kafka data.
@@ -45,6 +46,8 @@ public class JsonKafkaSource extends JsonSource {
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(properties, sparkContext, sparkSession, schemaProvider);
+ properties.put("key.deserializer", StringDeserializer.class);
+ properties.put("value.deserializer", StringDeserializer.class);
offsetGen = new KafkaOffsetGen(properties);
}
@@ -61,7 +64,7 @@ public class JsonKafkaSource extends JsonSource {
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
- return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class,
- offsetGen.getKafkaParams(), offsetRanges).values();
+ return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index c17a5cf..ed5e4e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -22,30 +22,24 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.streaming.kafka.KafkaCluster;
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
-import scala.Predef;
-import scala.collection.JavaConverters;
-import scala.collection.immutable.Map;
-import scala.collection.immutable.Set;
-import scala.collection.mutable.ArrayBuffer;
-import scala.collection.mutable.StringBuilder;
-import scala.util.Either;
-
/**
* Source to read data from Kafka, incrementally.
*/
@@ -58,8 +52,8 @@ public class KafkaOffsetGen {
/**
* Reconstruct checkpoint from string.
*/
- public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String checkpointStr) {
- HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<>();
+ public static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) {
+ HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
if (checkpointStr.length() == 0) {
return offsetMap;
}
@@ -67,8 +61,7 @@ public class KafkaOffsetGen {
String topic = splits[0];
for (int i = 1; i < splits.length; i++) {
String[] subSplits = splits[i].split(":");
- offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])),
- new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1])));
+ offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1]));
}
return offsetMap;
}
@@ -83,7 +76,7 @@ public class KafkaOffsetGen {
// at least 1 partition will be present.
sb.append(ranges[0].topic() + ",");
sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
- .collect(Collectors.joining(",")));
+ .collect(Collectors.joining(",")));
return sb.toString();
}
@@ -94,32 +87,32 @@ public class KafkaOffsetGen {
* @param toOffsetMap offsets of where each partitions is currently at
* @param numEvents maximum number of events to read.
*/
- public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, LeaderOffset> fromOffsetMap,
- HashMap<TopicAndPartition, LeaderOffset> toOffsetMap, long numEvents) {
+ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap,
+ Map<TopicPartition, Long> toOffsetMap, long numEvents) {
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
// Create initial offset ranges for each 'to' partition, with from = to offsets.
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
toOffsetMap.entrySet().stream().map(e -> {
- TopicAndPartition tp = e.getKey();
- long fromOffset = fromOffsetMap.getOrDefault(tp, new LeaderOffset("", -1, 0)).offset();
+ TopicPartition tp = e.getKey();
+ long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
return OffsetRange.create(tp, fromOffset, fromOffset);
}).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
long allocedEvents = 0;
- java.util.Set<Integer> exhaustedPartitions = new HashSet<>();
+ Set<Integer> exhaustedPartitions = new HashSet<>();
// keep going until we have events to allocate and partitions still not exhausted.
while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
long remainingEvents = numEvents - allocedEvents;
long eventsPerPartition =
- (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
+ (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
// Allocate the remaining events to non-exhausted partitions, in round robin fashion
for (int i = 0; i < ranges.length; i++) {
OffsetRange range = ranges[i];
if (!exhaustedPartitions.contains(range.partition())) {
- long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset();
+ long toOffsetMax = toOffsetMap.get(range.topicPartition());
long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
if (toOffset == toOffsetMax) {
exhaustedPartitions.add(range.partition());
@@ -130,7 +123,7 @@ public class KafkaOffsetGen {
long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
}
- ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
+ ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
}
}
}
@@ -144,28 +137,10 @@ public class KafkaOffsetGen {
}
/**
- * Helpers to deal with tricky scala <=> java conversions. (oh my!)
- */
- static class ScalaHelpers {
-
- public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> m) {
- return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.conforms());
- }
-
- public static Set<String> toScalaSet(HashSet<String> s) {
- return JavaConverters.asScalaSetConverter(s).asScala().toSet();
- }
-
- public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> m) {
- return JavaConverters.mapAsJavaMapConverter(m).asJava();
- }
- }
-
- /**
* Kafka reset offset strategies.
*/
enum KafkaResetOffsetStrategies {
- LARGEST, SMALLEST
+ LATEST, EARLIEST
}
/**
@@ -175,20 +150,20 @@ public class KafkaOffsetGen {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
- private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+ private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
- private final HashMap<String, String> kafkaParams;
+ private final HashMap<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
public KafkaOffsetGen(TypedProperties props) {
this.props = props;
- kafkaParams = new HashMap<String, String>();
+ kafkaParams = new HashMap<>();
for (Object prop : props.keySet()) {
- kafkaParams.put(prop.toString(), props.getString(prop.toString()));
+ kafkaParams.put(prop.toString(), props.get(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
@@ -197,31 +172,25 @@ public class KafkaOffsetGen {
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
// Obtain current metadata for the topic
- KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
- Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either =
- cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName))));
- if (either.isLeft()) {
- // log errors. and bail out.
- throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head());
- }
- Set<TopicAndPartition> topicPartitions = either.right().get();
+ KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
+ List<PartitionInfo> partitionInfoList;
+ partitionInfoList = consumer.partitionsFor(topicName);
+ Set<TopicPartition> topicPartitions = partitionInfoList.stream()
+ .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
// Determine the offset ranges to read from
- HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
- HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
+ Map<TopicPartition, Long> fromOffsets;
if (lastCheckpointStr.isPresent()) {
- fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
+ fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
- .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
+ .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
- case SMALLEST:
- fromOffsets =
- new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+ case EARLIEST:
+ fromOffsets = consumer.beginningOffsets(topicPartitions);
break;
- case LARGEST:
- fromOffsets =
- new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
+ case LATEST:
+ fromOffsets = consumer.endOffsets(topicPartitions);
break;
default:
throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
@@ -229,8 +198,7 @@ public class KafkaOffsetGen {
}
// Obtain the latest offsets.
- HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets =
- new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
+ Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
@@ -245,15 +213,13 @@ public class KafkaOffsetGen {
// check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
// else return earliest offsets
- private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(KafkaCluster cluster,
- Option<String> lastCheckpointStr, Set<TopicAndPartition> topicPartitions) {
- HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
- CheckpointUtils.strToOffsets(lastCheckpointStr.get());
- HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
- new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+ private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
+ Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
+ Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+ Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
- .anyMatch(offset -> offset.getValue().offset() < earliestOffsets.get(offset.getKey()).offset());
+ .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
@@ -261,7 +227,7 @@ public class KafkaOffsetGen {
return topicName;
}
- public HashMap<String, String> getKafkaParams() {
+ public HashMap<String, Object> getKafkaParams() {
return kafkaParams;
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index d5eb6c3..b58247f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -28,14 +28,14 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
-import kafka.common.TopicAndPartition;
import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
-import org.apache.spark.streaming.kafka.KafkaTestUtils;
-import org.apache.spark.streaming.kafka.OffsetRange;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -44,6 +44,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -84,13 +85,12 @@ public class TestKafkaSource extends UtilitiesTestBase {
private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
- props.setProperty("metadata.broker.list", testUtils.brokerAddress());
- props.setProperty("auto.offset.reset", "smallest");
- props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+ props.setProperty("auto.offset.reset", "earliest");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.maxEventsFromKafkaSource));
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
return props;
}
@@ -214,10 +214,10 @@ public class TestKafkaSource extends UtilitiesTestBase {
assertEquals(Option.empty(), fetch6.getBatch());
}
- private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
- HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
+ private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
+ HashMap<TopicPartition, Long> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
- map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i]));
+ map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]);
}
return map;
}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties b/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties
index 392720f..e256b8c 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties
@@ -25,6 +25,6 @@ hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=impressions
#Kafka props
-metadata.broker.list=localhost:9092
-auto.offset.reset=smallest
+bootstrap.servers=localhost:9092
+auto.offset.reset=earliest
schema.registry.url=http://localhost:8081
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index c56c789..a7e7664 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -23,7 +23,7 @@
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>hudi-spark-bundle</artifactId>
+ <artifactId>hudi-spark-bundle_2.11</artifactId>
<packaging>jar</packaging>
<properties>
@@ -32,7 +32,7 @@
</properties>
<build>
- <plugins>
+ <plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
@@ -66,7 +66,7 @@
<includes>
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-client</include>
- <include>org.apache.hudi:hudi-spark</include>
+ <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-hive</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>org.apache.hudi:hudi-timeline-service</include>
@@ -83,8 +83,8 @@
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>
- <include>com.twitter:bijection-avro_2.11</include>
- <include>com.twitter:bijection-core_2.11</include>
+ <include>com.twitter:bijection-avro_${scala.binary.version}</include>
+ <include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.dropwizard.metrics:metrics-core</include>
<include>io.dropwizard.metrics:metrics-graphite</include>
<include>com.yammer.metrics:metrics-core</include>
@@ -190,7 +190,7 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark</artifactId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 110ff0f..3346144 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -24,7 +24,7 @@
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>hudi-utilities-bundle</artifactId>
+ <artifactId>hudi-utilities-bundle_2.11</artifactId>
<packaging>jar</packaging>
<properties>
@@ -67,8 +67,8 @@
<includes>
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-client</include>
- <include>org.apache.hudi:hudi-utilities</include>
- <include>org.apache.hudi:hudi-spark</include>
+ <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
+ <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-hive</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>org.apache.hudi:hudi-timeline-service</include>
@@ -85,8 +85,8 @@
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>
- <include>com.twitter:bijection-avro_2.11</include>
- <include>com.twitter:bijection-core_2.11</include>
+ <include>com.twitter:bijection-avro_${scala.binary.version}</include>
+ <include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:common-config</include>
<include>io.confluent:common-utils</include>
@@ -94,8 +94,8 @@
<include>io.dropwizard.metrics:metrics-core</include>
<include>io.dropwizard.metrics:metrics-graphite</include>
<include>com.yammer.metrics:metrics-core</include>
- <include>org.apache.spark:spark-streaming-kafka-0-8_2.11</include>
- <include>org.apache.kafka:kafka_2.11</include>
+ <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include>
+ <include>org.apache.kafka:kafka_${scala.binary.version}</include>
<include>com.101tec:zkclient</include>
<include>org.apache.kafka:kafka-clients</include>
@@ -200,12 +200,12 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark</artifactId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-utilities</artifactId>
+ <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/pom.xml b/pom.xml
index 58355ac..3b3a95f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<java.version>1.8</java.version>
<fasterxml.version>2.6.7</fasterxml.version>
+ <kafka.version>2.0.0</kafka.version>
<glassfish.version>2.17</glassfish.version>
<parquet.version>1.10.1</parquet.version>
<junit.version>4.11</junit.version>
@@ -91,7 +92,7 @@
<spark.version>2.4.4</spark.version>
<avro.version>1.8.2</avro.version>
<scala.version>2.11.8</scala.version>
- <scala.libversion>2.11</scala.libversion>
+ <scala.binary.version>2.11</scala.binary.version>
<apache-rat-plugin.version>0.12</apache-rat-plugin.version>
<scala-maven-plugin.version>3.3.1</scala-maven-plugin.version>
<scalatest.version>3.0.1</scalatest.version>
@@ -423,8 +424,8 @@
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- <version>${fasterxml.version}</version>
+ <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+ <version>${fasterxml.version}.1</version>
</dependency>
<!-- Glassfish -->
@@ -470,13 +471,13 @@
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
@@ -484,7 +485,7 @@
<!-- Spark (Packages) -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-avro_2.11</artifactId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
@@ -1040,6 +1041,43 @@
</plugins>
</build>
</profile>
+ <!-- Exists for backwards compatibility; profile doesn't do anything -->
+ <profile>
+ <id>scala-2.11</id>
+ </profile>
+
+ <profile>
+ <id>scala-2.12</id>
+ <properties>
+ <scala.version>2.12.10</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce-versions</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <bannedDependencies>
+ <excludes combine.children="append">
+ <exclude>*:*_2.11</exclude>
+ </excludes>
+ </bannedDependencies>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
<issueManagement>