You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/24 10:44:01 UTC
[2/4] flink git commit: [FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1
[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1
This closes #4321
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6abd4029
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6abd4029
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6abd4029
Branch: refs/heads/release-1.3
Commit: 6abd40299040ca646e7e94313dd1e0d25a4c8d82
Parents: ab28c8e
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 13 11:07:28 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 18:39:38 2017 +0800
----------------------------------------------------------------------
.../flink-connector-kafka-0.10/pom.xml | 2 +-
.../kafka/KafkaTestEnvironmentImpl.java | 21 +++++++++++++++-----
2 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6abd4029/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 9fb0e61..84f1f3d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
- <kafka.version>0.10.0.1</kafka.version>
+ <kafka.version>0.10.2.1</kafka.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/6abd4029/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 3094172..7bcd88b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.list.UnmodifiableList;
@@ -42,8 +42,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +61,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import scala.collection.mutable.ArraySeq;
+
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -250,11 +254,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
for (int i = 0; i < numKafkaServers; i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- if(secureMode) {
- brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+ if (secureMode) {
+ brokerConnectionString += hostAndPortToUrlString(
+ KafkaTestEnvironment.KAFKA_HOST,
+ brokers.get(i).socketServer().boundPort(
+ ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
} else {
- brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ brokerConnectionString += hostAndPortToUrlString(
+ KafkaTestEnvironment.KAFKA_HOST,
+ brokers.get(i).socketServer().boundPort(
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
}
+ brokerConnectionString += ",";
}
LOG.info("ZK and KafkaServer started.");
@@ -418,7 +429,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
try {
scala.Option<String> stringNone = scala.Option.apply(null);
- KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+ KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
server.startup();
return server;
}