You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by tr...@apache.org on 2020/02/26 02:36:52 UTC

[flume] branch trunk updated: FLUME-3355 Upgrade Kafka to 2.3.1. (#316)

This is an automated email from the ASF dual-hosted git repository.

tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02bb8e6  FLUME-3355 Upgrade Kafka to 2.3.1. (#316)
02bb8e6 is described below

commit 02bb8e6854c8e5d1850a07a7fe5e3338178d679f
Author: Masatake Iwasaki <iw...@apache.org>
AuthorDate: Wed Feb 26 11:36:43 2020 +0900

    FLUME-3355 Upgrade Kafka to 2.3.1. (#316)
---
 .../src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java    | 2 +-
 .../java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java  | 2 +-
 .../src/main/java/org/apache/flume/source/kafka/KafkaSource.java      | 4 ++--
 .../src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java  | 2 +-
 pom.xml                                                               | 4 ++--
 5 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 852b4bd..45fe03b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -315,7 +315,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   private void migrateOffsets() {
     try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
             JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
-            Time.SYSTEM, "kafka.server", "SessionExpireListener");
+            Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty());
          KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
       if (kafkaOffsets == null) {
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
index 7657aa6..86620b6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -145,7 +145,7 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
     if (hasZookeeperOffsets) {
       KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(),
               JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
-              "kafka.server", "SessionExpireListener");
+              "kafka.server", "SessionExpireListener", scala.Option.empty());
       zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
       Long offset = tenthOffset + 1;
       zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index b02285d..925b2ee 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -479,7 +479,7 @@ public class KafkaSource extends AbstractPollableSource
   private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
     try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
             JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
-            Time.SYSTEM, "kafka.server", "SessionExpireListener")) {
+            Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty())) {
       List<Broker> brokerList =
               JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava();
       List<BrokerEndPoint> endPoints = brokerList.stream()
@@ -563,7 +563,7 @@ public class KafkaSource extends AbstractPollableSource
   private void migrateOffsets(String topicStr) {
     try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
             JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
-            Time.SYSTEM, "kafka.server", "SessionExpireListener");
+            Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty());
          KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
           getKafkaOffsets(consumer, topicStr);
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d866c98..acc674f 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -881,7 +881,7 @@ public class TestKafkaSource {
     if (hasZookeeperOffsets) {
       KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(),
               JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
-              "kafka.server", "SessionExpireListener");
+              "kafka.server", "SessionExpireListener", scala.Option.empty());
       zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
       Long offset = tenthOffset + 1;
       zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
diff --git a/pom.xml b/pom.xml
index 5ef4584..2e606c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@ limitations under the License.
     <jetty.version>9.4.6.v20170531</jetty.version>
     <joda-time.version>2.9.9</joda-time.version>
     <junit.version>4.10</junit.version>
-    <kafka.version>2.0.1</kafka.version>
+    <kafka.version>2.3.1</kafka.version>
     <kite.version>1.0.0</kite.version>
     <kudu.version>1.10.0</kudu.version>
     <lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>
@@ -116,7 +116,7 @@ limitations under the License.
     <xalan.version>2.7.2</xalan.version>
     <xerces.version>2.9.1</xerces.version>
     <wiremock.version>1.53</wiremock.version>
-    <zookeeper.version>3.4.5</zookeeper.version>
+    <zookeeper.version>3.4.13</zookeeper.version>
   </properties>
 
   <modules>