You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bigtop.apache.org by ju...@apache.org on 2018/09/19 23:59:16 UTC

bigtop git commit: Add missing scala patch for flume

Repository: bigtop
Updated Branches:
  refs/heads/master fc97b9ee7 -> 288e129d1


Add missing scala patch for flume

The commit is to add patch to fix flume/kafka build issue which is
missed in BIGTOP-3082.

Signed-off-by: Jun He <ju...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/288e129d
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/288e129d
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/288e129d

Branch: refs/heads/master
Commit: 288e129d1832e10393ce6fc13259a735008d9537
Parents: fc97b9e
Author: Jun He <ju...@linaro.org>
Authored: Thu Sep 20 07:41:52 2018 +0800
Committer: Jun He <ju...@apache.org>
Committed: Wed Sep 19 23:58:39 2018 +0000

----------------------------------------------------------------------
 .../src/common/flume/patch2-scala-symbol.diff   | 96 ++++++++++++++++++++
 1 file changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/288e129d/bigtop-packages/src/common/flume/patch2-scala-symbol.diff
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/common/flume/patch2-scala-symbol.diff b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff
new file mode 100644
index 0000000..17833d4
--- /dev/null
+++ b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff
@@ -0,0 +1,96 @@
+From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001
+From: Anton Chevychalov <ca...@arenadata.io>
+Date: Wed, 25 Oct 2017 15:47:48 +0300
+Subject: [PATCH] Fix kafka and Scala 2.11 trouble
+
+---
+ .../org/apache/flume/channel/kafka/KafkaChannel.java   |  4 ++--
+ .../org/apache/flume/source/kafka/KafkaSource.java     | 18 ++++++++++++++----
+ 2 files changed, 16 insertions(+), 6 deletions(-)
+
+diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+index 5bd9be0..46494fd 100644
+--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
++++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+@@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+-import static scala.collection.JavaConverters.asJavaListConverter;
++import scala.collection.JavaConverters;
+ 
+ public class KafkaChannel extends BasicChannelSemantics {
+ 
+@@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics {
+   private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
+     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
+-    List<String> partitions = asJavaListConverter(
++    List<String> partitions = JavaConverters.seqAsJavaListConverter(
+         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
+     for (String partition : partitions) {
+       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
+diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+index ffdc96e..960e9e8 100644
+--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
++++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+@@ -28,8 +28,10 @@ import java.util.Properties;
+ import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.regex.Pattern;
++import java.util.stream.Collectors;
+ 
+ import com.google.common.annotations.VisibleForTesting;
++import kafka.cluster.Broker;
+ import kafka.cluster.BrokerEndPoint;
+ import kafka.utils.ZKGroupTopicDirs;
+ import kafka.utils.ZkUtils;
+@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
+ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+ import org.apache.kafka.common.PartitionInfo;
+ import org.apache.kafka.common.TopicPartition;
++import org.apache.kafka.common.network.ListenerName;
+ import org.apache.kafka.common.protocol.SecurityProtocol;
+ import org.apache.kafka.common.security.JaasUtils;
+ import org.slf4j.Logger;
+@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.base.Optional;
+ import scala.Option;
++import scala.collection.Seq;
+ 
+ import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
+-import static scala.collection.JavaConverters.asJavaListConverter;
++import scala.collection.JavaConverters;
+ 
+ /**
+  * A Source for Kafka which reads messages from kafka topics.
+@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource
+     ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
+         JaasUtils.isZkSecurityEnabled());
+     try {
+-      List<BrokerEndPoint> endPoints =
+-          asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
++      Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster();
++      List<Broker> brokerList = JavaConverters.seqAsJavaListConverter(
++          zkUtils.getAllBrokersInCluster()).asJava();
++      List<BrokerEndPoint> endPoints = brokerList.stream()
++              .map(broker -> broker.getBrokerEndPoint(
++                  ListenerName.forSecurityProtocol(securityProtocol))
++              )
++              .collect(Collectors.toList());
+       List<String> connections = new ArrayList<>();
+       for (BrokerEndPoint endPoint : endPoints) {
+         connections.add(endPoint.connectionString());
+@@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource
+                                                                      String topicStr) {
+     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
+-    List<String> partitions = asJavaListConverter(
++    List<String> partitions = JavaConverters.seqAsJavaListConverter(
+         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
+     for (String partition : partitions) {
+       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
+-- 
+1.9.1
+