You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/24 10:44:01 UTC

[2/4] flink git commit: [FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

This closes #4321


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6abd4029
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6abd4029
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6abd4029

Branch: refs/heads/release-1.3
Commit: 6abd40299040ca646e7e94313dd1e0d25a4c8d82
Parents: ab28c8e
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 13 11:07:28 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 18:39:38 2017 +0800

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  2 +-
 .../kafka/KafkaTestEnvironmentImpl.java         | 21 +++++++++++++++-----
 2 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6abd4029/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 9fb0e61..84f1f3d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<kafka.version>0.10.0.1</kafka.version>
+		<kafka.version>0.10.2.1</kafka.version>
 	</properties>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/6abd4029/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 3094172..7bcd88b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
@@ -42,8 +42,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +61,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import scala.collection.mutable.ArraySeq;
+
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -250,11 +254,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			for (int i = 0; i < numKafkaServers; i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
-				if(secureMode) {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+				if (secureMode) {
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
 				} else {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
 				}
+				brokerConnectionString +=  ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -418,7 +429,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 			try {
 				scala.Option<String> stringNone = scala.Option.apply(null);
-				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
 				server.startup();
 				return server;
 			}