You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/09 10:10:54 UTC
[1/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java
client instead of scala. Thanks to Anbumani Balusamy for the patch.
Repository: camel
Updated Branches:
refs/heads/master 038e1617f -> 621a704c4
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 7934f41..0bb4740 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,24 +17,22 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -42,7 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
-
+
private static final String TOPIC_STRINGS = "test";
private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
private static final String TOPIC_BYTES = "testBytes";
@@ -52,15 +50,16 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
- private static ConsumerConnector stringsConsumerConn;
- private static ConsumerConnector bytesConsumerConn;
+ private static KafkaConsumer<String, String> stringsConsumerConn;
+ private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
@EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
- + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
- + "&requestRequiredAcks=-1")
+ + "&requestRequiredAcks=-1")
private Endpoint toStrings;
- @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1")
+ @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1"
+ + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+ + "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
private Endpoint toBytes;
@Produce(uri = "direct:startStrings")
@@ -69,46 +68,42 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
@Produce(uri = "direct:startBytes")
private ProducerTemplate bytesTemplate;
-
@BeforeClass
public static void before() {
Properties stringsProps = new Properties();
-
- stringsProps.put("zookeeper.connect", "localhost:" + getZookeeperPort());
- stringsProps.put("group.id", GROUP_STRINGS);
- stringsProps.put("zookeeper.session.timeout.ms", "6000");
- stringsProps.put("zookeeper.connectiontimeout.ms", "12000");
- stringsProps.put("zookeeper.sync.time.ms", "200");
- stringsProps.put("auto.commit.interval.ms", "1000");
- stringsProps.put("auto.offset.reset", "smallest");
- stringsConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(stringsProps));
+
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ stringsConsumerConn = new KafkaConsumer<String, String>(stringsProps);
Properties bytesProps = new Properties();
bytesProps.putAll(stringsProps);
bytesProps.put("group.id", GROUP_BYTES);
- bytesConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(bytesProps));
+ bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ bytesConsumerConn = new KafkaConsumer<byte[], byte[]>(bytesProps);
}
@AfterClass
public static void after() {
- stringsConsumerConn.shutdown();
- bytesConsumerConn.shutdown();
+ stringsConsumerConn.close();
+ bytesConsumerConn.close();
}
@Override
- protected RouteBuilder[] createRouteBuilders() throws Exception {
- return new RouteBuilder[] {
- new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:startStrings").to(toStrings);
- }
- },
- new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:startBytes").to(toBytes);
- }
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:startStrings").to(toStrings);
+
+ from("direct:startBytes").to(toBytes);
}
};
}
@@ -120,14 +115,11 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(TOPIC_STRINGS, 5);
- topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5);
- createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap);
-
sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+ createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
@@ -140,11 +132,6 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(TOPIC_BYTES, 5);
- topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5);
- createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap);
-
Map<String, Object> inTopicHeaders = new HashMap<String, Object>();
inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);
@@ -154,22 +141,47 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);
+ createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch);
+
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
}
- private void createKafkaMessageConsumer(ConsumerConnector consumerConn, String topic, String topicInHeader,
- CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConn.createMessageStreams(topicCountMap);
+ private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn,
+ String topic, String topicInHeader, CountDownLatch messagesLatch) {
+
+ consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+ boolean run = true;
- ExecutorService executor = Executors.newFixedThreadPool(10);
- for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
- executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+ while (run) {
+ ConsumerRecords<String, String> records = consumerConn.poll(100);
+ for (ConsumerRecord<String, String> record : records) {
+ messagesLatch.countDown();
+ if (messagesLatch.getCount() == 0) {
+ run = false;
+ }
+ }
}
- for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topicInHeader)) {
- executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+
+ }
+
+ private void createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> consumerConn, String topic,
+ String topicInHeader, CountDownLatch messagesLatch) {
+
+ consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+ boolean run = true;
+
+ while (run) {
+ ConsumerRecords<byte[], byte[]> records = consumerConn.poll(100);
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ messagesLatch.countDown();
+ if (messagesLatch.getCount() == 0) {
+ run = false;
+ }
+ }
}
+
}
private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
@@ -186,23 +198,4 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
}
}
- private static class KakfaTopicConsumer implements Runnable {
- private final KafkaStream<byte[], byte[]> stream;
- private final CountDownLatch latch;
-
- public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
- this.stream = stream;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- String msg = new String(it.next().message());
- LOG.info("Get the message" + msg);
- latch.countDown();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index f2bcd6b..98f6421 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
@@ -41,18 +41,17 @@ public class KafkaProducerTest {
@SuppressWarnings({"unchecked"})
public KafkaProducerTest() throws Exception {
- endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null);
+ endpoint = new KafkaEndpoint(
+ "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
endpoint.setBrokers("broker1:1234,broker2:4567");
producer = new KafkaProducer(endpoint);
- producer.producer = Mockito.mock(Producer.class);
+ producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
}
@Test
public void testPropertyBuilder() throws Exception {
- endpoint.setPartitioner("com.sksamuel.someclass");
Properties props = producer.getProps();
- assertEquals("com.sksamuel.someclass", props.getProperty("partitioner.class"));
- assertEquals("broker1:1234,broker2:4567", props.getProperty("metadata.broker.list"));
+ assertEquals("broker1:1234,broker2:4567", props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
@Test
@@ -63,20 +62,18 @@ public class KafkaProducerTest {
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
producer.process(exchange);
-
- Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
+ Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
}
@Test
public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
endpoint.setTopic(null);
Mockito.when(exchange.getIn()).thenReturn(in);
- in.setHeader(KafkaConstants.PARTITION_KEY, "4");
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
producer.process(exchange);
- verifySendMessage("4", "anotherTopic", "4");
+ verifySendMessage("anotherTopic");
}
@Test
@@ -112,10 +109,9 @@ public class KafkaProducerTest {
endpoint.setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
-
+ in.setHeader(KafkaConstants.KEY, "someKey");
producer.process(exchange);
-
- verifySendMessage("4", "someTopic", "4");
+ verifySendMessage("4", "someTopic", "someKey");
}
@Test
@@ -126,9 +122,9 @@ public class KafkaProducerTest {
producer.process(exchange);
- verifySendMessage("someKey", "someTopic", "someKey");
+ verifySendMessage("someTopic", "someKey");
}
-
+
@Test
public void processSendMessageWithBridgeEndpoint() throws Exception {
endpoint.setTopic("someTopic");
@@ -136,19 +132,44 @@ public class KafkaProducerTest {
Mockito.when(exchange.getIn()).thenReturn(in);
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
in.setHeader(KafkaConstants.KEY, "someKey");
-
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+ producer.process(exchange);
+
+ verifySendMessage("4", "someTopic", "someKey");
+ }
+
+ @Test // Message and Topic Name alone
+ public void processSendsMesssageWithMessageTopicName() throws Exception {
+ endpoint.setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+
producer.process(exchange);
-
- verifySendMessage("someKey", "someTopic", "someKey");
+
+ verifySendMessage("someTopic");
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected void verifySendMessage(String partitionKey, String topic, String messageKey) {
- ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
- Mockito.verify(producer.producer).send(captor.capture());
- assertEquals(partitionKey, captor.getValue().partitionKey());
+ ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+ assertEquals(new Integer(partitionKey), captor.getValue().partition());
+ assertEquals(messageKey, captor.getValue().key());
+ assertEquals(topic, captor.getValue().topic());
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected void verifySendMessage(String topic, String messageKey) {
+ ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
assertEquals(messageKey, captor.getValue().key());
assertEquals(topic, captor.getValue().topic());
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected void verifySendMessage(String topic) {
+ ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+ assertEquals(topic, captor.getValue().topic());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
deleted file mode 100644
index 039a2e7..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class SimplePartitioner implements Partitioner {
-
- public SimplePartitioner(VerifiableProperties props) {
- }
-
- /**
- * Uses the key to calculate a partition bucket id for routing
- * the data to the appropriate broker partition
- *
- * @return an integer between 0 and numPartitions-1
- */
- @Override
- public int partition(Object key, int numPartitions) {
- return key.hashCode() % numPartitions;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index ce11a47..42403c2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -27,13 +27,9 @@ import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZkUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.Option;
public class EmbeddedKafkaCluster {
- private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-
private final List<Integer> ports;
private final String zkConnection;
private final Properties baseProperties;
@@ -68,7 +64,7 @@ public class EmbeddedKafkaCluster {
return null;
}
- public void createTopics(String...topics) {
+ public void createTopics(String... topics) {
for (String topic : topics) {
AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties());
}
@@ -112,10 +108,10 @@ public class EmbeddedKafkaCluster {
properties.setProperty("host.name", "localhost");
properties.setProperty("port", Integer.toString(port));
properties.setProperty("log.dir", logDir.getAbsolutePath());
- properties.setProperty("num.partitions", String.valueOf(1));
- properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+ properties.setProperty("num.partitions", String.valueOf(1));
+ properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+ System.out.println("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
properties.setProperty("log.flush.interval.messages", String.valueOf(1));
- LOG.info("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
KafkaServer broker = startBroker(properties);
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j.properties b/components/camel-kafka/src/test/resources/log4j.properties
index 67458fd..44266d1 100644
--- a/components/camel-kafka/src/test/resources/log4j.properties
+++ b/components/camel-kafka/src/test/resources/log4j.properties
@@ -32,4 +32,4 @@ log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/camel-kafka-test.log
-log4j.appender.out.append=true
+log4j.appender.out.append=true
\ No newline at end of file
[3/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java
client instead of scala. Thanks to Anbumani Balusamy for the patch.
Posted by da...@apache.org.
CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2aa831d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2aa831d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2aa831d
Branch: refs/heads/master
Commit: b2aa831da8c8f78f7d6ca908c5b33957bbc7fa24
Parents: 038e161
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 9 09:58:36 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 9 10:01:59 2016 +0100
----------------------------------------------------------------------
components/camel-kafka/pom.xml | 26 +-
.../camel/component/kafka/KafkaComponent.java | 22 +-
.../component/kafka/KafkaConfiguration.java | 1204 ++++++++++++------
.../camel/component/kafka/KafkaConstants.java | 13 +-
.../camel/component/kafka/KafkaConsumer.java | 183 +--
.../camel/component/kafka/KafkaEndpoint.java | 598 +++++----
.../camel/component/kafka/KafkaProducer.java | 64 +-
.../component/kafka/BaseEmbeddedKafkaTest.java | 21 +-
.../component/kafka/KafkaComponentTest.java | 224 ++--
.../kafka/KafkaConsumerBatchSizeTest.java | 64 +-
.../component/kafka/KafkaConsumerFullTest.java | 34 +-
.../component/kafka/KafkaConsumerTest.java | 8 +-
.../component/kafka/KafkaEndpointTest.java | 13 +-
.../component/kafka/KafkaProducerFullTest.java | 145 +--
.../component/kafka/KafkaProducerTest.java | 65 +-
.../component/kafka/SimplePartitioner.java | 39 -
.../kafka/embedded/EmbeddedKafkaCluster.java | 12 +-
.../src/test/resources/log4j.properties | 2 +-
18 files changed, 1618 insertions(+), 1119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 1a5f98f..f8bbdb5 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -15,7 +15,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -37,31 +38,35 @@
</properties>
<dependencies>
+
+ <!-- camel -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
+
+ <!-- kafka java client -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-version}</version>
+ </dependency>
+
+ <!-- kafka server for testing using scala -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka-version}</version>
+ <scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
- <exclusion>
- <artifactId>scala-library</artifactId>
- <groupId>org.scala-lang</groupId>
- </exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <scope>provided</scope>
- </dependency>
+ <!-- test -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
@@ -77,6 +82,7 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c9d4c2a..2981b3f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -20,8 +20,6 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.UriEndpointComponent;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.EndpointHelper;
public class KafkaComponent extends UriEndpointComponent {
@@ -34,30 +32,14 @@ public class KafkaComponent extends UriEndpointComponent {
}
@Override
- protected KafkaEndpoint createEndpoint(String uri,
- String remaining,
- Map<String, Object> params) throws Exception {
-
+ protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
String brokers = remaining.split("\\?")[0];
- Object confparam = params.get("configuration");
- if (confparam != null) {
- // need a special handling to resolve the reference before other parameters are set/merged into the config
- KafkaConfiguration confobj = null;
- if (confparam instanceof KafkaConfiguration) {
- confobj = (KafkaConfiguration)confparam;
- } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) {
- confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1));
- }
- if (confobj != null) {
- endpoint.setConfiguration(confobj.copy());
- }
- params.remove("configuration");
- }
if (brokers != null) {
endpoint.getConfiguration().setBrokers(brokers);
}
setProperties(endpoint, params);
return endpoint;
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 894df0c..4a948c1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -18,28 +18,28 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
-import kafka.producer.DefaultPartitioner;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
@UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration {
+
+ @UriPath @Metadata(required = "true")
+ private String brokers;
- @UriParam
- private String zookeeperConnect;
- @UriParam
- private String zookeeperHost;
- @UriParam(defaultValue = "2181")
- private int zookeeperPort = 2181;
@UriParam @Metadata(required = "true")
private String topic;
@UriParam
private String groupId;
- @UriParam(defaultValue = "DefaultPartitioner")
- private String partitioner = DefaultPartitioner.class.getCanonicalName();
+ @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
+ private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@UriParam(label = "consumer", defaultValue = "10")
private int consumerStreams = 10;
@UriParam(label = "consumer", defaultValue = "1")
@@ -53,134 +53,302 @@ public class KafkaConfiguration implements Cloneable {
@UriParam
private String clientId;
+ //key.deserializer
+ @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+ private String keyDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+ //value.deserializer
+ @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+ private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+ //fetch.min.bytes
+ @UriParam(label = "consumer", defaultValue = "1024")
+ private Integer fetchMinBytes = 1024;
+ //heartbeat.interval.ms
+ @UriParam(label = "consumer", defaultValue = "3000")
+ private Integer heartbeatIntervalMs = 3000;
+ //max.partition.fetch.bytes
+ @UriParam(label = "consumer", defaultValue = "1048576")
+ private Integer maxPartitionFetchBytes = 1048576;
+ //session.timeout.ms
+ @UriParam(label = "consumer", defaultValue = "30000")
+ private Integer sessionTimeoutMs = 30000;
+ //auto.offset.reset
+ @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
+ private String autoOffsetReset = "latest";
+ //partition.assignment.strategy
+ @UriParam(label = "consumer", defaultValue = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR)
+ private String partitionAssignor = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR;
+ //request.timeout.ms
+ @UriParam(label = "consumer", defaultValue = "40000")
+ private Integer consumerRequestTimeoutMs = 40000;
+ //auto.commit.interval.ms
+ @UriParam(label = "consumer", defaultValue = "5000")
+ private Integer autoCommitIntervalMs = 5000;
+ //check.crcs
+ @UriParam(label = "consumer", defaultValue = "true")
+ private Boolean checkCrcs = true;
+ //fetch.max.wait.ms
+ @UriParam(label = "consumer", defaultValue = "500")
+ private Integer fetchWaitMaxMs = 500;
+
//Consumer configuration properties
@UriParam(label = "consumer")
private String consumerId;
- @UriParam(label = "consumer", defaultValue = "30000")
- private Integer socketTimeoutMs = 30 * 1000;
- @UriParam(label = "consumer", defaultValue = "" + 64 * 1024)
- private Integer socketReceiveBufferBytes = 64 * 1024;
- @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024)
- private Integer fetchMessageMaxBytes = 1024 * 1024;
@UriParam(label = "consumer", defaultValue = "true")
private Boolean autoCommitEnable = true;
- @UriParam(label = "consumer", defaultValue = "60000")
- private Integer autoCommitIntervalMs = 60 * 1000;
- @UriParam(label = "consumer", defaultValue = "2")
- private Integer queuedMaxMessageChunks = 2;
- @UriParam(label = "consumer", defaultValue = "4")
- private Integer rebalanceMaxRetries = 4;
- @UriParam(label = "consumer", defaultValue = "1")
- private Integer fetchMinBytes = 1;
- @UriParam(label = "consumer", defaultValue = "100")
- private Integer fetchWaitMaxMs = 100;
- @UriParam(label = "consumer", defaultValue = "2000")
- private Integer rebalanceBackoffMs = 2000;
- @UriParam(label = "consumer", defaultValue = "200")
- private Integer refreshLeaderBackoffMs = 200;
- @UriParam(label = "consumer", defaultValue = "largest", enums = "smallest,largest,fail")
- private String autoOffsetReset = "largest";
- @UriParam(label = "consumer")
- private Integer consumerTimeoutMs;
- @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka")
- private String offsetsStorage = "zookeeper";
- @UriParam(label = "consumer", defaultValue = "true")
- private Boolean dualCommitEnabled = true;
-
- //Zookeepr configuration properties
- @UriParam
- private Integer zookeeperSessionTimeoutMs;
- @UriParam
- private Integer zookeeperConnectionTimeoutMs;
- @UriParam
- private Integer zookeeperSyncTimeMs;
//Producer configuration properties
- @UriPath
- private String brokers;
- @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync")
- private String producerType = "sync";
- @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy")
- private String compressionCodec = "none";
- @UriParam(label = "producer")
- private String compressedTopics;
- @UriParam(label = "producer", defaultValue = "3")
- private Integer messageSendMaxRetries = 3;
@UriParam(label = "producer", defaultValue = "100")
private Integer retryBackoffMs = 100;
- @UriParam(label = "producer", defaultValue = "600000")
- private Integer topicMetadataRefreshIntervalMs = 600 * 1000;
-
- //Sync producer config
- @UriParam(label = "producer", defaultValue = "" + 100 * 1024)
- private Integer sendBufferBytes = 100 * 1024;
- @UriParam(label = "producer", defaultValue = "0")
- private short requestRequiredAcks;
- @UriParam(label = "producer", defaultValue = "10000")
- private Integer requestTimeoutMs = 10000;
//Async producer config
- @UriParam(label = "producer", defaultValue = "5000")
- private Integer queueBufferingMaxMs = 5000;
@UriParam(label = "producer", defaultValue = "10000")
private Integer queueBufferingMaxMessages = 10000;
@UriParam(label = "producer")
- private Integer queueEnqueueTimeoutMs;
- @UriParam(label = "producer", defaultValue = "200")
- private Integer batchNumMessages = 200;
- @UriParam(label = "producer")
private String serializerClass;
@UriParam(label = "producer")
private String keySerializerClass;
+ @UriParam(label = "producer", defaultValue = "1")
+ private Integer requestRequiredAcks = 1;
+ //buffer.memory
+ @UriParam(label = "producer", defaultValue = "33554432")
+ private Integer bufferMemorySize = 33554432;
+ //compression.type
+ @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4")
+ private String compressionCodec = "none";
+ //retries
+ @UriParam(label = "producer", defaultValue = "0")
+ private Integer retries = 0;
+ // SSL
+ // ssl.key.password
+ @UriParam(label = "producer")
+ private String sslKeyPassword;
+ // ssl.keystore.location
+ @UriParam(label = "producer")
+ private String sslKeystoreLocation;
+ // ssl.keystore.password
+ @UriParam(label = "producer")
+ private String sslKeystorePassword;
+ //ssl.truststore.location
+ @UriParam(label = "producer")
+ private String sslTruststoreLocation;
+ //ssl.truststore.password
+ @UriParam(label = "producer")
+ private String sslTruststorePassword;
+ //batch.size
+ @UriParam(label = "producer", defaultValue = "16384")
+ private Integer producerBatchSize = 16384;
+ //connections.max.idle.ms
+ @UriParam(label = "producer", defaultValue = "540000")
+ private Integer connectionMaxIdleMs = 540000;
+ //linger.ms
+ @UriParam(label = "producer", defaultValue = "0")
+ private Integer lingerMs = 0;
+ //max.block.ms
+ @UriParam(label = "producer", defaultValue = "60000")
+ private Integer maxBlockMs = 60000;
+ //max.request.size
+ @UriParam(label = "producer", defaultValue = "1048576")
+ private Integer maxRequestSize = 1048576;
+ //receive.buffer.bytes
+ @UriParam(label = "producer", defaultValue = "32768")
+ private Integer receiveBufferBytes = 32768;
+ //request.timeout.ms
+ @UriParam(label = "producer", defaultValue = "30000")
+ private Integer requestTimeoutMs = 30000;
+ // SASL & security Protocol
+ //sasl.kerberos.service.name
+ @UriParam(label = "producer")
+ private String saslKerberosServiceName;
+ //security.protocol
+ @UriParam(label = "producer", defaultValue = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL)
+ private String securityProtocol = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+ //send.buffer.bytes
+ @UriParam(label = "producer", defaultValue = "131072")
+ private Integer sendBufferBytes = 131072;
+ //SSL
+ //ssl.enabled.protocols
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)
+ private String sslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+ //ssl.keystore.type
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE)
+ private String sslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+ //ssl.protocol
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_PROTOCOL)
+ private String sslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL;
+ //ssl.provider
+ @UriParam(label = "producer")
+ private String sslProvider;
+ //ssl.truststore.type
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
+ private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+ //timeout.ms
+ @UriParam(label = "producer", defaultValue = "30000")
+ private Integer timeoutMs = 30000;
+ //block.on.buffer.full
+ @UriParam(label = "producer", defaultValue = "false")
+ private Boolean blockOnBufferFull = false;
+ //max.in.flight.requests.per.connection
+ @UriParam(label = "producer", defaultValue = "5")
+ private Integer maxInFlightRequest = 5;
+ //metadata.fetch.timeout.ms
+ @UriParam(label = "producer", defaultValue = "60000")
+ private Integer metadataFetchTimeoutMs = 60000;
+ //metadata.max.age.ms
+ @UriParam(label = "producer", defaultValue = "300000")
+ private Integer metadataMaxAgeMs = 300000;
+ //metric.reporters
+ @UriParam(label = "producer")
+ private String metricReporters;
+ //metrics.num.samples
+ @UriParam(label = "producer", defaultValue = "2")
+ private Integer noOfMetricsSample = 2;
+ //metrics.sample.window.ms
+ @UriParam(label = "producer", defaultValue = "30000")
+ private Integer metricsSampleWindowMs = 30000;
+ //reconnect.backoff.ms
+ @UriParam(label = "producer", defaultValue = "50")
+ private Integer reconnectBackoffMs = 50;
+ //SASL
+ //sasl.kerberos.kinit.cmd
+ @UriParam(label = "producer", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
+ private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
+ //sasl.kerberos.min.time.before.relogin
+ @UriParam(label = "producer", defaultValue = "60000")
+ private Integer kerberosBeforeReloginMinTime = 60000;
+ //sasl.kerberos.ticket.renew.jitter
+ @UriParam(label = "producer", defaultValue = "0.05")
+ private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER;
+ //sasl.kerberos.ticket.renew.window.factor
+ @UriParam(label = "producer", defaultValue = "0.8")
+ private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
+ //SSL
+ //ssl.cipher.suites
+ @UriParam(label = "producer")
+ private String sslCipherSuites;
+ //ssl.endpoint.identification.algorithm
+ @UriParam(label = "producer")
+ private String sslEndpointAlgorithm;
+ //ssl.keymanager.algorithm
+ @UriParam(label = "producer", defaultValue = "SunX509")
+ private String sslKeymanagerAlgorithm = "SunX509";
+ //ssl.trustmanager.algorithm
+ @UriParam(label = "producer", defaultValue = "PKIX")
+ private String sslTrustmanagerAlgorithm = "PKIX";
+
public KafkaConfiguration() {
}
+ /**
+ * Creates the {@link Properties} holding the producer-side Kafka client settings of this configuration.
+ * Every option is registered through addPropertyIfNotNull under its Kafka client configuration key.
+ *
+ * @return a new Properties instance populated from the current option values
+ */
public Properties createProducerProperties() {
Properties props = new Properties();
- addPropertyIfNotNull(props, "request.required.acks", getRequestRequiredAcks());
- addPropertyIfNotNull(props, "partitioner.class", getPartitioner());
- addPropertyIfNotNull(props, "serializer.class", getSerializerClass());
- addPropertyIfNotNull(props, "key.serializer.class", getKeySerializerClass());
- addPropertyIfNotNull(props, "request.timeout.ms", getRequestTimeoutMs());
- addPropertyIfNotNull(props, "producer.type", getProducerType());
- addPropertyIfNotNull(props, "compression.codec", getCompressionCodec());
- addPropertyIfNotNull(props, "compressed.topics", getCompressedTopics());
- addPropertyIfNotNull(props, "message.send.max.retries", getMessageSendMaxRetries());
- addPropertyIfNotNull(props, "retry.backoff.ms", getRetryBackoffMs());
- addPropertyIfNotNull(props, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs());
- addPropertyIfNotNull(props, "queue.buffering.max.ms", getQueueBufferingMaxMs());
- addPropertyIfNotNull(props, "queue.buffering.max.messages", getQueueBufferingMaxMessages());
- addPropertyIfNotNull(props, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs());
- addPropertyIfNotNull(props, "batch.num.messages", getBatchNumMessages());
- addPropertyIfNotNull(props, "send.buffer.bytes", getSendBufferBytes());
- addPropertyIfNotNull(props, "client.id", getClientId());
+ addPropertyIfNotNull(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializerClass());
+ addPropertyIfNotNull(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializerClass());
+ addPropertyIfNotNull(props, ProducerConfig.ACKS_CONFIG, getRequestRequiredAcks());
+ addPropertyIfNotNull(props, ProducerConfig.BUFFER_MEMORY_CONFIG, getBufferMemorySize());
+ addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
+ addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
+ // SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+ addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize());
+ addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId());
+ addPropertyIfNotNull(props, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+ addPropertyIfNotNull(props, ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
+ addPropertyIfNotNull(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, getMaxBlockMs());
+ addPropertyIfNotNull(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getMaxRequestSize());
+ addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner());
+ addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+ addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+ // Security protocol
+ addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+ addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+ addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
+ addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
+ addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
+ addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
+ addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+ addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+ addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+ addPropertyIfNotNull(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+ addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+ addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
+
return props;
}
+ /**
+ * Creates the {@link Properties} holding the consumer-side Kafka client settings of this configuration.
+ * Every option is registered through addPropertyIfNotNull under its Kafka client configuration key.
+ *
+ * @return a new Properties instance populated from the current option values
+ */
public Properties createConsumerProperties() {
Properties props = new Properties();
- addPropertyIfNotNull(props, "consumer.id", getConsumerId());
- addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs());
- addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes());
- addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes());
- addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable());
- addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs());
- addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages());
- addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
- addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
- addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessageChunks());
- addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries());
- addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs());
- addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs());
- addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset());
- addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs());
- addPropertyIfNotNull(props, "client.id", getClientId());
- addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
- addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
- addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
- addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage());
- addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled());
+ addPropertyIfNotNull(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer());
+ addPropertyIfNotNull(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
+ addPropertyIfNotNull(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes());
+ addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+ addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
+ addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+ // SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+ addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
+ addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+ addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable());
+ addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+ addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+ addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+ // Security protocol
+ addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+ addPropertyIfNotNull(props, ConsumerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+ addPropertyIfNotNull(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
+ addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs());
+ addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
+ addPropertyIfNotNull(props, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchWaitMaxMs());
+ addPropertyIfNotNull(props, ConsumerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+ addPropertyIfNotNull(props, ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+ addPropertyIfNotNull(props, ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+ addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+ addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+ addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
return props;
}
@@ -191,65 +359,6 @@ public class KafkaConfiguration implements Cloneable {
}
}
- public String getZookeeperConnect() {
- if (this.zookeeperConnect != null) {
- return zookeeperConnect;
- } else {
- return getZookeeperHost() + ":" + getZookeeperPort();
- }
- }
-
- /**
- * Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server.
- * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the
- * form hostname1:port1,hostname2:port2,hostname3:port3.
- * The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data
- * under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string.
- * For example to give a chroot path of /chroot/path you would give the connection
- * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
- */
- public void setZookeeperConnect(String zookeeperConnect) {
- this.zookeeperConnect = zookeeperConnect;
-
- // connect overrides host and port
- this.zookeeperHost = null;
- this.zookeeperPort = -1;
- }
-
- public String getZookeeperHost() {
- return zookeeperHost;
- }
-
- /**
- * The zookeeper host to use.
- * <p/>
- * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
- * <p/>
- * This option can only be used if zookeeperConnect is not in use.
- */
- public void setZookeeperHost(String zookeeperHost) {
- if (this.zookeeperConnect == null) {
- this.zookeeperHost = zookeeperHost;
- }
- }
-
- public int getZookeeperPort() {
- return zookeeperPort;
- }
-
- /**
- * The zookeeper port to use
- * <p/>
- * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
- * <p/>
- * This option can only be used if zookeeperConnect is not in use.
- */
- public void setZookeeperPort(int zookeeperPort) {
- if (this.zookeeperConnect == null) {
- this.zookeeperPort = zookeeperPort;
- }
- }
-
public String getGroupId() {
return groupId;
}
@@ -278,7 +387,7 @@ public class KafkaConfiguration implements Cloneable {
}
/**
- * Name of the topic to use
+ * Name of the topic to use.
*/
public void setTopic(String topic) {
this.topic = topic;
@@ -351,440 +460,741 @@ public class KafkaConfiguration implements Cloneable {
this.consumerId = consumerId;
}
- public Integer getSocketTimeoutMs() {
- return socketTimeoutMs;
+ public Boolean isAutoCommitEnable() {
+ return autoCommitEnable;
}
/**
- * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms.
+ * If true, periodically commit the offset of messages already fetched by the consumer.
+ * This committed offset will be used when the process fails as the position from which the new consumer will begin.
*/
- public void setSocketTimeoutMs(Integer socketTimeoutMs) {
- this.socketTimeoutMs = socketTimeoutMs;
+ public void setAutoCommitEnable(Boolean autoCommitEnable) {
+ this.autoCommitEnable = autoCommitEnable;
}
- public Integer getSocketReceiveBufferBytes() {
- return socketReceiveBufferBytes;
+ public Integer getAutoCommitIntervalMs() {
+ return autoCommitIntervalMs;
}
/**
- * The socket receive buffer for network requests
+ * The frequency in ms that the consumer offsets are automatically committed.
*/
- public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
- this.socketReceiveBufferBytes = socketReceiveBufferBytes;
+ public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+ this.autoCommitIntervalMs = autoCommitIntervalMs;
}
- public Integer getFetchMessageMaxBytes() {
- return fetchMessageMaxBytes;
+ public Integer getFetchMinBytes() {
+ return fetchMinBytes;
}
/**
- * The number of byes of messages to attempt to fetch for each topic-partition in each fetch request.
- * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
- * The fetch request size must be at least as large as the maximum message size the server allows or else it
- * is possible for the producer to send messages larger than the consumer can fetch.
+ * The minimum amount of data the server should return for a fetch request.
+ * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
*/
- public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
- this.fetchMessageMaxBytes = fetchMessageMaxBytes;
+ public void setFetchMinBytes(Integer fetchMinBytes) {
+ this.fetchMinBytes = fetchMinBytes;
}
- public Boolean isAutoCommitEnable() {
- return autoCommitEnable;
+ public Integer getFetchWaitMaxMs() {
+ return fetchWaitMaxMs;
}
/**
- * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
- * This committed offset will be used when the process fails as the position from which the new consumer will begin.
+ * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
*/
- public void setAutoCommitEnable(Boolean autoCommitEnable) {
- this.autoCommitEnable = autoCommitEnable;
+ public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+ this.fetchWaitMaxMs = fetchWaitMaxMs;
}
- public Integer getAutoCommitIntervalMs() {
- return autoCommitIntervalMs;
+ public String getAutoOffsetReset() {
+ return autoOffsetReset;
}
/**
- * The frequency in ms that the consumer offsets are committed to zookeeper.
+ * What to do when there is no initial offset or if an offset is out of range:
+ * earliest : automatically reset the offset to the earliest offset
+ * latest : automatically reset the offset to the latest offset
+ * none: throw exception to the consumer
*/
- public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
- this.autoCommitIntervalMs = autoCommitIntervalMs;
+ public void setAutoOffsetReset(String autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
}
- public Integer getQueuedMaxMessageChunks() {
- return queuedMaxMessageChunks;
+ public String getBrokers() {
+ return brokers;
}
/**
- * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
+ * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
+ * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+ * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
+ * <p/>
+ * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
*/
- public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
- this.queuedMaxMessageChunks = queuedMaxMessageChunks;
+ public void setBrokers(String brokers) {
+ this.brokers = brokers;
}
- public Integer getRebalanceMaxRetries() {
- return rebalanceMaxRetries;
+ public String getCompressionCodec() {
+ return compressionCodec;
}
/**
- * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
- * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry.
- * This setting controls the maximum number of attempts before giving up.
+ * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip", "snappy" and "lz4".
*/
- public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
- this.rebalanceMaxRetries = rebalanceMaxRetries;
+ public void setCompressionCodec(String compressionCodec) {
+ this.compressionCodec = compressionCodec;
}
- public Integer getFetchMinBytes() {
- return fetchMinBytes;
+ public Integer getRetryBackoffMs() {
+ return retryBackoffMs;
}
/**
- * The minimum amount of data the server should return for a fetch request.
- * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+ * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
+ * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
*/
- public void setFetchMinBytes(Integer fetchMinBytes) {
- this.fetchMinBytes = fetchMinBytes;
+ public void setRetryBackoffMs(Integer retryBackoffMs) {
+ this.retryBackoffMs = retryBackoffMs;
}
- public Integer getFetchWaitMaxMs() {
- return fetchWaitMaxMs;
+ public Integer getSendBufferBytes() {
+ return sendBufferBytes;
}
/**
- * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
+ * Socket write buffer size
*/
- public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
- this.fetchWaitMaxMs = fetchWaitMaxMs;
+ public void setSendBufferBytes(Integer sendBufferBytes) {
+ this.sendBufferBytes = sendBufferBytes;
}
- public Integer getRebalanceBackoffMs() {
- return rebalanceBackoffMs;
+ public Integer getRequestTimeoutMs() {
+ return requestTimeoutMs;
}
/**
- * Backoff time between retries during rebalance.
+ * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
*/
- public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
- this.rebalanceBackoffMs = rebalanceBackoffMs;
+ public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+ this.requestTimeoutMs = requestTimeoutMs;
}
- public Integer getRefreshLeaderBackoffMs() {
- return refreshLeaderBackoffMs;
+ public Integer getQueueBufferingMaxMessages() {
+ return queueBufferingMaxMessages;
}
/**
- * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
+ * The maximum number of unsent messages that can be queued up the producer when using async
+ * mode before either the producer must be blocked or data must be dropped.
*/
- public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
- this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
+ public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+ this.queueBufferingMaxMessages = queueBufferingMaxMessages;
}
- public String getAutoOffsetReset() {
- return autoOffsetReset;
+ public String getSerializerClass() {
+ if (serializerClass == null) {
+ return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+ }
+ return serializerClass;
}
/**
- * What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
- * smallest : automatically reset the offset to the smallest offset
- * largest : automatically reset the offset to the largest offset
- * fail: throw exception to the consumer
+ * The serializer class for messages.
+ * When no class is set, KafkaConstants.KAFKA_DEFAULT_SERIALIZER is used.
*/
- public void setAutoOffsetReset(String autoOffsetReset) {
- this.autoOffsetReset = autoOffsetReset;
+ public void setSerializerClass(String serializerClass) {
+ this.serializerClass = serializerClass;
}
- public Integer getConsumerTimeoutMs() {
- return consumerTimeoutMs;
+ public String getKeySerializerClass() {
+ if (keySerializerClass == null) {
+ return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+ }
+ return keySerializerClass;
}
/**
- * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
+ * The serializer class for keys (defaults to the same as for messages if nothing is given).
*/
- public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
- this.consumerTimeoutMs = consumerTimeoutMs;
+ public void setKeySerializerClass(String keySerializerClass) {
+ this.keySerializerClass = keySerializerClass;
}
- public Integer getZookeeperSessionTimeoutMs() {
- return zookeeperSessionTimeoutMs;
+ public String getKerberosInitCmd() {
+ return kerberosInitCmd;
}
/**
- * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
+ * Kerberos kinit command path. Default is /usr/bin/kinit
*/
- public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
- this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
+ public void setKerberosInitCmd(String kerberosInitCmd) {
+ this.kerberosInitCmd = kerberosInitCmd;
}
- public Integer getZookeeperConnectionTimeoutMs() {
- return zookeeperConnectionTimeoutMs;
+ public Integer getKerberosBeforeReloginMinTime() {
+ return kerberosBeforeReloginMinTime;
}
/**
- * The max time that the client waits while establishing a connection to zookeeper.
+ * Login thread sleep time between refresh attempts.
*/
- public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
- this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
+ public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
+ this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime;
}
- public Integer getZookeeperSyncTimeMs() {
- return zookeeperSyncTimeMs;
+ public Double getKerberosRenewJitter() {
+ return kerberosRenewJitter;
}
/**
- * How far a ZK follower can be behind a ZK leader
+ * Percentage of random jitter added to the renewal time.
*/
- public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
- this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
+ public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+ this.kerberosRenewJitter = kerberosRenewJitter;
}
- public String getBrokers() {
- return brokers;
+ public Double getKerberosRenewWindowFactor() {
+ return kerberosRenewWindowFactor;
}
/**
- * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
- * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
- * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
- * <p/>
- * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+ * Login thread will sleep until the specified window factor of time from last
+ * refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.
*/
- public void setBrokers(String brokers) {
- this.brokers = brokers;
+ public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
+ this.kerberosRenewWindowFactor = kerberosRenewWindowFactor;
}
- public String getProducerType() {
- return producerType;
+ public String getSslCipherSuites() {
+ return sslCipherSuites;
}
/**
- * This parameter specifies whether the messages are sent asynchronously in a background thread.
- * Valid values are (1) async for asynchronous send and (2) sync for synchronous send.
- * By setting the producer to async we allow batching together of requests (which is great for throughput)
- * but open the possibility of a failure of the client machine dropping unsent data.
+ * A list of cipher suites. This is a named combination of authentication, encryption,
+ * MAC and key exchange algorithm used to negotiate the security settings for a network connection
+ * using TLS or SSL network protocol. By default all the available cipher suites are supported.
*/
- public void setProducerType(String producerType) {
- this.producerType = producerType;
+ public void setSslCipherSuites(String sslCipherSuites) {
+ this.sslCipherSuites = sslCipherSuites;
}
- public String getCompressionCodec() {
- return compressionCodec;
+ public String getSslEndpointAlgorithm() {
+ return sslEndpointAlgorithm;
}
/**
- * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
+ * The endpoint identification algorithm to validate server hostname using server certificate.
*/
- public void setCompressionCodec(String compressionCodec) {
- this.compressionCodec = compressionCodec;
+ public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+ this.sslEndpointAlgorithm = sslEndpointAlgorithm;
}
- public String getCompressedTopics() {
- return compressedTopics;
+ public String getSslKeymanagerAlgorithm() {
+ return sslKeymanagerAlgorithm;
}
/**
- * This parameter allows you to set whether compression should be turned on for particular topics.
- * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any.
- * If the list of compressed topics is empty, then enable the specified compression codec for all topics.
- * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+ * The algorithm used by key manager factory for SSL connections. Default value is the key
+ * manager factory algorithm configured for the Java Virtual Machine.
*/
- public void setCompressedTopics(String compressedTopics) {
- this.compressedTopics = compressedTopics;
+ public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+ this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm;
}
- public Integer getMessageSendMaxRetries() {
- return messageSendMaxRetries;
+ public String getSslTrustmanagerAlgorithm() {
+ return sslTrustmanagerAlgorithm;
}
/**
- * This property will cause the producer to automatically retry a failed send request.
- * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here
- * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.
+ * The algorithm used by trust manager factory for SSL connections. Default value is the
+ * trust manager factory algorithm configured for the Java Virtual Machine.
*/
- public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
- this.messageSendMaxRetries = messageSendMaxRetries;
+ public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+ this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm;
}
- public Integer getRetryBackoffMs() {
- return retryBackoffMs;
+ public String getSslEnabledProtocols() {
+ return sslEnabledProtocols;
}
/**
- * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
- * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
+ * The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.
*/
- public void setRetryBackoffMs(Integer retryBackoffMs) {
- this.retryBackoffMs = retryBackoffMs;
+ public void setSslEnabledProtocols(String sslEnabledProtocols) {
+ this.sslEnabledProtocols = sslEnabledProtocols;
}
- public Integer getTopicMetadataRefreshIntervalMs() {
- return topicMetadataRefreshIntervalMs;
+ public String getSslKeystoreType() {
+ return sslKeystoreType;
}
/**
- * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing,
- * leader not available...). It will also poll regularly (default: every 10min so 600000ms).
- * If you set this to a negative value, metadata will only get refreshed on failure.
- * If you set this to zero, the metadata will get refreshed after each message sent (not recommended).
- * Important note: the refresh happen only AFTER the message is sent, so if the producer never
- * sends a message the metadata is never refreshed
+ * The file format of the key store file. This is optional for client. Default value is JKS
*/
- public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
- this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
+ public void setSslKeystoreType(String sslKeystoreType) {
+ this.sslKeystoreType = sslKeystoreType;
}
- public Integer getSendBufferBytes() {
- return sendBufferBytes;
+ public String getSslProtocol() {
+ return sslProtocol;
}
/**
- * Socket write buffer size
+ * The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases.
+ * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs,
+ * but their usage is discouraged due to known security vulnerabilities.
*/
- public void setSendBufferBytes(Integer sendBufferBytes) {
- this.sendBufferBytes = sendBufferBytes;
+ public void setSslProtocol(String sslProtocol) {
+ this.sslProtocol = sslProtocol;
+ }
+
+ public String getSslProvider() {
+ return sslProvider;
+ }
+
+ /**
+ * The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
+ */
+ public void setSslProvider(String sslProvider) {
+ this.sslProvider = sslProvider;
+ }
+
+ public String getSslTruststoreType() {
+ return sslTruststoreType;
+ }
+
+ /**
+ * The file format of the trust store file. Default value is JKS.
+ */
+ public void setSslTruststoreType(String sslTruststoreType) {
+ this.sslTruststoreType = sslTruststoreType;
+ }
+
+ public String getSaslKerberosServiceName() {
+ return saslKerberosServiceName;
+ }
+
+ /**
+ * The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS
+ * config or in Kafka's config.
+ */
+ public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+ this.saslKerberosServiceName = saslKerberosServiceName;
+ }
+
+ public String getSecurityProtocol() {
+ return securityProtocol;
+ }
+
+ /**
+ * Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.
+ */
+ public void setSecurityProtocol(String securityProtocol) {
+ this.securityProtocol = securityProtocol;
+ }
+
+ public String getSslKeyPassword() {
+ return sslKeyPassword;
+ }
+
+ /**
+ * The password of the private key in the key store file. This is optional for client.
+ */
+ public void setSslKeyPassword(String sslKeyPassword) {
+ this.sslKeyPassword = sslKeyPassword;
+ }
+
+ public String getSslKeystoreLocation() {
+ return sslKeystoreLocation;
}
- public short getRequestRequiredAcks() {
+ /**
+ * The location of the key store file. This is optional for client and can be used for two-way
+ * authentication for client.
+ */
+ public void setSslKeystoreLocation(String sslKeystoreLocation) {
+ this.sslKeystoreLocation = sslKeystoreLocation;
+ }
+
+ public String getSslKeystorePassword() {
+ return sslKeystorePassword;
+ }
+
+ /**
+ * The store password for the key store file. This is optional for client and only needed
+ * if ssl.keystore.location is configured.
+ */
+ public void setSslKeystorePassword(String sslKeystorePassword) {
+ this.sslKeystorePassword = sslKeystorePassword;
+ }
+
+ public String getSslTruststoreLocation() {
+ return sslTruststoreLocation;
+ }
+
+ /**
+ * The location of the trust store file.
+ */
+ public void setSslTruststoreLocation(String sslTruststoreLocation) {
+ this.sslTruststoreLocation = sslTruststoreLocation;
+ }
+
+ public String getSslTruststorePassword() {
+ return sslTruststorePassword;
+ }
+
+
+ /**
+ * The password for the trust store file.
+ */
+ public void setSslTruststorePassword(String sslTruststorePassword) {
+ this.sslTruststorePassword = sslTruststorePassword;
+ }
+
+ public Integer getBufferMemorySize() {
+ return bufferMemorySize;
+ }
+
+ /**
+ * The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
+ * If records are sent faster than they can be delivered to the server the producer will either block
+ * or throw an exception based on the preference specified by block.on.buffer.full. This setting should
+ * correspond roughly to the total memory the producer will use, but is not a hard bound since not all
+ * memory the producer uses is used for buffering. Some additional memory will be used for compression
+ * (if compression is enabled) as well as for maintaining in-flight requests.
+ */
+ public void setBufferMemorySize(Integer bufferMemorySize) {
+ this.bufferMemorySize = bufferMemorySize;
+ }
+
+ public Boolean getBlockOnBufferFull() {
+ return blockOnBufferFull;
+ }
+
+ /**
+ * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
+ * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
+ * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
+ * a BufferExhaustedException if a record is sent and the buffer space is full.
+ */
+ public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+ this.blockOnBufferFull = blockOnBufferFull;
+ }
+
+ public Integer getRequestRequiredAcks() {
return requestRequiredAcks;
}
/**
- * This value controls when a produce request is considered completed. Specifically,
- * how many other brokers must have committed the data to their log and acknowledged this to the leader?
- * Typical values are (0, 1 or -1):
- * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7).
- * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
- * 1, which means that the producer gets an acknowledgement after the leader replica has received the data.
- * This option provides better durability as the client waits until the server acknowledges the request as successful
- * (only messages that were written to the now-dead leader but not yet replicated will be lost).
- * -1, The producer gets an acknowledgement after all in-sync replicas have received the data.
- * This option provides the greatest level of durability.
- * However, it does not completely eliminate the risk of message loss because the number of in sync replicas may,
- * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas
- * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting.
- * Please read the Replication section of the design documentation for a more in-depth discussion.
+ * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+ * This controls the durability of records that are sent. The following settings are common:
+ * acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all.
+ * The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server
+ * has received the record in this case, and the retries configuration will not take effect (as the client won't generally
+ * know of any failures). The offset given back for each record will always be set to -1.
+ * acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement
+ * from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have
+ * replicated it then the record will be lost.
+ * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the
+ * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
*/
- public void setRequestRequiredAcks(short requestRequiredAcks) {
+ public void setRequestRequiredAcks(Integer requestRequiredAcks) {
this.requestRequiredAcks = requestRequiredAcks;
}
- public Integer getRequestTimeoutMs() {
- return requestTimeoutMs;
+ public Integer getRetries() {
+ return retries;
}
/**
- * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
+ * Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
+ * Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially
+ * change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second
+ * succeeds, then the second record may appear first.
*/
- public void setRequestTimeoutMs(Integer requestTimeoutMs) {
- this.requestTimeoutMs = requestTimeoutMs;
+ public void setRetries(Integer retries) {
+ this.retries = retries;
}
- public Integer getQueueBufferingMaxMs() {
- return queueBufferingMaxMs;
+ public Integer getProducerBatchSize() {
+ return producerBatchSize;
}
/**
- * Maximum time to buffer data when using async mode.
- * For example a setting of 100 will try to batch together 100ms of messages to send at once.
- * This will improve throughput but adds message delivery latency due to the buffering.
+ * The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
+ * This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
+ * No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each
+ * partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero
+ * will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the
+ * specified batch size in anticipation of additional records.
*/
- public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
- this.queueBufferingMaxMs = queueBufferingMaxMs;
+ public void setProducerBatchSize(Integer producerBatchSize) {
+ this.producerBatchSize = producerBatchSize;
}
- public Integer getQueueBufferingMaxMessages() {
- return queueBufferingMaxMessages;
+ public Integer getConnectionMaxIdleMs() {
+ return connectionMaxIdleMs;
}
/**
- * The maximum number of unsent messages that can be queued up the producer when using async
- * mode before either the producer must be blocked or data must be dropped.
+ * Close idle connections after the number of milliseconds specified by this config.
*/
- public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
- this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+ public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+ this.connectionMaxIdleMs = connectionMaxIdleMs;
}
- public Integer getQueueEnqueueTimeoutMs() {
- return queueEnqueueTimeoutMs;
+ public Integer getLingerMs() {
+ return lingerMs;
}
/**
- * The amount of time to block before dropping messages when running in async mode and the buffer has reached
- * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full
- * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.
+ * The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this
+ * occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce
+ * the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is,
+ * rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that
+ * the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on
+ * the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting,
+ * however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more
+ * records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the
+ * number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
*/
- public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
- this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
+ public void setLingerMs(Integer lingerMs) {
+ this.lingerMs = lingerMs;
}
- public Integer getBatchNumMessages() {
- return batchNumMessages;
+ public Integer getMaxBlockMs() {
+ return maxBlockMs;
}
/**
- * The number of messages to send in one batch when using async mode.
- * The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached.
+ * The configuration controls how long sending to kafka will block. These methods can be
+ * blocked for multiple reasons. For example: buffer full, metadata unavailable. This configuration imposes maximum limit on the total time spent
+ * in fetching metadata, serialization of key and value, partitioning and allocation of buffer memory when doing a send(). In case of
+ * partitionsFor(), this configuration imposes a maximum time threshold on waiting for metadata
*/
- public void setBatchNumMessages(Integer batchNumMessages) {
- this.batchNumMessages = batchNumMessages;
+ public void setMaxBlockMs(Integer maxBlockMs) {
+ this.maxBlockMs = maxBlockMs;
}
- public String getSerializerClass() {
- return serializerClass;
+ public Integer getMaxRequestSize() {
+ return maxRequestSize;
}
/**
- * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
- * The default class is kafka.serializer.DefaultEncoder
+ * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size
+ * which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid
+ * sending huge requests.
*/
- public void setSerializerClass(String serializerClass) {
- this.serializerClass = serializerClass;
+ public void setMaxRequestSize(Integer maxRequestSize) {
+ this.maxRequestSize = maxRequestSize;
}
- public String getKeySerializerClass() {
- return keySerializerClass;
+ public Integer getReceiveBufferBytes() {
+ return receiveBufferBytes;
}
/**
- * The serializer class for keys (defaults to the same as for messages if nothing is given).
+ * The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
*/
- public void setKeySerializerClass(String keySerializerClass) {
- this.keySerializerClass = keySerializerClass;
+ public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+ this.receiveBufferBytes = receiveBufferBytes;
}
- public String getOffsetsStorage() {
- return offsetsStorage;
+ public Integer getTimeoutMs() {
+ return timeoutMs;
}
/**
- * Select where offsets should be stored (zookeeper or kafka).
+ * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
+ * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
+ * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
+ * the network latency of the request.
*/
- public void setOffsetsStorage(String offsetsStorage) {
- this.offsetsStorage = offsetsStorage;
+ public void setTimeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
}
- public Boolean isDualCommitEnabled() {
- return dualCommitEnabled;
+ public Integer getMaxInFlightRequest() {
+ return maxInFlightRequest;
}
/**
- * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka).
- * This is required during migration from zookeeper-based offset storage to kafka-based offset storage.
- * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated
- * to the new version that commits offsets to the broker (instead of directly to ZooKeeper).
+ * The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting
+ * is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
*/
- public void setDualCommitEnabled(Boolean dualCommitEnabled) {
- this.dualCommitEnabled = dualCommitEnabled;
+ public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+ this.maxInFlightRequest = maxInFlightRequest;
+ }
+
+ public Integer getMetadataFetchTimeoutMs() {
+ return metadataFetchTimeoutMs;
}
/**
- * Returns a copy of this configuration
+ * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
+ * This configuration specifies the maximum time, in milliseconds, to wait for this fetch to succeed before throwing an exception back to the client.
*/
- public KafkaConfiguration copy() {
- try {
- return (KafkaConfiguration)clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeCamelException(e);
- }
+ public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+ this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
+ }
+
+ public Integer getMetadataMaxAgeMs() {
+ return metadataMaxAgeMs;
+ }
+
+ /**
+ * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership
+ * changes to proactively discover any new brokers or partitions.
+ */
+ public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+ this.metadataMaxAgeMs = metadataMaxAgeMs;
+ }
+
+ public String getMetricReporters() {
+ return metricReporters;
+ }
+
+ /**
+ * A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be
+ * notified of new metric creation. The JmxReporter is always included to register JMX statistics.
+ */
+ public void setMetricReporters(String metricReporters) {
+ this.metricReporters = metricReporters;
+ }
+
+ public Integer getNoOfMetricsSample() {
+ return noOfMetricsSample;
+ }
+
+ /**
+ * The number of samples maintained to compute metrics.
+ */
+ public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+ this.noOfMetricsSample = noOfMetricsSample;
+ }
+
+ public Integer getMetricsSampleWindowMs() {
+ return metricsSampleWindowMs;
+ }
+
+ /**
+ * The window of time, in milliseconds, over which a metrics sample is computed.
+ */
+ public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+ this.metricsSampleWindowMs = metricsSampleWindowMs;
+ }
+
+ public Integer getReconnectBackoffMs() {
+ return reconnectBackoffMs;
+ }
+
+ /**
+ * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host
+ * in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+ */
+ public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+ this.reconnectBackoffMs = reconnectBackoffMs;
+ }
+
+ public Integer getHeartbeatIntervalMs() {
+ return heartbeatIntervalMs;
+ }
+
+ /**
+ * The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
+ * Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new
+ * consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set
+ * no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
+ */
+ public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+ this.heartbeatIntervalMs = heartbeatIntervalMs;
+ }
+
+ public Integer getMaxPartitionFetchBytes() {
+ return maxPartitionFetchBytes;
+ }
+
+ /**
+ * The maximum amount of data per-partition the server will return. The maximum total memory used for
+ * a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the
+ * maximum message size the server allows or else it is possible for the producer to send messages larger
+ * than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message
+ * on a certain partition.
+ */
+ public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+ this.maxPartitionFetchBytes = maxPartitionFetchBytes;
+ }
+
+ public Integer getSessionTimeoutMs() {
+ return sessionTimeoutMs;
+ }
+
+ /**
+ * The timeout used to detect failures when using Kafka's group management facilities.
+ */
+ public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ }
+
+ public String getPartitionAssignor() {
+ return partitionAssignor;
+ }
+
+ /**
+ * The class name of the partition assignment strategy that the client will use to distribute
+ * partition ownership amongst consumer instances when group management is used
+ */
+ public void setPartitionAssignor(String partitionAssignor) {
+ this.partitionAssignor = partitionAssignor;
+ }
+
+ public Integer getConsumerRequestTimeoutMs() {
+ return consumerRequestTimeoutMs;
+ }
+
+ /**
+ * The configuration controls the maximum amount of time the client will wait for the response
+ * of a request. If the response is not received before the timeout elapses the client will resend
+ * the request if necessary or fail the request if retries are exhausted.
+ */
+ public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+ this.consumerRequestTimeoutMs = consumerRequestTimeoutMs;
+ }
+
+ public Boolean getCheckCrcs() {
+ return checkCrcs;
+ }
+
+ /**
+ * Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk
+ * corruption to the messages occurred. This check adds some overhead, so it may be disabled in
+ * cases seeking extreme performance.
+ */
+ public void setCheckCrcs(Boolean checkCrcs) {
+ this.checkCrcs = checkCrcs;
+ }
+
+ public String getKeyDeserializer() {
+ return keyDeserializer;
+ }
+
+ /**
+ * Deserializer class for key that implements the Deserializer interface.
+ */
+ public void setKeyDeserializer(String keyDeserializer) {
+ this.keyDeserializer = keyDeserializer;
+ }
+
+ public String getValueDeserializer() {
+ return valueDeserializer;
+ }
+
+ /**
+ * Deserializer class for value that implements the Deserializer interface.
+ */
+ public void setValueDeserializer(String valueDeserializer) {
+ this.valueDeserializer = valueDeserializer;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index d3ff482..db99a09 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -16,21 +16,20 @@
*/
package org.apache.camel.component.kafka;
-/**
- *
- */
public final class KafkaConstants {
- public static final String DEFAULT_GROUP = "group1";
-
public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
public static final String PARTITION = "kafka.PARTITION";
public static final String KEY = "kafka.KEY";
public static final String TOPIC = "kafka.TOPIC";
public static final String OFFSET = "kafka.OFFSET";
public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+ public static final String KAFKA_DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
+ public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor";
private KafkaConstants() {
// Utility class
[2/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java
client instead of scala. Thanks to Anbumani Balusamy for the patch.
Posted by da...@apache.org.
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 94893fb..e65ed3b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,30 +16,19 @@
*/
package org.apache.camel.component.kafka;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Arrays;
import java.util.Properties;
-import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- */
public class KafkaConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -47,15 +36,14 @@ public class KafkaConsumer extends DefaultConsumer {
protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private final Processor processor;
- private Map<ConsumerConnector, CyclicBarrier> consumerBarriers;
-
+
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
- this.consumerBarriers = new HashMap<ConsumerConnector, CyclicBarrier>();
- if (endpoint.getZookeeperConnect() == null) {
- throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
+
+ if (endpoint.getBrokers() == null) {
+ throw new IllegalArgumentException("BootStrap servers must be specified");
}
if (endpoint.getGroupId() == null) {
throw new IllegalArgumentException("groupId must not be null");
@@ -64,57 +52,25 @@ public class KafkaConsumer extends DefaultConsumer {
Properties getProps() {
Properties props = endpoint.getConfiguration().createConsumerProperties();
- props.put("zookeeper.connect", endpoint.getZookeeperConnect());
- props.put("group.id", endpoint.getGroupId());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId());
return props;
}
@Override
protected void doStart() throws Exception {
super.doStart();
- log.info("Starting Kafka consumer");
-
+ LOG.info("Starting Kafka consumer");
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConsumersCount(); i++) {
- ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
-
- // commit periodically
- if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
- if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
- && endpoint.getConsumerStreams() > 1) {
- LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
- }
- CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
- for (final KafkaStream<byte[], byte[]> stream : streams) {
- executor.submit(new BatchingConsumerTask(stream, barrier));
- }
- consumerBarriers.put(consumer, barrier);
- } else {
- // auto commit
- for (final KafkaStream<byte[], byte[]> stream : streams) {
- executor.submit(new AutoCommitConsumerTask(consumer, stream));
- }
- consumerBarriers.put(consumer, null);
- }
+ executor.submit(new KafkaFetchRecords(endpoint.getTopic(), i + "", getProps()));
}
-
}
@Override
protected void doStop() throws Exception {
super.doStop();
- log.info("Stopping Kafka consumer");
-
- for (ConsumerConnector consumer : consumerBarriers.keySet()) {
- if (consumer != null) {
- consumer.shutdown();
- }
- }
- consumerBarriers.clear();
+ LOG.info("Stopping Kafka consumer");
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
@@ -126,102 +82,64 @@ public class KafkaConsumer extends DefaultConsumer {
executor = null;
}
- class BatchingConsumerTask implements Runnable {
+ /**
+ * Polls one topic with a dedicated Kafka client consumer and passes each record to the processor.
+ * When auto commit is disabled, offsets are committed synchronously every batchSize records.
+ */
+ class KafkaFetchRecords implements Runnable {
- private KafkaStream<byte[], byte[]> stream;
- private CyclicBarrier barrier;
+ private final org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+ private final String topicName;
+ private final String threadId;
+ private final Properties kafkaProps;
- public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
- this.stream = stream;
- this.barrier = barrier;
+ public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+ this.topicName = topicName;
+ this.threadId = topicName + "-" + "Thread " + id;
+ this.kafkaProps = kafkaProps;
+ this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
}
+ @Override
+ @SuppressWarnings("unchecked")
public void run() {
-
int processed = 0;
- boolean consumerTimeout;
- MessageAndMetadata<byte[], byte[]> mm;
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- boolean hasNext = true;
- while (hasNext) {
- try {
- consumerTimeout = false;
- // only poll the next message if we are allowed to run and are not suspending
- if (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
- mm = it.next();
- Exchange exchange = endpoint.createKafkaExchange(mm);
+ try {
+ LOG.debug("Subscribing {} to topic {}", threadId, topicName);
+ consumer.subscribe(Arrays.asList(topicName));
+ while (isRunAllowed() && !isSuspendingOrSuspended()) {
+ ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
+ for (ConsumerRecord<Object, Object> record : records) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
+ }
+ Exchange exchange = endpoint.createKafkaExchange(record);
try {
processor.process(exchange);
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ getExceptionHandler().handleException("Error during processing", exchange, e);
}
processed++;
- } else {
- // we don't need to process the message
- hasNext = false;
- }
- } catch (ConsumerTimeoutException e) {
- LOG.debug("Consumer timeout occurred due " + e.getMessage(), e);
- consumerTimeout = true;
- }
-
- if (processed >= endpoint.getBatchSize() || consumerTimeout
- || (processed > 0 && !hasNext)) { // Need to commit the offset for the last round
- try {
- barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
- if (!consumerTimeout) {
- processed = 0;
+ // if autocommit is false
+ if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
+ if (processed >= endpoint.getBatchSize()) {
+ consumer.commitSync();
+ processed = 0;
+ }
}
- } catch (Exception e) {
- getExceptionHandler().handleException("Error waiting for batch to complete", e);
- break;
}
}
+ LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
+ consumer.unsubscribe();
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
+ } finally {
+ // always release the client's connections and buffers, also when polling or processing failed
+ consumer.close();
}
}
- }
-
- class CommitOffsetTask implements Runnable {
-
- private final ConsumerConnector consumer;
-
- public CommitOffsetTask(ConsumerConnector consumer) {
- this.consumer = consumer;
- }
- @Override
- public void run() {
- LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
- consumer.commitOffsets();
- }
}
- class AutoCommitConsumerTask implements Runnable {
-
- private final ConsumerConnector consumer;
- private KafkaStream<byte[], byte[]> stream;
-
- public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
- this.consumer = consumer;
- this.stream = stream;
- }
-
- public void run() {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- // only poll the next message if we are allowed to run and are not suspending
- while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
- MessageAndMetadata<byte[], byte[]> mm = it.next();
- Exchange exchange = endpoint.createKafkaExchange(mm);
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- }
- // no more data so commit offset
- LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
- consumer.commitOffsets();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 760ec19a..5a56c39 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -16,10 +16,9 @@
*/
package org.apache.camel.component.kafka;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
-import kafka.message.MessageAndMetadata;
-
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -29,6 +28,7 @@ import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* The kafka component allows messages to be sent to (or consumed from) Apache Kafka brokers.
@@ -38,8 +38,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
@UriParam
private KafkaConfiguration configuration = new KafkaConfiguration();
-
- @UriParam(description = "If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.", defaultValue = "false")
+ @UriParam
private boolean bridgeEndpoint;
public KafkaEndpoint() {
@@ -73,30 +72,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
@Override
public Producer createProducer() throws Exception {
- String msgClassName = getConfiguration().getSerializerClass();
- String keyClassName = getConfiguration().getKeySerializerClass();
- if (msgClassName == null) {
- msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER;
- }
- if (keyClassName == null) {
- keyClassName = msgClassName;
- }
-
- ClassLoader cl = getClass().getClassLoader();
-
- Class<?> k;
- try {
- k = cl.loadClass(keyClassName);
- } catch (ClassNotFoundException x) {
- k = getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName);
- }
- Class<?> v;
- try {
- v = cl.loadClass(msgClassName);
- } catch (ClassNotFoundException x) {
- v = getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName);
- }
- return createProducer(k, v, this);
+ return createProducer(this);
}
@Override
@@ -104,411 +80,597 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
return true;
}
+ @Override
+ public boolean isMultipleConsumersSupported() {
+ return true;
+ }
+
public ExecutorService createExecutor() {
return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
}
- public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
+ public Exchange createKafkaExchange(ConsumerRecord record) {
Exchange exchange = super.createExchange();
Message message = exchange.getIn();
- message.setHeader(KafkaConstants.PARTITION, mm.partition());
- message.setHeader(KafkaConstants.TOPIC, mm.topic());
- message.setHeader(KafkaConstants.OFFSET, mm.offset());
- if (mm.key() != null) {
- message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+ message.setHeader(KafkaConstants.PARTITION, record.partition());
+ message.setHeader(KafkaConstants.TOPIC, record.topic());
+ message.setHeader(KafkaConstants.OFFSET, record.offset());
+ if (record.key() != null) {
+ message.setHeader(KafkaConstants.KEY, record.key());
}
- message.setBody(mm.message());
+ message.setBody(record.value());
return exchange;
}
- protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass, KafkaEndpoint endpoint) {
- return new KafkaProducer<K, V>(endpoint);
+ protected KafkaProducer createProducer(KafkaEndpoint endpoint) {
+ return new KafkaProducer(endpoint);
}
// Delegated properties from the configuration
//-------------------------------------------------------------------------
- public String getZookeeperConnect() {
- return configuration.getZookeeperConnect();
+ public Properties createProducerProperties() {
+ return configuration.createProducerProperties();
+ }
+
+ public void setValueDeserializer(String valueDeserializer) {
+ configuration.setValueDeserializer(valueDeserializer);
}
- public void setZookeeperConnect(String zookeeperConnect) {
- configuration.setZookeeperConnect(zookeeperConnect);
+ public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+ configuration.setRequestTimeoutMs(requestTimeoutMs);
}
- public String getZookeeperHost() {
- return configuration.getZookeeperHost();
+ public void setProducerBatchSize(Integer producerBatchSize) {
+ configuration.setProducerBatchSize(producerBatchSize);
}
- public void setZookeeperHost(String zookeeperHost) {
- configuration.setZookeeperHost(zookeeperHost);
+ public void setRetryBackoffMs(Integer retryBackoffMs) {
+ configuration.setRetryBackoffMs(retryBackoffMs);
}
- public int getZookeeperPort() {
- return configuration.getZookeeperPort();
+ public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+ configuration.setNoOfMetricsSample(noOfMetricsSample);
}
- public void setZookeeperPort(int zookeeperPort) {
- configuration.setZookeeperPort(zookeeperPort);
+ public String getMetricReporters() {
+ return configuration.getMetricReporters();
}
- public String getGroupId() {
- return configuration.getGroupId();
+ public void setSslKeystoreType(String sslKeystoreType) {
+ configuration.setSslKeystoreType(sslKeystoreType);
+ }
+
+ public void setSslCipherSuites(String sslCipherSuites) {
+ configuration.setSslCipherSuites(sslCipherSuites);
+ }
+
+ public void setClientId(String clientId) {
+ configuration.setClientId(clientId);
+ }
+
+ public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+ configuration.setMetricsSampleWindowMs(metricsSampleWindowMs);
+ }
+
+ public String getKeyDeserializer() {
+ return configuration.getKeyDeserializer();
+ }
+
+ public int getConsumersCount() {
+ return configuration.getConsumersCount();
+ }
+
+ public String getSslKeyPassword() {
+ return configuration.getSslKeyPassword();
+ }
+
+ public void setSendBufferBytes(Integer sendBufferBytes) {
+ configuration.setSendBufferBytes(sendBufferBytes);
+ }
+
+ public Boolean isAutoCommitEnable() {
+ return configuration.isAutoCommitEnable();
+ }
+
+ public Integer getMaxBlockMs() {
+ return configuration.getMaxBlockMs();
+ }
+
+ public String getConsumerId() {
+ return configuration.getConsumerId();
+ }
+
+ public void setSslProtocol(String sslProtocol) {
+ configuration.setSslProtocol(sslProtocol);
+ }
+
+ public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+ configuration.setReceiveBufferBytes(receiveBufferBytes);
+ }
+
+ public Boolean getCheckCrcs() {
+ return configuration.getCheckCrcs();
}
public void setGroupId(String groupId) {
configuration.setGroupId(groupId);
}
- public String getPartitioner() {
- return configuration.getPartitioner();
+ public String getCompressionCodec() {
+ return configuration.getCompressionCodec();
}
- public void setPartitioner(String partitioner) {
- configuration.setPartitioner(partitioner);
+ public String getGroupId() {
+ return configuration.getGroupId();
}
- public String getTopic() {
- return configuration.getTopic();
+ public void setSslTruststoreLocation(String sslTruststoreLocation) {
+ configuration.setSslTruststoreLocation(sslTruststoreLocation);
}
- public void setTopic(String topic) {
- configuration.setTopic(topic);
+ public String getKerberosInitCmd() {
+ return configuration.getKerberosInitCmd();
}
- public String getBrokers() {
- return configuration.getBrokers();
+ public String getAutoOffsetReset() {
+ return configuration.getAutoOffsetReset();
+ }
+
+ public void setAutoCommitEnable(Boolean autoCommitEnable) {
+ configuration.setAutoCommitEnable(autoCommitEnable);
+ }
+
+ public void setSerializerClass(String serializerClass) {
+ configuration.setSerializerClass(serializerClass);
+ }
+
+ public Integer getQueueBufferingMaxMessages() {
+ return configuration.getQueueBufferingMaxMessages();
+ }
+
+ public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+ configuration.setSslEndpointAlgorithm(sslEndpointAlgorithm);
+ }
+
+ public void setRetries(Integer retries) {
+ configuration.setRetries(retries);
+ }
+
+ public void setAutoOffsetReset(String autoOffsetReset) {
+ configuration.setAutoOffsetReset(autoOffsetReset);
+ }
+
+ public Integer getSessionTimeoutMs() {
+ return configuration.getSessionTimeoutMs();
+ }
+
+ public Integer getBufferMemorySize() {
+ return configuration.getBufferMemorySize();
+ }
+
+ public String getKeySerializerClass() {
+ return configuration.getKeySerializerClass();
+ }
+
+ public void setSslProvider(String sslProvider) {
+ configuration.setSslProvider(sslProvider);
+ }
+
+ public void setFetchMinBytes(Integer fetchMinBytes) {
+ configuration.setFetchMinBytes(fetchMinBytes);
+ }
+
+ public Integer getAutoCommitIntervalMs() {
+ return configuration.getAutoCommitIntervalMs();
+ }
+
+ public void setKeySerializerClass(String keySerializerClass) {
+ configuration.setKeySerializerClass(keySerializerClass);
+ }
+
+ public Integer getConnectionMaxIdleMs() {
+ return configuration.getConnectionMaxIdleMs();
+ }
+
+ public Integer getReceiveBufferBytes() {
+ return configuration.getReceiveBufferBytes();
}
public void setBrokers(String brokers) {
configuration.setBrokers(brokers);
}
- public int getConsumerStreams() {
- return configuration.getConsumerStreams();
+ public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+ configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
}
- public void setConsumerStreams(int consumerStreams) {
- configuration.setConsumerStreams(consumerStreams);
+ public String getValueDeserializer() {
+ return configuration.getValueDeserializer();
}
- public int getBatchSize() {
- return configuration.getBatchSize();
+ public String getPartitioner() {
+ return configuration.getPartitioner();
}
- public void setBatchSize(int batchSize) {
- this.configuration.setBatchSize(batchSize);
+ public String getSslTruststoreLocation() {
+ return configuration.getSslTruststoreLocation();
+ }
+
+ public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+ configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+ }
+
+ public String getSslProvider() {
+ return configuration.getSslProvider();
+ }
+
+ public void setMetricReporters(String metricReporters) {
+ configuration.setMetricReporters(metricReporters);
+ }
+
+ public void setSslTruststorePassword(String sslTruststorePassword) {
+ configuration.setSslTruststorePassword(sslTruststorePassword);
+ }
+
+ public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+ configuration.setMaxInFlightRequest(maxInFlightRequest);
+ }
+
+ public String getTopic() {
+ return configuration.getTopic();
}
public int getBarrierAwaitTimeoutMs() {
return configuration.getBarrierAwaitTimeoutMs();
}
- public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
- this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+ public Integer getFetchMinBytes() {
+ return configuration.getFetchMinBytes();
}
- public int getConsumersCount() {
- return this.configuration.getConsumersCount();
+ public Integer getHeartbeatIntervalMs() {
+ return configuration.getHeartbeatIntervalMs();
}
- public void setConsumersCount(int consumersCount) {
- this.configuration.setConsumersCount(consumersCount);
+ public void setKeyDeserializer(String keyDeserializer) {
+ configuration.setKeyDeserializer(keyDeserializer);
}
- public void setConsumerTimeoutMs(int consumerTimeoutMs) {
- configuration.setConsumerTimeoutMs(consumerTimeoutMs);
+ public Integer getMaxRequestSize() {
+ return configuration.getMaxRequestSize();
}
- public void setSerializerClass(String serializerClass) {
- configuration.setSerializerClass(serializerClass);
+ public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+ configuration.setMetadataMaxAgeMs(metadataMaxAgeMs);
}
- public void setQueueBufferingMaxMessages(int queueBufferingMaxMessages) {
- configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
+ public String getSslKeystoreType() {
+ return configuration.getSslKeystoreType();
}
- public int getFetchWaitMaxMs() {
- return configuration.getFetchWaitMaxMs();
+ public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
+ configuration.setKerberosRenewWindowFactor(kerberosRenewWindowFactor);
}
- public Integer getZookeeperConnectionTimeoutMs() {
- return configuration.getZookeeperConnectionTimeoutMs();
+ public Integer getKerberosBeforeReloginMinTime() {
+ return configuration.getKerberosBeforeReloginMinTime();
}
- public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
- configuration.setZookeeperConnectionTimeoutMs(zookeeperConnectionTimeoutMs);
+ public String getSslEnabledProtocols() {
+ return configuration.getSslEnabledProtocols();
}
- public void setMessageSendMaxRetries(int messageSendMaxRetries) {
- configuration.setMessageSendMaxRetries(messageSendMaxRetries);
+ public Integer getMaxInFlightRequest() {
+ return configuration.getMaxInFlightRequest();
}
- public int getQueueBufferingMaxMs() {
- return configuration.getQueueBufferingMaxMs();
+ public Integer getProducerBatchSize() {
+ return configuration.getProducerBatchSize();
}
- public void setRequestRequiredAcks(short requestRequiredAcks) {
- configuration.setRequestRequiredAcks(requestRequiredAcks);
+ public void setSslKeystorePassword(String sslKeystorePassword) {
+ configuration.setSslKeystorePassword(sslKeystorePassword);
}
- public Integer getRebalanceBackoffMs() {
- return configuration.getRebalanceBackoffMs();
+ public Boolean getBlockOnBufferFull() {
+ return configuration.getBlockOnBufferFull();
}
- public void setQueueEnqueueTimeoutMs(int queueEnqueueTimeoutMs) {
- configuration.setQueueEnqueueTimeoutMs(queueEnqueueTimeoutMs);
+ public void setCheckCrcs(Boolean checkCrcs) {
+ configuration.setCheckCrcs(checkCrcs);
}
- public int getFetchMessageMaxBytes() {
- return configuration.getFetchMessageMaxBytes();
+ public int getConsumerStreams() {
+ return configuration.getConsumerStreams();
}
- public int getQueuedMaxMessages() {
- return configuration.getQueuedMaxMessageChunks();
+ public void setConsumersCount(int consumersCount) {
+ configuration.setConsumersCount(consumersCount);
}
- public int getAutoCommitIntervalMs() {
- return configuration.getAutoCommitIntervalMs();
+ public int getBatchSize() {
+ return configuration.getBatchSize();
}
- public void setSocketTimeoutMs(int socketTimeoutMs) {
- configuration.setSocketTimeoutMs(socketTimeoutMs);
+ public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+ configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
}
- public void setAutoCommitIntervalMs(int autoCommitIntervalMs) {
- configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
+ public void setSslTruststoreType(String sslTruststoreType) {
+ configuration.setSslTruststoreType(sslTruststoreType);
}
- public void setRequestTimeoutMs(int requestTimeoutMs) {
- configuration.setRequestTimeoutMs(requestTimeoutMs);
+ public Integer getConsumerRequestTimeoutMs() {
+ return configuration.getConsumerRequestTimeoutMs();
}
- public void setCompressedTopics(String compressedTopics) {
- configuration.setCompressedTopics(compressedTopics);
+ public String getSslKeystorePassword() {
+ return configuration.getSslKeystorePassword();
}
- public int getSocketReceiveBufferBytes() {
- return configuration.getSocketReceiveBufferBytes();
+ public void setSslKeyPassword(String sslKeyPassword) {
+ configuration.setSslKeyPassword(sslKeyPassword);
}
- public void setSendBufferBytes(int sendBufferBytes) {
- configuration.setSendBufferBytes(sendBufferBytes);
+ public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+ configuration.setBlockOnBufferFull(blockOnBufferFull);
}
- public void setFetchMessageMaxBytes(int fetchMessageMaxBytes) {
- configuration.setFetchMessageMaxBytes(fetchMessageMaxBytes);
+ public Integer getRequestRequiredAcks() {
+ return configuration.getRequestRequiredAcks();
}
- public int getRefreshLeaderBackoffMs() {
- return configuration.getRefreshLeaderBackoffMs();
+ public Double getKerberosRenewWindowFactor() {
+ return configuration.getKerberosRenewWindowFactor();
}
- public void setFetchWaitMaxMs(int fetchWaitMaxMs) {
- configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
+ public void setKerberosInitCmd(String kerberosInitCmd) {
+ configuration.setKerberosInitCmd(kerberosInitCmd);
}
- public int getTopicMetadataRefreshIntervalMs() {
- return configuration.getTopicMetadataRefreshIntervalMs();
+ public Integer getRetryBackoffMs() {
+ return configuration.getRetryBackoffMs();
}
- public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
- configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
+ public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+ configuration.setSslTrustmanagerAlgorithm(sslTrustmanagerAlgorithm);
}
- public Integer getConsumerTimeoutMs() {
- return configuration.getConsumerTimeoutMs();
+ public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+ configuration.setConsumerRequestTimeoutMs(consumerRequestTimeoutMs);
}
- public void setAutoCommitEnable(boolean autoCommitEnable) {
- configuration.setAutoCommitEnable(autoCommitEnable);
+ public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+ configuration.setReconnectBackoffMs(reconnectBackoffMs);
}
- public String getCompressionCodec() {
- return configuration.getCompressionCodec();
+ public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+ configuration.setKerberosRenewJitter(kerberosRenewJitter);
+ }
+
+ public String getSslKeystoreLocation() {
+ return configuration.getSslKeystoreLocation();
}
- public void setProducerType(String producerType) {
- configuration.setProducerType(producerType);
+ public Integer getNoOfMetricsSample() {
+ return configuration.getNoOfMetricsSample();
+ }
+
+ public String getSslKeymanagerAlgorithm() {
+ return configuration.getSslKeymanagerAlgorithm();
+ }
+
+ public void setConsumerId(String consumerId) {
+ configuration.setConsumerId(consumerId);
}
public String getClientId() {
return configuration.getClientId();
}
- public int getFetchMinBytes() {
- return configuration.getFetchMinBytes();
+ public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+ configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
}
- public String getAutoOffsetReset() {
- return configuration.getAutoOffsetReset();
+ public String getSslCipherSuites() {
+ return configuration.getSslCipherSuites();
}
- public void setRefreshLeaderBackoffMs(int refreshLeaderBackoffMs) {
- configuration.setRefreshLeaderBackoffMs(refreshLeaderBackoffMs);
+ public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+ configuration.setRequestRequiredAcks(requestRequiredAcks);
}
- public void setAutoOffsetReset(String autoOffsetReset) {
- configuration.setAutoOffsetReset(autoOffsetReset);
+ public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+ configuration.setConnectionMaxIdleMs(connectionMaxIdleMs);
}
- public void setConsumerId(String consumerId) {
- configuration.setConsumerId(consumerId);
+ public String getSslTrustmanagerAlgorithm() {
+ return configuration.getSslTrustmanagerAlgorithm();
}
- public int getRetryBackoffMs() {
- return configuration.getRetryBackoffMs();
+ public String getSslTruststorePassword() {
+ return configuration.getSslTruststorePassword();
}
- public int getRebalanceMaxRetries() {
- return configuration.getRebalanceMaxRetries();
+ public void setTimeoutMs(Integer timeoutMs) {
+ configuration.setTimeoutMs(timeoutMs);
}
- public Boolean isAutoCommitEnable() {
- return configuration.isAutoCommitEnable();
+ public void setConsumerStreams(int consumerStreams) {
+ configuration.setConsumerStreams(consumerStreams);
}
- public void setQueueBufferingMaxMs(int queueBufferingMaxMs) {
- configuration.setQueueBufferingMaxMs(queueBufferingMaxMs);
+ public String getSslTruststoreType() {
+ return configuration.getSslTruststoreType();
}
- public void setRebalanceMaxRetries(int rebalanceMaxRetries) {
- configuration.setRebalanceMaxRetries(rebalanceMaxRetries);
+ public String getSecurityProtocol() {
+ return configuration.getSecurityProtocol();
}
- public int getZookeeperSessionTimeoutMs() {
- return configuration.getZookeeperSessionTimeoutMs();
+ public void setBufferMemorySize(Integer bufferMemorySize) {
+ configuration.setBufferMemorySize(bufferMemorySize);
}
- public void setKeySerializerClass(String keySerializerClass) {
- configuration.setKeySerializerClass(keySerializerClass);
+ public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+ configuration.setSaslKerberosServiceName(saslKerberosServiceName);
}
public void setCompressionCodec(String compressionCodec) {
configuration.setCompressionCodec(compressionCodec);
}
- public void setClientId(String clientId) {
- configuration.setClientId(clientId);
+ public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
+ configuration.setKerberosBeforeReloginMinTime(kerberosBeforeReloginMinTime);
}
- public int getSocketTimeoutMs() {
- return configuration.getSocketTimeoutMs();
+ public Integer getMetadataMaxAgeMs() {
+ return configuration.getMetadataMaxAgeMs();
}
- public String getCompressedTopics() {
- return configuration.getCompressedTopics();
+ public String getSerializerClass() {
+ return configuration.getSerializerClass();
}
- public int getZookeeperSyncTimeMs() {
- return configuration.getZookeeperSyncTimeMs();
+ public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+ configuration.setSslKeymanagerAlgorithm(sslKeymanagerAlgorithm);
}
- public void setSocketReceiveBufferBytes(int socketReceiveBufferBytes) {
- configuration.setSocketReceiveBufferBytes(socketReceiveBufferBytes);
+ public void setMaxRequestSize(Integer maxRequestSize) {
+ configuration.setMaxRequestSize(maxRequestSize);
}
- public int getQueueEnqueueTimeoutMs() {
- return configuration.getQueueEnqueueTimeoutMs();
+ public Double getKerberosRenewJitter() {
+ return configuration.getKerberosRenewJitter();
}
- public int getQueueBufferingMaxMessages() {
- return configuration.getQueueBufferingMaxMessages();
+ public String getPartitionAssignor() {
+ return configuration.getPartitionAssignor();
}
- public void setZookeeperSyncTimeMs(int zookeeperSyncTimeMs) {
- configuration.setZookeeperSyncTimeMs(zookeeperSyncTimeMs);
+ public Integer getMetadataFetchTimeoutMs() {
+ return configuration.getMetadataFetchTimeoutMs();
}
- public String getKeySerializerClass() {
- return configuration.getKeySerializerClass();
+ public void setSecurityProtocol(String securityProtocol) {
+ configuration.setSecurityProtocol(securityProtocol);
}
- public void setTopicMetadataRefreshIntervalMs(int topicMetadataRefreshIntervalMs) {
- configuration.setTopicMetadataRefreshIntervalMs(topicMetadataRefreshIntervalMs);
+ public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+ configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
}
- public void setBatchNumMessages(int batchNumMessages) {
- configuration.setBatchNumMessages(batchNumMessages);
+ public String getSaslKerberosServiceName() {
+ return configuration.getSaslKerberosServiceName();
}
- public int getSendBufferBytes() {
- return configuration.getSendBufferBytes();
+ public void setBatchSize(int batchSize) {
+ configuration.setBatchSize(batchSize);
}
- public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
- configuration.setRebalanceBackoffMs(rebalanceBackoffMs);
+ public Integer getLingerMs() {
+ return configuration.getLingerMs();
}
- public void setQueuedMaxMessages(int queuedMaxMessages) {
- configuration.setQueuedMaxMessageChunks(queuedMaxMessages);
+ public Integer getRetries() {
+ return configuration.getRetries();
}
- public void setRetryBackoffMs(int retryBackoffMs) {
- configuration.setRetryBackoffMs(retryBackoffMs);
+ public Integer getMaxPartitionFetchBytes() {
+ return configuration.getMaxPartitionFetchBytes();
}
- public int getBatchNumMessages() {
- return configuration.getBatchNumMessages();
+ public String getSslEndpointAlgorithm() {
+ return configuration.getSslEndpointAlgorithm();
}
- public short getRequestRequiredAcks() {
- return configuration.getRequestRequiredAcks();
+ public Integer getReconnectBackoffMs() {
+ return configuration.getReconnectBackoffMs();
}
- public String getProducerType() {
- return configuration.getProducerType();
+ public void setLingerMs(Integer lingerMs) {
+ configuration.setLingerMs(lingerMs);
}
- public String getConsumerId() {
- return configuration.getConsumerId();
+ public void setPartitionAssignor(String partitionAssignor) {
+ configuration.setPartitionAssignor(partitionAssignor);
}
- public int getMessageSendMaxRetries() {
- return configuration.getMessageSendMaxRetries();
+ public Integer getRequestTimeoutMs() {
+ return configuration.getRequestTimeoutMs();
}
- public void setFetchMinBytes(int fetchMinBytes) {
- configuration.setFetchMinBytes(fetchMinBytes);
+ public Properties createConsumerProperties() {
+ return configuration.createConsumerProperties();
}
- public String getSerializerClass() {
- return configuration.getSerializerClass();
+ public void setTopic(String topic) {
+ configuration.setTopic(topic);
}
- public int getRequestTimeoutMs() {
- return configuration.getRequestTimeoutMs();
+ public Integer getFetchWaitMaxMs() {
+ return configuration.getFetchWaitMaxMs();
}
- @Override
- public boolean isMultipleConsumersSupported() {
- return true;
+ public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+ configuration.setSessionTimeoutMs(sessionTimeoutMs);
}
- public boolean isBridgeEndpoint() {
- return bridgeEndpoint;
+ public void setSslEnabledProtocols(String sslEnabledProtocols) {
+ configuration.setSslEnabledProtocols(sslEnabledProtocols);
}
- public void setBridgeEndpoint(boolean bridgeEndpoint) {
- this.bridgeEndpoint = bridgeEndpoint;
+ public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+ configuration.setHeartbeatIntervalMs(heartbeatIntervalMs);
}
- public String getOffsetsStorage() {
- return configuration.getOffsetsStorage();
+ public void setMaxBlockMs(Integer maxBlockMs) {
+ configuration.setMaxBlockMs(maxBlockMs);
}
- public void setOffsetsStorage(String offsetsStorage) {
- configuration.setOffsetsStorage(offsetsStorage);
+ public void setSslKeystoreLocation(String sslKeystoreLocation) {
+ configuration.setSslKeystoreLocation(sslKeystoreLocation);
}
- public Boolean isDualCommitEnabled() {
- return configuration.isDualCommitEnabled();
+ public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+ configuration.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
+ }
+
+ public void setPartitioner(String partitioner) {
+ configuration.setPartitioner(partitioner);
+ }
+
+ public String getBrokers() {
+ return configuration.getBrokers();
}
- public void setDualCommitEnabled(boolean dualCommitEnabled) {
- configuration.setDualCommitEnabled(dualCommitEnabled);
+ public Integer getMetricsSampleWindowMs() {
+ return configuration.getMetricsSampleWindowMs();
}
+ public Integer getSendBufferBytes() {
+ return configuration.getSendBufferBytes();
+ }
+
+ public Integer getTimeoutMs() {
+ return configuration.getTimeoutMs();
+ }
+
+ public String getSslProtocol() {
+ return configuration.getSslProtocol();
+ }
+
+ public boolean isBridgeEndpoint() {
+ return bridgeEndpoint;
+ }
+
+ /**
+ * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
+ */
+ public void setBridgeEndpoint(boolean bridgeEndpoint) {
+ this.bridgeEndpoint = bridgeEndpoint;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 06a0317..6f9ea79 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,20 +18,16 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.camel.CamelException;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
-/**
- *
- */
-public class KafkaProducer<K, V> extends DefaultProducer {
+public class KafkaProducer extends DefaultProducer {
- protected Producer<K, V> producer;
+ private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
public KafkaProducer(KafkaEndpoint endpoint) {
@@ -39,26 +35,38 @@ public class KafkaProducer<K, V> extends DefaultProducer {
this.endpoint = endpoint;
}
- @Override
- protected void doStop() throws Exception {
- if (producer != null) {
- producer.close();
- }
- }
-
Properties getProps() {
Properties props = endpoint.getConfiguration().createProducerProperties();
if (endpoint.getBrokers() != null) {
- props.put("metadata.broker.list", endpoint.getBrokers());
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
}
return props;
}
+ public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
+ return kafkaProducer;
+ }
+
+ /**
+ * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance.
+ */
+ public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (kafkaProducer != null) {
+ kafkaProducer.close();
+ }
+ }
+
@Override
protected void doStart() throws Exception {
Properties props = getProps();
- ProducerConfig config = new ProducerConfig(props);
- producer = new Producer<K, V>(config);
+ if (kafkaProducer == null) {
+ kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
+ }
}
@Override
@@ -71,26 +79,26 @@ public class KafkaProducer<K, V> extends DefaultProducer {
if (topic == null) {
throw new CamelExchangeException("No topic key set", exchange);
}
- K partitionKey = (K) exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+ Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
boolean hasPartitionKey = partitionKey != null;
- K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY);
+ Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
boolean hasMessageKey = messageKey != null;
- V msg = (V) exchange.getIn().getBody();
- KeyedMessage<K, V> data;
+ Object msg = exchange.getIn().getBody();
+ ProducerRecord record;
if (hasPartitionKey && hasMessageKey) {
- data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
- } else if (hasPartitionKey) {
- data = new KeyedMessage<K, V>(topic, partitionKey, msg);
+ record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg);
} else if (hasMessageKey) {
- data = new KeyedMessage<K, V>(topic, messageKey, msg);
+ record = new ProducerRecord(topic, messageKey, msg);
} else {
log.warn("No message key or partition key set");
- data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
+ record = new ProducerRecord(topic, msg);
}
- producer.send(data);
+
+ // TODO: add support for async callback in the send
+ kafkaProducer.send(record);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 0d2b003..d87f885 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -30,26 +30,23 @@ import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class BaseEmbeddedKafkaTest extends CamelTestSupport {
+
static EmbeddedZookeeper embeddedZookeeper;
static EmbeddedKafkaCluster embeddedKafkaCluster;
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
private static volatile int zookeeperPort;
-
+
private static volatile int karfkaPort;
-
+
@BeforeClass
public static void beforeClass() {
// start from somewhere in the 23xxx range
zookeeperPort = AvailablePortFinder.getNextAvailable(23000);
// find another ports for proxy route test
karfkaPort = AvailablePortFinder.getNextAvailable(24000);
-
+
embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort);
List<Integer> kafkaPorts = new ArrayList<Integer>();
// -1 for any available port
@@ -60,9 +57,9 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
} catch (IOException e) {
e.printStackTrace();
}
- LOG.info("Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
+ System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
embeddedKafkaCluster.startup();
- LOG.info("Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
+ System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
}
@AfterClass
@@ -70,7 +67,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
embeddedKafkaCluster.shutdown();
embeddedZookeeper.shutdown();
}
-
+
@Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry jndi = super.createRegistry();
@@ -81,7 +78,6 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
jndi.bind("prop", prop);
return jndi;
}
-
@Override
protected CamelContext createCamelContext() throws Exception {
@@ -89,12 +85,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
context.addComponent("properties", new PropertiesComponent("ref:prop"));
return context;
}
-
protected static int getZookeeperPort() {
return zookeeperPort;
}
-
+
protected static int getKarfkaPort() {
return karfkaPort;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index eb6dd09..31c2dd6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -18,14 +18,17 @@ package org.apache.camel.component.kafka;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import org.apache.camel.CamelContext;
-import org.apache.camel.spi.Registry;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class KafkaComponentTest {
@@ -34,10 +37,6 @@ public class KafkaComponentTest {
@Test
public void testPropertiesSet() throws Exception {
Map<String, Object> params = new HashMap<String, Object>();
- params.put("zookeeperHost", "somehost");
- params.put("zookeeperPort", 2987);
- params.put("portNumber", 14123);
- params.put("consumerStreams", "3");
params.put("topic", "mytopic");
params.put("partitioner", "com.class.Party");
@@ -45,97 +44,166 @@ public class KafkaComponentTest {
String remaining = "broker1:12345,broker2:12566";
KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
- assertEquals("somehost:2987", endpoint.getZookeeperConnect());
- assertEquals("somehost", endpoint.getZookeeperHost());
- assertEquals(2987, endpoint.getZookeeperPort());
assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
assertEquals("mytopic", endpoint.getTopic());
- assertEquals(3, endpoint.getConsumerStreams());
assertEquals("com.class.Party", endpoint.getPartitioner());
}
@Test
- public void testZookeeperConnectPropertyOverride() throws Exception {
+ public void testAllProducerConfigProperty() throws Exception {
Map<String, Object> params = new HashMap<String, Object>();
- params.put("zookeeperConnect", "thehost:2181/chroot");
- params.put("zookeeperHost", "somehost");
- params.put("zookeeperPort", 2987);
- params.put("portNumber", 14123);
- params.put("consumerStreams", "3");
- params.put("topic", "mytopic");
- params.put("partitioner", "com.class.Party");
+ setProducerProperty(params);
- String uri = "kafka:broker1:12345,broker2:12566";
- String remaining = "broker1:12345,broker2:12566";
+ String uri = "kafka:dev1:12345,dev2:12566";
+ String remaining = "dev1:12345,dev2:12566";
KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
- assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect());
- assertNull(endpoint.getZookeeperHost());
- assertEquals(-1, endpoint.getZookeeperPort());
- assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
- assertEquals("mytopic", endpoint.getTopic());
- assertEquals(3, endpoint.getConsumerStreams());
- assertEquals("com.class.Party", endpoint.getPartitioner());
+
+ assertEquals(new Integer(0), endpoint.getRequestRequiredAcks());
+ assertEquals(new Integer(1), endpoint.getBufferMemorySize());
+ assertEquals(new Integer(10), endpoint.getProducerBatchSize());
+ assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
+ assertEquals(new Integer(1), endpoint.getMaxBlockMs());
+ assertEquals(false, endpoint.getBlockOnBufferFull());
+ assertEquals(new Integer(1), endpoint.getBufferMemorySize());
+ assertEquals("testing", endpoint.getClientId());
+ assertEquals("none", endpoint.getCompressionCodec());
+ assertEquals(new Integer(1), endpoint.getLingerMs());
+ assertEquals(new Integer(100), endpoint.getMaxRequestSize());
+ assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
+ assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
+ assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
+ assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
+ assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
+ assertEquals(new Integer(0), endpoint.getRetries());
+ assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
+ assertEquals(765, endpoint.getSendBufferBytes().intValue());
+ assertEquals(new Integer(2045), endpoint.getTimeoutMs());
+ assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
+ assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
+ assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
+ assertEquals(new Integer(12344), endpoint.getMetricsSampleWindowMs());
+ assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, endpoint.getSerializerClass());
+ assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, endpoint.getKeySerializerClass());
+ assertEquals("testing", endpoint.getSslKeyPassword());
+ assertEquals("/abc", endpoint.getSslKeystoreLocation());
+ assertEquals("testing", endpoint.getSslKeystorePassword());
+ assertEquals("/abc", endpoint.getSslTruststoreLocation());
+ assertEquals("testing", endpoint.getSslTruststorePassword());
+ assertEquals("test", endpoint.getSaslKerberosServiceName());
+ assertEquals("PLAINTEXT", endpoint.getSecurityProtocol());
+ assertEquals("TLSv1.2", endpoint.getSslEnabledProtocols());
+ assertEquals("JKS", endpoint.getSslKeystoreType());
+ assertEquals("TLS", endpoint.getSslProtocol());
+ assertEquals("test", endpoint.getSslProvider());
+ assertEquals("JKS", endpoint.getSslTruststoreType());
+ assertEquals("/usr/bin/kinit", endpoint.getKerberosInitCmd());
+ assertEquals(new Integer(60000), endpoint.getKerberosBeforeReloginMinTime());
+ assertEquals(new Double(0.05), endpoint.getKerberosRenewJitter());
+ assertEquals(new Double(0.8), endpoint.getKerberosRenewWindowFactor());
+ assertEquals("MAC", endpoint.getSslCipherSuites());
+ assertEquals("test", endpoint.getSslEndpointAlgorithm());
+ assertEquals("SunX509", endpoint.getSslKeymanagerAlgorithm());
+ assertEquals("PKIX", endpoint.getSslTrustmanagerAlgorithm());
}
@Test
- public void testPropertiesConfigrationMerge() throws Exception {
+ public void testAllProducerKeys() throws Exception {
Map<String, Object> params = new HashMap<String, Object>();
- params.put("portNumber", 14123);
- params.put("consumerStreams", "3");
- params.put("topic", "mytopic");
- params.put("partitioner", "com.class.Party");
- KafkaConfiguration kc = new KafkaConfiguration();
- kc.setZookeeperHost("somehost");
- kc.setZookeeperPort(2987);
- kc.setTopic("default");
- params.put("configuration", kc);
-
- String uri = "kafka:broker1:12345,broker2:12566";
- String remaining = "broker1:12345,broker2:12566";
+ String uri = "kafka:dev1:12345,dev2:12566";
+ String remaining = "dev1:12345,dev2:12566";
KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
- assertEquals("somehost:2987", endpoint.getZookeeperConnect());
- assertEquals("somehost", endpoint.getZookeeperHost());
- assertEquals(2987, endpoint.getZookeeperPort());
- assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
- assertEquals("mytopic", endpoint.getTopic());
- assertEquals(3, endpoint.getConsumerStreams());
- assertEquals("com.class.Party", endpoint.getPartitioner());
- assertNull("dirty", kc.getBrokers());
- assertEquals("default", kc.getTopic());
+ assertEquals(endpoint.getConfiguration().createProducerProperties().keySet(), getProducerKeys().keySet());
}
- @Test
- public void testPropertiesConfigrationRefMerge() throws Exception {
- Map<String, Object> params = new HashMap<String, Object>();
- params.put("portNumber", 14123);
- params.put("consumerStreams", "3");
- params.put("topic", "mytopic");
- params.put("partitioner", "com.class.Party");
-
- KafkaConfiguration kc = new KafkaConfiguration();
- kc.setZookeeperHost("somehost");
- kc.setZookeeperPort(2987);
- kc.setTopic("default");
- Registry registry = Mockito.mock(Registry.class);
- Mockito.when(registry.lookupByName("baseconf")).thenReturn(kc);
- Mockito.when(context.getRegistry()).thenReturn(registry);
- params.put("configuration", "#baseconf");
-
- String uri = "kafka:broker1:12345,broker2:12566";
- String remaining = "broker1:12345,broker2:12566";
+ private Properties getProducerKeys() {
+ Properties props = new Properties();
+
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
+ props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
+ props.put(ProducerConfig.RETRIES_CONFIG, "0");
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
+ props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000");
+ props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
+ props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576");
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+ props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+ props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
+ props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
+ props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+ props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
+ props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
+ props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
+ props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
+ props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "50");
+ props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+ props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2, TLSv1.1, TLSv1");
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+ props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+ props.put(SaslConfigs.SASL_KERBEROS_KINIT_CMD, "/usr/bin/kinit");
+ props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "60000");
+ props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, "0.05");
+ props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, "0.8");
+ props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509");
+ props.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX");
+
+ return props;
+ }
- KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
- assertEquals("somehost:2987", endpoint.getZookeeperConnect());
- assertEquals("somehost", endpoint.getZookeeperHost());
- assertEquals(2987, endpoint.getZookeeperPort());
- assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
- assertEquals("mytopic", endpoint.getTopic());
- assertEquals(3, endpoint.getConsumerStreams());
- assertEquals("com.class.Party", endpoint.getPartitioner());
- assertNull("dirty", kc.getBrokers());
- assertEquals("default", kc.getTopic());
+ private void setProducerProperty(Map<String, Object> params) {
+ params.put("requestRequiredAcks", 0);
+ params.put("bufferMemorySize", 1);
+ params.put("compressionCodec", "none");
+ params.put("retries", 0);
+ params.put("producerBatchSize", 10);
+ params.put("connectionMaxIdleMs", 12);
+ params.put("lingerMs", 1);
+ params.put("maxBlockMs", 1);
+ params.put("maxRequestSize", 100);
+ params.put("receiveBufferBytes", 23);
+ params.put("requestTimeoutMs", 100);
+ params.put("sendBufferBytes", 765);
+ params.put("timeoutMs", 2045);
+ params.put("blockOnBufferFull", false);
+ params.put("maxInFlightRequest", 1);
+ params.put("metadataFetchTimeoutMs", 9043);
+ params.put("metadataMaxAgeMs", 1029);
+ params.put("reconnectBackoffMs", 234);
+ params.put("retryBackoffMs", 3782);
+ params.put("noOfMetricsSample", 3);
+ params.put("metricReporters", "org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport");
+ params.put("metricsSampleWindowMs", 12344);
+ params.put("clientId", "testing");
+ params.put("sslKeyPassword", "testing");
+ params.put("sslKeystoreLocation", "/abc");
+ params.put("sslKeystorePassword", "testing");
+ params.put("sslTruststoreLocation", "/abc");
+ params.put("sslTruststorePassword", "testing");
+ params.put("saslKerberosServiceName", "test");
+ params.put("securityProtocol", "PLAINTEXT");
+ params.put("sslEnabledProtocols", "TLSv1.2");
+ params.put("sslKeystoreType", "JKS");
+ params.put("sslProtocol", "TLS");
+ params.put("sslProvider", "test");
+ params.put("sslTruststoreType", "JKS");
+ params.put("kerberosInitCmd", "/usr/bin/kinit");
+ params.put("kerberosBeforeReloginMinTime", 60000);
+ params.put("kerberosRenewJitter", 0.05);
+ params.put("kerberosRenewWindowFactor", 0.8);
+ params.put("sslCipherSuites", "MAC");
+ params.put("sslEndpointAlgorithm", "test");
+ params.put("sslKeymanagerAlgorithm", "SunX509");
+ params.put("sslTrustmanagerAlgorithm", "PKIX");
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index a6f59c0..9128c62 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -18,13 +18,12 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -34,39 +33,38 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
public static final String TOPIC = "test";
@EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
- + "&zookeeperHost=localhost"
- + "&zookeeperPort={{zookeeperPort}}"
+ "&groupId=group1"
- + "&autoOffsetReset=smallest"
+ + "&autoOffsetReset=earliest"
+ "&autoCommitEnable=false"
+ "&batchSize=3"
+ "&consumerStreams=10"
- // If set the consumerTiemout too small the test will fail in JDK7
- + "&consumerTimeoutMs=300"
+ "&barrierAwaitTimeoutMs=1000"
- )
+ )
private Endpoint from;
@EndpointInject(uri = "mock:result")
private MockEndpoint to;
- private Producer<String, String> producer;
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
@Before
public void before() {
Properties props = new Properties();
- props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner");
- props.put("request.required.acks", "1");
- ProducerConfig config = new ProducerConfig(props);
- producer = new Producer<String, String>(config);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+ producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
}
@After
public void after() {
- producer.close();
+ if (producer != null) {
+ producer.close();
+ }
}
@Override
@@ -74,47 +72,39 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from(from).autoStartup(true).to(to).setId("First");
+ from(from).routeId("foo").to(to).setId("First");
}
};
}
@Test
public void kafkaMessagesIsConsumedByCamel() throws Exception {
+
//First 2 must not be committed since batch size is 3
to.expectedBodiesReceivedInAnyOrder("m1", "m2");
for (int k = 1; k <= 2; k++) {
String msg = "m" + k;
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
+ ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
producer.send(data);
}
- to.assertIsSatisfied(3000);
-
+ to.assertIsSatisfied();
+
to.reset();
+
+ to.expectedBodiesReceivedInAnyOrder("m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
+
//Restart endpoint,
- from.getCamelContext().stop();
- from.getCamelContext().start();
-
- to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
+ context.stopRoute("foo");
+ context.startRoute("foo");
//Second route must wake up and consume all from scratch and commit 9 consumed
for (int k = 3; k <= 10; k++) {
String msg = "m" + k;
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
+ ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
producer.send(data);
}
- to.assertIsSatisfied(3000);
-
- to.reset();
- //Restart endpoint,
- from.getCamelContext().stop();
- from.getCamelContext().start();
-
-
- //Only one message should left to consume by this consumer group
- to.expectedMessageCount(1);
- to.assertIsSatisfied(3000);
+ to.assertIsSatisfied();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index c49444b..4f8a0fd 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -19,46 +19,49 @@ package org.apache.camel.component.kafka;
import java.io.IOException;
import java.util.Properties;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
public static final String TOPIC = "test";
- @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}"
- + "&groupId=group1&autoOffsetReset=smallest")
+ @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
+ + "&groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+ + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
private Endpoint from;
@EndpointInject(uri = "mock:result")
private MockEndpoint to;
- private Producer<String, String> producer;
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
@Before
public void before() {
Properties props = new Properties();
- props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner");
- props.put("request.required.acks", "1");
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
- ProducerConfig config = new ProducerConfig(props);
- producer = new Producer<String, String>(config);
+ producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
}
@After
public void after() {
- producer.close();
+ if (producer != null) {
+ producer.close();
+ }
}
@Override
@@ -78,10 +81,11 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
for (int k = 0; k < 5; k++) {
String msg = "message-" + k;
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
+ ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
producer.send(data);
}
to.assertIsSatisfied(3000);
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index b51c09e..86bc163 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -28,21 +28,21 @@ public class KafkaConsumerTest {
private Processor processor = mock(Processor.class);
@Test(expected = IllegalArgumentException.class)
- public void consumerRequiresZookeeperConnect() throws Exception {
+ public void consumerRequiresBootstrapServers() throws Exception {
Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
new KafkaConsumer(endpoint, processor);
}
@Test(expected = IllegalArgumentException.class)
public void consumerRequiresGroupId() throws Exception {
- Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+ Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234");
new KafkaConsumer(endpoint, processor);
}
@Test
- public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception {
+ public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
- Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+ Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181");
new KafkaConsumer(endpoint, processor);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index be16766..bd25886 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -18,11 +18,8 @@ package org.apache.camel.component.kafka;
import java.net.URISyntaxException;
-import kafka.message.Message;
-import kafka.message.MessageAndMetadata;
-
-import kafka.serializer.DefaultDecoder;
import org.apache.camel.Exchange;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -35,12 +32,8 @@ public class KafkaEndpointTest {
KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent());
endpoint.setBrokers("localhost");
- Message message = new Message("mymessage".getBytes(), "somekey".getBytes());
- DefaultDecoder decoder = new DefaultDecoder(null);
- MessageAndMetadata<byte[], byte[]> mm =
- new MessageAndMetadata<byte[], byte[]>("topic", 4, message, 56, decoder, decoder);
-
- Exchange exchange = endpoint.createKafkaExchange(mm);
+ ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 4, 56, "somekey", "");
+ Exchange exchange = endpoint.createKafkaExchange(record);
assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY));
assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC));
assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));
[4/4] camel git commit: CAMEL-9467: Adjust feature for camel-kafka
Posted by da...@apache.org.
CAMEL-9467: Adjust feature for camel-kafka
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/621a704c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/621a704c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/621a704c
Branch: refs/heads/master
Commit: 621a704c455fbd2c3ae6e2ea2b21c0e7da5e5f23
Parents: b2aa831
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 9 10:09:38 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 9 10:09:38 2016 +0100
----------------------------------------------------------------------
platforms/karaf/features/src/main/resources/features.xml | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/621a704c/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 0062335..5809e92 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -973,13 +973,9 @@
<bundle>mvn:org.apache.camel/camel-jxpath/${project.version}</bundle>
</feature>
<feature name='camel-kafka' version='${project.version}' resolver='(obr)' start-level='50'>
- <bundle dependency='true'>mvn:org.scala-lang/scala-library/${scala-2.10-version}</bundle>
- <bundle dependency='true'>mvn:org.apache.zookeeper/zookeeper/${zookeeper-version}</bundle>
- <bundle dependency="true">wrap:mvn:com.101tec/zkclient/${zkclient-version}$Bundle-Version=${zkclient-version}&amp;Export-Package=*;-noimport:=true;version="${zkclient-version}"</bundle>
- <bundle dependency='true'>wrap:mvn:com.yammer.metrics/metrics-core/${yammer-metrics-version}$Bundle-Version=${yammer-metrics-version}&amp;Export-Package=*;-noimport:=true;version="${yammer-metrics-version}"</bundle>
- <bundle dependency='true'>wrap:mvn:com.yammer.metrics/metrics-annotation/${yammer-metrics-version}$Bundle-Version=${yammer-metrics-version}&amp;Export-Package=*;-noimport:=true;version="${yammer-metrics-version}"</bundle>
- <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/${kafka-clients-bundle-version}</bundle>
- <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka_2.10/${kafka-bundle-version}</bundle>
+ <bundle dependency='true'>wrap:mvn:org.apache.kafka/kafka-clients/${kafka-version}</bundle>
+ <bundle dependency='true'>mvn:org.xerial.snappy/snappy-java/${snappy-version}</bundle>
+ <bundle dependency='true'>wrap:mvn:net.jpountz.lz4/lz4/1.2.0</bundle>
<feature version='${project.version}'>camel-core</feature>
<bundle>mvn:org.apache.camel/camel-kafka/${project.version}</bundle>
</feature>