You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/09/27 13:23:35 UTC

[6/7] git commit: CAMEL-7864: Updates to allow setting Kafka's zookeeper.connect directly.

CAMEL-7864: Updates to allow setting Kafka's zookeeper.connect directly.

- Added new property to KafkaConfiguration for zookeeperConnection and
  configured it to override the zookeeperHost and zookeeperPort
  properties.
- Created getZookeeperConnect method on KafkaConfiguration to return
  the zookeeperConnect property if set or the combination of host ":"
  port if zookeeperConnect is not set.
- Added zookeeperConnect get and set methods on KafkaEndpoint to
  delegate to KafkaConfiguration.
- Updated KafkaConsumer to use the getZookeeperConnect method on the
  KafkaEndpoint.
- Added tests for the changes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/daccc8e0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/daccc8e0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/daccc8e0

Branch: refs/heads/camel-2.13.x
Commit: daccc8e063ec3b305ddb1ef20a9a57b982c45328
Parents: 3a5497a
Author: john.shields <jo...@tubemogul.com>
Authored: Thu Sep 25 20:57:23 2014 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Sep 27 13:23:14 2014 +0200

----------------------------------------------------------------------
 .../component/kafka/KafkaConfiguration.java     | 25 +++++++++++++++++--
 .../camel/component/kafka/KafkaConsumer.java    |  9 +++----
 .../camel/component/kafka/KafkaEndpoint.java    |  9 +++++++
 .../component/kafka/KafkaComponentTest.java     | 26 ++++++++++++++++++++
 .../component/kafka/KafkaConsumerTest.java      | 14 ++++++++---
 5 files changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 88d5017..881ef3c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -21,6 +21,7 @@ import java.util.Properties;
 import kafka.producer.DefaultPartitioner;
 
 public class KafkaConfiguration {
+    private String zookeeperConnect;
     private String zookeeperHost;
     private int zookeeperPort = 2181;
     private String topic;
@@ -127,13 +128,31 @@ public class KafkaConfiguration {
             props.put(key, value.toString());
         }
     }
+    
+    public String getZookeeperConnect() {
+        if (this.zookeeperConnect != null) {
+            return zookeeperConnect;
+        } else {
+            return getZookeeperHost() + ":" + getZookeeperPort();
+        }
+    }
+
+    public void setZookeeperConnect(String zookeeperConnect) {
+        this.zookeeperConnect = zookeeperConnect;
+        
+        // connect overrides host and port
+        this.zookeeperHost = null;
+        this.zookeeperPort = -1;
+    }
 
     public String getZookeeperHost() {
         return zookeeperHost;
     }
 
     public void setZookeeperHost(String zookeeperHost) {
-        this.zookeeperHost = zookeeperHost;
+        if (this.zookeeperConnect == null) {
+            this.zookeeperHost = zookeeperHost;
+        }
     }
 
     public int getZookeeperPort() {
@@ -141,7 +160,9 @@ public class KafkaConfiguration {
     }
 
     public void setZookeeperPort(int zookeeperPort) {
-        this.zookeeperPort = zookeeperPort;
+        if (this.zookeeperConnect == null) {
+            this.zookeeperPort = zookeeperPort;
+        }
     }
 
     public String getGroupId() {

http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 3087a14..f801328 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -46,11 +46,8 @@ public class KafkaConsumer extends DefaultConsumer {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
-        if (endpoint.getZookeeperHost() == null) {
-            throw new IllegalArgumentException("zookeeper host must be specified");
-        }
-        if (endpoint.getZookeeperPort() == 0) {
-            throw new IllegalArgumentException("zookeeper port must be specified");
+        if (endpoint.getZookeeperConnect() == null) { 
+            throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
         }
         if (endpoint.getGroupId() == null) {
             throw new IllegalArgumentException("groupId must not be null");
@@ -59,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
-        props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort());
+        props.put("zookeeper.connect", endpoint.getZookeeperConnect());
         props.put("group.id", endpoint.getGroupId());
         return props;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 002d15e..deed68a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -102,6 +102,14 @@ public class KafkaEndpoint extends DefaultEndpoint {
     // Delegated properties from the configuration
     //-------------------------------------------------------------------------
 
+    public String getZookeeperConnect() {
+    	return configuration.getZookeeperConnect();
+    }
+
+    public void setZookeeperConnect(String zookeeperConnect) {
+		configuration.setZookeeperConnect(zookeeperConnect);
+	}
+        
     public String getZookeeperHost() {
         return configuration.getZookeeperHost();
     }
@@ -417,4 +425,5 @@ public class KafkaEndpoint extends DefaultEndpoint {
     public int getRequestTimeoutMs() {
         return configuration.getRequestTimeoutMs();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index c11edaf..15cef7a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class KafkaComponentTest {
 
@@ -43,6 +44,7 @@ public class KafkaComponentTest {
         String remaining = "broker1:12345,broker2:12566";
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
         assertEquals("somehost", endpoint.getZookeeperHost());
         assertEquals(2987, endpoint.getZookeeperPort());
         assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
@@ -50,4 +52,28 @@ public class KafkaComponentTest {
         assertEquals(3, endpoint.getConsumerStreams());
         assertEquals("com.class.Party", endpoint.getPartitioner());
     }
+
+    @Test
+    public void testZookeeperConnectPropertyOverride() throws Exception {
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("zookeeperConnect", "thehost:2181/chroot");
+        params.put("zookeeperHost", "somehost");
+        params.put("zookeeperPort", 2987);
+        params.put("portNumber", 14123);
+        params.put("consumerStreams", "3");
+        params.put("topic", "mytopic");
+        params.put("partitioner", "com.class.Party");
+
+        String uri = "kafka:broker1:12345,broker2:12566";
+        String remaining = "broker1:12345,broker2:12566";
+
+        KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+        assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect());
+        assertNull(endpoint.getZookeeperHost());
+        assertEquals(-1, endpoint.getZookeeperPort());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
+        assertEquals("mytopic", endpoint.getTopic());
+        assertEquals(3, endpoint.getConsumerStreams());
+        assertEquals("com.class.Party", endpoint.getPartitioner());
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 740f116..b51c09e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -28,15 +28,21 @@ public class KafkaConsumerTest {
     private Processor processor = mock(Processor.class);
 
     @Test(expected = IllegalArgumentException.class)
-    public void consumerRequiresZookeeperHost() throws Exception {
-        Mockito.when(endpoint.getZookeeperPort()).thenReturn(2181);
+    public void consumerRequiresZookeeperConnect() throws Exception {
+        Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void consumerRequiresZookeeperPort() throws Exception {
-        Mockito.when(endpoint.getZookeeperHost()).thenReturn("localhost");
+    public void consumerRequiresGroupId() throws Exception {
+        Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
         new KafkaConsumer(endpoint, processor);
     }
 
+    @Test
+    public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception {
+        Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
+        Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+        new KafkaConsumer(endpoint, processor);
+    }
 }