You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by tr...@apache.org on 2020/02/26 02:36:52 UTC
[flume] branch trunk updated: FLUME-3355 Upgrade Kafka to 2.3.1. (#316)
This is an automated email from the ASF dual-hosted git repository.
tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 02bb8e6 FLUME-3355 Upgrade Kafka to 2.3.1. (#316)
02bb8e6 is described below
commit 02bb8e6854c8e5d1850a07a7fe5e3338178d679f
Author: Masatake Iwasaki <iw...@apache.org>
AuthorDate: Wed Feb 26 11:36:43 2020 +0900
FLUME-3355 Upgrade Kafka to 2.3.1. (#316)
---
.../src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java | 2 +-
.../java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java | 2 +-
.../src/main/java/org/apache/flume/source/kafka/KafkaSource.java | 4 ++--
.../src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java | 2 +-
pom.xml | 4 ++--
5 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 852b4bd..45fe03b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -315,7 +315,7 @@ public class KafkaChannel extends BasicChannelSemantics {
private void migrateOffsets() {
try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener");
+ Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty());
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
if (kafkaOffsets == null) {
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
index 7657aa6..86620b6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -145,7 +145,7 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
if (hasZookeeperOffsets) {
KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(),
JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
- "kafka.server", "SessionExpireListener");
+ "kafka.server", "SessionExpireListener", scala.Option.empty());
zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
Long offset = tenthOffset + 1;
zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index b02285d..925b2ee 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -479,7 +479,7 @@ public class KafkaSource extends AbstractPollableSource
private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener")) {
+ Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty())) {
List<Broker> brokerList =
JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava();
List<BrokerEndPoint> endPoints = brokerList.stream()
@@ -563,7 +563,7 @@ public class KafkaSource extends AbstractPollableSource
private void migrateOffsets(String topicStr) {
try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener");
+ Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty());
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
getKafkaOffsets(consumer, topicStr);
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d866c98..acc674f 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -881,7 +881,7 @@ public class TestKafkaSource {
if (hasZookeeperOffsets) {
KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(),
JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
- "kafka.server", "SessionExpireListener");
+ "kafka.server", "SessionExpireListener", scala.Option.empty());
zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
Long offset = tenthOffset + 1;
zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
diff --git a/pom.xml b/pom.xml
index 5ef4584..2e606c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@ limitations under the License.
<jetty.version>9.4.6.v20170531</jetty.version>
<joda-time.version>2.9.9</joda-time.version>
<junit.version>4.10</junit.version>
- <kafka.version>2.0.1</kafka.version>
+ <kafka.version>2.3.1</kafka.version>
<kite.version>1.0.0</kite.version>
<kudu.version>1.10.0</kudu.version>
<lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>
@@ -116,7 +116,7 @@ limitations under the License.
<xalan.version>2.7.2</xalan.version>
<xerces.version>2.9.1</xerces.version>
<wiremock.version>1.53</wiremock.version>
- <zookeeper.version>3.4.5</zookeeper.version>
+ <zookeeper.version>3.4.13</zookeeper.version>
</properties>
<modules>