You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/27 00:22:02 UTC
apex-malhar git commit: APEXMALHAR-2199 #closes #380 #resolve
#comment Simplify the zookeeper url parser to use whatever user specified and
support chroot path
Repository: apex-malhar
Updated Branches:
refs/heads/master 7d9386d2a -> 3f30b81a6
APEXMALHAR-2199 #closes #380 #resolve #comment Simplify the zookeeper url parser to use whatever user specified and support chroot path
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3f30b81a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3f30b81a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3f30b81a
Branch: refs/heads/master
Commit: 3f30b81a6e123879f33f95bc025c35808860fedc
Parents: 7d9386d
Author: Siyuan Hua <hs...@apache.org>
Authored: Fri Aug 26 16:58:12 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Fri Aug 26 16:58:28 2016 -0700
----------------------------------------------------------------------
.../contrib/kafka/HighlevelKafkaConsumer.java | 2 +-
.../contrib/kafka/KafkaConsumer.java | 27 ++++++++------------
.../contrib/kafka/KafkaMetadataUtil.java | 8 ++++--
.../contrib/kafka/KafkaInputOperatorTest.java | 10 +++-----
4 files changed, 22 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index 2f7cece..5b9c5ed 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -122,7 +122,7 @@ public class HighlevelKafkaConsumer extends KafkaConsumer
// create high level consumer for every cluster
Properties config = new Properties();
config.putAll(consumerConfig);
- config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeperMap.get(cluster)));
+ config.setProperty("zookeeper.connect", zookeeperMap.get(cluster).iterator().next());
// create consumer connector will start a daemon thread to monitor the metadata change
// we want to start this thread until the operator is activated
standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config)));
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index 805fdc4..a67ff48 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -86,8 +86,16 @@ public abstract class KafkaConsumer implements Closeable
protected String topic = "default_topic";
/**
- * A zookeeper map keyed by cluster id
- * It's mandatory field
+ * A zookeeper map keyed by cluster id.
+ * It's a mandatory field. <br>
+ * Each cluster should have only one connection string containing all nodes in the cluster. <br>
+ * A zookeeper chroot path is also supported. <br>
+ *
+ * Single cluster zookeeper example: <br>
+ * node1:2181,node2:2181,node3:2181/your/kafka/data <br>
+ * Multi-cluster zookeeper example: <br>
+ * cluster1::node1:2181,node2:2181,node3:2181/cluster1;cluster2::node1:2181/cluster2
+ *
*/
@NotNull
@Bind(JavaSerializer.class)
@@ -535,20 +543,7 @@ public abstract class KafkaConsumer implements Closeable
for (String zk : zookeeper.split(";")) {
String[] parts = zk.split("::");
String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
- String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(",");
- String portId = "";
- for (int idx = hostNames.length - 1; idx >= 0; idx--) {
- String[] zkParts = hostNames[idx].split(":");
- if (zkParts.length == 2) {
- portId = zkParts[1];
- }
- if (!portId.isEmpty() && portId != "") {
- theClusters.put(clusterId, zkParts[0] + ":" + portId);
- } else {
- throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeper + "\n"
- + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2");
- }
- }
+ theClusters.put(clusterId, parts.length == 1 ? parts[0] : parts[1]);
}
return theClusters;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index af5045a..b9d4b1b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -105,10 +105,14 @@ public class KafkaMetadataUtil
}});
}
-
+ /**
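+ * Gets the brokers of a cluster through zookeeper. There is always only one string in zkHost, and only the first one is used to connect
+ * @param zkHost set holding the zookeeper connection string of the cluster
+ * @return connection strings of the brokers found in the cluster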
+ */
public static Set<String> getBrokers(Set<String> zkHost){
- ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$);
+ ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
Set<String> brokerHosts = new HashSet<String>();
for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
brokerHosts.add(b.connectionString());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index e4a4dec..f3af37f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -488,14 +488,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
consumer.setTopic(TEST_TOPIC);
testMeta.operator.setConsumer(consumer);
- testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181");
+ testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182/chroot/dir;cluster2::node4:2181");
latch.await(500, TimeUnit.MILLISECONDS);
- Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().zookeeperMap.size());
- Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().zookeeperMap.get("cluster1").size());
- Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").toString());
- Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().zookeeperMap.get("cluster2").size());
- Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").toString());
+ Assert.assertEquals("Total size of clusters ", 2, testMeta.operator.getConsumer().zookeeperMap.size());
+ Assert.assertEquals("Connection url for cluster1 ", "node0,node1,node2:2181,node3:2182/chroot/dir", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").iterator().next());
+ Assert.assertEquals("Connection url for cluster 2 ", "node4:2181", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").iterator().next());
}
}