Posted to commits@apex.apache.org by th...@apache.org on 2016/08/27 00:22:02 UTC

apex-malhar git commit: APEXMALHAR-2199 #closes #380 #resolve #comment Simplify the zookeeper URL parser to use whatever the user specified and to support a chroot path

Repository: apex-malhar
Updated Branches:
  refs/heads/master 7d9386d2a -> 3f30b81a6


APEXMALHAR-2199 #closes #380 #resolve #comment Simplify the zookeeper URL parser to use whatever the user specified and to support a chroot path


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3f30b81a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3f30b81a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3f30b81a

Branch: refs/heads/master
Commit: 3f30b81a6e123879f33f95bc025c35808860fedc
Parents: 7d9386d
Author: Siyuan Hua <hs...@apache.org>
Authored: Fri Aug 26 16:58:12 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Fri Aug 26 16:58:28 2016 -0700

----------------------------------------------------------------------
 .../contrib/kafka/HighlevelKafkaConsumer.java   |  2 +-
 .../contrib/kafka/KafkaConsumer.java            | 27 ++++++++------------
 .../contrib/kafka/KafkaMetadataUtil.java        |  8 ++++--
 .../contrib/kafka/KafkaInputOperatorTest.java   | 10 +++-----
 4 files changed, 22 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
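
For illustration, a minimal configuration sketch of the formats this commit accepts (not part of the commit itself). It assumes a concrete operator such as KafkaSinglePortStringInputOperator from the same contrib module, and that it exposes the setZookeeper(String) setter used in the test below; host names and chroot paths are placeholders.

import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;

public class ZookeeperConfigSketch
{
  public static void main(String[] args)
  {
    KafkaSinglePortStringInputOperator op = new KafkaSinglePortStringInputOperator();

    // Single cluster; the chroot path "/your/kafka/data" is passed through untouched.
    op.setZookeeper("node1:2181,node2:2181,node3:2181/your/kafka/data");

    // Multiple clusters, separated by ';', each entry prefixed with "clusterId::".
    op.setZookeeper("cluster1::node1:2181,node2:2181/cluster1;cluster2::node4:2181/cluster2");
  }
}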


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index 2f7cece..5b9c5ed 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -122,7 +122,7 @@ public class HighlevelKafkaConsumer extends KafkaConsumer
       // create high level consumer for every cluster
       Properties config = new Properties();
       config.putAll(consumerConfig);
-      config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeperMap.get(cluster)));
+      config.setProperty("zookeeper.connect", zookeeperMap.get(cluster).iterator().next());
       // create consumer connector will start a daemon thread to monitor the metadata change 
       // we want to start this thread until the operator is activated 
       standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config)));
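
With the map now holding a single connection string per cluster, the chroot suffix is no longer mangled by joining individual nodes with ','. A minimal sketch of the resulting zookeeper.connect value, using only java.util types and placeholder hosts rather than the actual operator wiring:

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class ZookeeperConnectSketch
{
  public static void main(String[] args)
  {
    // The per-cluster entry is now exactly one connection string, chroot included.
    Set<String> clusterEntry =
        Collections.singleton("node1:2181,node2:2181,node3:2181/kafka/chroot");

    Properties config = new Properties();
    config.setProperty("zookeeper.connect", clusterEntry.iterator().next());

    // Prints: node1:2181,node2:2181,node3:2181/kafka/chroot
    System.out.println(config.getProperty("zookeeper.connect"));
  }
}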

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index 805fdc4..a67ff48 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -86,8 +86,16 @@ public abstract class KafkaConsumer implements Closeable
   protected String topic = "default_topic";
 
   /**
-   * A zookeeper map keyed by cluster id
-   * It's mandatory field
+   * A zookeeper map keyed by cluster id.
+   * It is a mandatory field. <br>
+   * Each cluster should have only one connection string, containing all nodes in the cluster. <br>
+   * A zookeeper chroot path is also supported. <br>
+   *
+   * Single cluster zookeeper example: <br>
+   * &nbsp;&nbsp;  node1:2181,node2:2181,node3:2181/your/kafka/data <br>
+   * Multi-cluster zookeeper example: <br>
+   * &nbsp;&nbsp;  cluster1::node1:2181,node2:2181,node3:2181/cluster1;cluster2::node1:2181/cluster2
+   *
    */
   @NotNull
   @Bind(JavaSerializer.class)
@@ -535,20 +543,7 @@ public abstract class KafkaConsumer implements Closeable
     for (String zk : zookeeper.split(";")) {
       String[] parts = zk.split("::");
       String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
-      String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(",");
-      String portId = "";
-      for (int idx = hostNames.length - 1; idx >= 0; idx--) {
-        String[] zkParts = hostNames[idx].split(":");
-        if (zkParts.length == 2) {
-          portId = zkParts[1];
-        }
-        if (!portId.isEmpty() && portId != "") {
-          theClusters.put(clusterId, zkParts[0] + ":" + portId);
-        } else {
-          throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeper + "\n"
-              + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2");
-        }
-      }
+      theClusters.put(clusterId, parts.length == 1 ? parts[0] : parts[1]);
     }
     return theClusters;
   }
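
For reference, a standalone sketch of the simplified parsing above. It uses a plain java.util.Map for brevity, whereas the operator itself keeps a multimap keyed by cluster id, and it substitutes a placeholder for KafkaPartition.DEFAULT_CLUSTERID:

import java.util.HashMap;
import java.util.Map;

public class ZookeeperStringParserSketch
{
  // Placeholder for KafkaPartition.DEFAULT_CLUSTERID.
  static final String DEFAULT_CLUSTERID = "default_cluster";

  static Map<String, String> parse(String zookeeper)
  {
    Map<String, String> clusters = new HashMap<String, String>();
    for (String zk : zookeeper.split(";")) {
      String[] parts = zk.split("::");
      // Without a "clusterId::" prefix the whole entry is the connection string,
      // chroot path and all; otherwise parts[0] is the cluster id.
      String clusterId = parts.length == 1 ? DEFAULT_CLUSTERID : parts[0];
      clusters.put(clusterId, parts.length == 1 ? parts[0] : parts[1]);
    }
    return clusters;
  }

  public static void main(String[] args)
  {
    // One default cluster, chroot path preserved.
    System.out.println(parse("node1:2181,node2:2181,node3:2181/your/kafka/data"));
    // Two named clusters, each with its own chroot path.
    System.out.println(parse("cluster1::node1:2181/cluster1;cluster2::node4:2181/cluster2"));
  }
}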

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index af5045a..b9d4b1b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -105,10 +105,14 @@ public class KafkaMetadataUtil
       }});
   }
 
-
+  /**
+   * There is always exactly one connection string in zkHost
+   * @param zkHost a set containing a single zookeeper connection string (chroot path supported)
+   * @return the connection strings of all brokers registered in the cluster
+   */
   public static Set<String> getBrokers(Set<String> zkHost){
 
-    ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$);
+    ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
     Set<String> brokerHosts = new HashSet<String>();
     for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
       brokerHosts.add(b.connectionString());
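
A hedged usage sketch for the revised getBrokers contract: the zkHost set carries exactly one connection string, chroot path included, and that string is handed to ZkClient unchanged. Running it needs a reachable ZooKeeper ensemble with Kafka brokers registered under the given chroot; the hosts below are placeholders.

import java.util.Collections;
import java.util.Set;

import com.datatorrent.contrib.kafka.KafkaMetadataUtil;

public class GetBrokersSketch
{
  public static void main(String[] args)
  {
    // Exactly one connection string in the set, chroot included.
    Set<String> zkHost = Collections.singleton("node1:2181,node2:2181/kafka");

    // Returns the host:port connection strings of all brokers registered in that cluster.
    Set<String> brokers = KafkaMetadataUtil.getBrokers(zkHost);
    System.out.println(brokers);
  }
}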

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index e4a4dec..f3af37f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -488,14 +488,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
 
     testMeta.operator.setConsumer(consumer);
-    testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181");
+    testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182/chroot/dir;cluster2::node4:2181");
     latch.await(500, TimeUnit.MILLISECONDS);
 
-    Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().zookeeperMap.size());
-    Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().zookeeperMap.get("cluster1").size());
-    Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").toString());
-    Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().zookeeperMap.get("cluster2").size());
-    Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").toString());
+    Assert.assertEquals("Total size of clusters ", 2, testMeta.operator.getConsumer().zookeeperMap.size());
+    Assert.assertEquals("Connection url for cluster1 ", "node0,node1,node2:2181,node3:2182/chroot/dir", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").iterator().next());
+    Assert.assertEquals("Connection url for cluster 2 ", "node4:2181", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").iterator().next());
   }
 
 }