You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/09/27 13:23:35 UTC
[6/7] git commit: CAMEL-7864: Updates to allow setting Kafka's
zookeeper.connect directly.
CAMEL-7864: Updates to allow setting Kafka's zookeeper.connect directly.
- Added new property to KafkaConfiguration for zookeeperConnect and
configured it to override the zookeeperHost and zookeeperPort
properties.
- Created getZookeeperConnect method on KafkaConfiguration to return
the zookeeperConnect property if set, or otherwise the combination
zookeeperHost + ":" + zookeeperPort.
- Added zookeeperConnect get and set methods on KafkaEndpoint to
delegate to KafkaConfiguration.
- Updated KafkaConsumer to use the getZookeeperConnect method on the
KafkaEndpoint.
- Added tests for the changes.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/daccc8e0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/daccc8e0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/daccc8e0
Branch: refs/heads/camel-2.13.x
Commit: daccc8e063ec3b305ddb1ef20a9a57b982c45328
Parents: 3a5497a
Author: john.shields <jo...@tubemogul.com>
Authored: Thu Sep 25 20:57:23 2014 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Sep 27 13:23:14 2014 +0200
----------------------------------------------------------------------
.../component/kafka/KafkaConfiguration.java | 25 +++++++++++++++++--
.../camel/component/kafka/KafkaConsumer.java | 9 +++----
.../camel/component/kafka/KafkaEndpoint.java | 9 +++++++
.../component/kafka/KafkaComponentTest.java | 26 ++++++++++++++++++++
.../component/kafka/KafkaConsumerTest.java | 14 ++++++++---
5 files changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 88d5017..881ef3c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -21,6 +21,7 @@ import java.util.Properties;
import kafka.producer.DefaultPartitioner;
public class KafkaConfiguration {
+ private String zookeeperConnect;
private String zookeeperHost;
private int zookeeperPort = 2181;
private String topic;
@@ -127,13 +128,31 @@ public class KafkaConfiguration {
props.put(key, value.toString());
}
}
+
+ public String getZookeeperConnect() {
+ if (this.zookeeperConnect != null) {
+ return zookeeperConnect;
+ } else {
+ return getZookeeperHost() + ":" + getZookeeperPort();
+ }
+ }
+
+ public void setZookeeperConnect(String zookeeperConnect) {
+ this.zookeeperConnect = zookeeperConnect;
+
+ // connect overrides host and port
+ this.zookeeperHost = null;
+ this.zookeeperPort = -1;
+ }
public String getZookeeperHost() {
return zookeeperHost;
}
public void setZookeeperHost(String zookeeperHost) {
- this.zookeeperHost = zookeeperHost;
+ if (this.zookeeperConnect == null) {
+ this.zookeeperHost = zookeeperHost;
+ }
}
public int getZookeeperPort() {
@@ -141,7 +160,9 @@ public class KafkaConfiguration {
}
public void setZookeeperPort(int zookeeperPort) {
- this.zookeeperPort = zookeeperPort;
+ if (this.zookeeperConnect == null) {
+ this.zookeeperPort = zookeeperPort;
+ }
}
public String getGroupId() {
http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 3087a14..f801328 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -46,11 +46,8 @@ public class KafkaConsumer extends DefaultConsumer {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
- if (endpoint.getZookeeperHost() == null) {
- throw new IllegalArgumentException("zookeeper host must be specified");
- }
- if (endpoint.getZookeeperPort() == 0) {
- throw new IllegalArgumentException("zookeeper port must be specified");
+ if (endpoint.getZookeeperConnect() == null) {
+ throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
}
if (endpoint.getGroupId() == null) {
throw new IllegalArgumentException("groupId must not be null");
@@ -59,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer {
Properties getProps() {
Properties props = endpoint.getConfiguration().createConsumerProperties();
- props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort());
+ props.put("zookeeper.connect", endpoint.getZookeeperConnect());
props.put("group.id", endpoint.getGroupId());
return props;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 002d15e..deed68a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -102,6 +102,14 @@ public class KafkaEndpoint extends DefaultEndpoint {
// Delegated properties from the configuration
//-------------------------------------------------------------------------
+ public String getZookeeperConnect() {
+ return configuration.getZookeeperConnect();
+ }
+
+ public void setZookeeperConnect(String zookeeperConnect) {
+ configuration.setZookeeperConnect(zookeeperConnect);
+ }
+
public String getZookeeperHost() {
return configuration.getZookeeperHost();
}
@@ -417,4 +425,5 @@ public class KafkaEndpoint extends DefaultEndpoint {
public int getRequestTimeoutMs() {
return configuration.getRequestTimeoutMs();
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index c11edaf..15cef7a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class KafkaComponentTest {
@@ -43,6 +44,7 @@ public class KafkaComponentTest {
String remaining = "broker1:12345,broker2:12566";
KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+ assertEquals("somehost:2987", endpoint.getZookeeperConnect());
assertEquals("somehost", endpoint.getZookeeperHost());
assertEquals(2987, endpoint.getZookeeperPort());
assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
@@ -50,4 +52,28 @@ public class KafkaComponentTest {
assertEquals(3, endpoint.getConsumerStreams());
assertEquals("com.class.Party", endpoint.getPartitioner());
}
+
+ @Test
+ public void testZookeeperConnectPropertyOverride() throws Exception {
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put("zookeeperConnect", "thehost:2181/chroot");
+ params.put("zookeeperHost", "somehost");
+ params.put("zookeeperPort", 2987);
+ params.put("portNumber", 14123);
+ params.put("consumerStreams", "3");
+ params.put("topic", "mytopic");
+ params.put("partitioner", "com.class.Party");
+
+ String uri = "kafka:broker1:12345,broker2:12566";
+ String remaining = "broker1:12345,broker2:12566";
+
+ KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
+ assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect());
+ assertNull(endpoint.getZookeeperHost());
+ assertEquals(-1, endpoint.getZookeeperPort());
+ assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
+ assertEquals("mytopic", endpoint.getTopic());
+ assertEquals(3, endpoint.getConsumerStreams());
+ assertEquals("com.class.Party", endpoint.getPartitioner());
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/daccc8e0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 740f116..b51c09e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -28,15 +28,21 @@ public class KafkaConsumerTest {
private Processor processor = mock(Processor.class);
@Test(expected = IllegalArgumentException.class)
- public void consumerRequiresZookeeperHost() throws Exception {
- Mockito.when(endpoint.getZookeeperPort()).thenReturn(2181);
+ public void consumerRequiresZookeeperConnect() throws Exception {
+ Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
new KafkaConsumer(endpoint, processor);
}
@Test(expected = IllegalArgumentException.class)
- public void consumerRequiresZookeeperPort() throws Exception {
- Mockito.when(endpoint.getZookeeperHost()).thenReturn("localhost");
+ public void consumerRequiresGroupId() throws Exception {
+ Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
new KafkaConsumer(endpoint, processor);
}
+ @Test
+ public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception {
+ Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
+ Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+ new KafkaConsumer(endpoint, processor);
+ }
}