You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/04/01 09:54:35 UTC
[1/2] git commit: CAMEL-7335 Expose kafka configuration properties to
the camel component with thanks to Fabien
Repository: camel
Updated Branches:
refs/heads/master c4fee06c6 -> cd733ddf0
CAMEL-7335 Expose kafka configuration properties to the camel component with thanks to Fabien
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2a680eb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2a680eb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2a680eb
Branch: refs/heads/master
Commit: e2a680eb947820ccd4fe20949366df3f1a3c3a33
Parents: c4fee06
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Apr 1 15:30:07 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Apr 1 15:30:07 2014 +0800
----------------------------------------------------------------------
.../component/kafka/KafkaConfiguration.java | 442 +++++++++++++++++++
.../camel/component/kafka/KafkaConsumer.java | 5 +-
.../camel/component/kafka/KafkaEndpoint.java | 357 +++++++++++++--
.../camel/component/kafka/KafkaProducer.java | 5 +-
.../camel/component/kafka/KafkaProducerIT.java | 3 +-
5 files changed, 763 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
new file mode 100644
index 0000000..88d5017
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import kafka.producer.DefaultPartitioner;
+
+public class KafkaConfiguration {
+ private String zookeeperHost;
+ private int zookeeperPort = 2181;
+ private String topic;
+ private String groupId;
+ private String partitioner = DefaultPartitioner.class.getCanonicalName();
+ private int consumerStreams = 10;
+
+ //Common configuration properties
+ private String clientId;
+
+ //Consumer configuration properties
+ private String consumerId;
+ private Integer socketTimeoutMs;
+ private Integer socketReceiveBufferBytes;
+ private Integer fetchMessageMaxBytes;
+ private Boolean autoCommitEnable;
+ private Integer autoCommitIntervalMs;
+ private Integer queuedMaxMessages;
+ private Integer rebalanceMaxRetries;
+ private Integer fetchMinBytes;
+ private Integer fetchWaitMaxMs;
+ private Integer rebalanceBackoffMs;
+ private Integer refreshLeaderBackoffMs;
+ private String autoOffsetReset;
+ private Integer consumerTimeoutMs;
+
+ //Zookeepr configuration properties
+ private Integer zookeeperSessionTimeoutMs;
+ private Integer zookeeperConnectionTimeoutMs;
+ private Integer zookeeperSyncTimeMs;
+
+ //Producer configuration properties
+ private String producerType;
+ private String compressionCodec;
+ private String compressedTopics;
+ private Integer messageSendMaxRetries;
+ private Integer retryBackoffMs;
+ private Integer topicMetadataRefreshIntervalMs;
+
+ //Sync producer config
+ private Integer sendBufferBytes;
+ private short requestRequiredAcks;
+ private Integer requestTimeoutMs;
+
+ //Async producer config
+ private Integer queueBufferingMaxMs;
+ private Integer queueBufferingMaxMessages;
+ private Integer queueEnqueueTimeoutMs;
+ private Integer batchNumMessages;
+ private String serializerClass;
+ private String keySerializerClass;
+
+ public KafkaConfiguration() {
+ }
+
+ public Properties createProducerProperties() {
+ Properties props = new Properties();
+ addPropertyIfNotNull(props, "request.required.acks", getRequestRequiredAcks());
+ addPropertyIfNotNull(props, "partitioner.class", getPartitioner());
+ addPropertyIfNotNull(props, "serializer.class", getSerializerClass());
+ addPropertyIfNotNull(props, "key.serializer.class", getKeySerializerClass());
+ addPropertyIfNotNull(props, "request.timeout.ms", getRequestTimeoutMs());
+ addPropertyIfNotNull(props, "producer.type", getProducerType());
+ addPropertyIfNotNull(props, "compression.codec", getCompressionCodec());
+ addPropertyIfNotNull(props, "compressed.topics", getCompressedTopics());
+ addPropertyIfNotNull(props, "message.send.max.retries", getMessageSendMaxRetries());
+ addPropertyIfNotNull(props, "retry.backoff.ms", getRetryBackoffMs());
+ addPropertyIfNotNull(props, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs());
+ addPropertyIfNotNull(props, "queue.buffering.max.ms", getQueueBufferingMaxMs());
+ addPropertyIfNotNull(props, "queue.buffering.max.messages", getQueueBufferingMaxMessages());
+ addPropertyIfNotNull(props, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs());
+ addPropertyIfNotNull(props, "batch.num.messages", getBatchNumMessages());
+ addPropertyIfNotNull(props, "send.buffer.bytes", getSendBufferBytes());
+ addPropertyIfNotNull(props, "client.id", getClientId());
+ return props;
+ }
+
+ public Properties createConsumerProperties() {
+ Properties props = new Properties();
+ addPropertyIfNotNull(props, "consumer.id", getConsumerId());
+ addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs());
+ addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes());
+ addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes());
+ addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable());
+ addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs());
+ addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages());
+ addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
+ addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
+ addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries());
+ addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs());
+ addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs());
+ addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset());
+ addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs());
+ addPropertyIfNotNull(props, "client.id", getClientId());
+ addPropertyIfNotNull(props, "zookeeper.session.timeout.ms ", getZookeeperSessionTimeoutMs());
+ addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
+ addPropertyIfNotNull(props, "zookeeper.sync.time.ms ", getZookeeperSyncTimeMs());
+ return props;
+ }
+
+ private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
+ if (value != null) {
+ // Kafka expects all properties as String
+ props.put(key, value.toString());
+ }
+ }
+
+ public String getZookeeperHost() {
+ return zookeeperHost;
+ }
+
+ public void setZookeeperHost(String zookeeperHost) {
+ this.zookeeperHost = zookeeperHost;
+ }
+
+ public int getZookeeperPort() {
+ return zookeeperPort;
+ }
+
+ public void setZookeeperPort(int zookeeperPort) {
+ this.zookeeperPort = zookeeperPort;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public String getPartitioner() {
+ return partitioner;
+ }
+
+ public void setPartitioner(String partitioner) {
+ this.partitioner = partitioner;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getConsumerStreams() {
+ return consumerStreams;
+ }
+
+ public void setConsumerStreams(int consumerStreams) {
+ this.consumerStreams = consumerStreams;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ public Integer getSocketTimeoutMs() {
+ return socketTimeoutMs;
+ }
+
+ public void setSocketTimeoutMs(Integer socketTimeoutMs) {
+ this.socketTimeoutMs = socketTimeoutMs;
+ }
+
+ public Integer getSocketReceiveBufferBytes() {
+ return socketReceiveBufferBytes;
+ }
+
+ public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
+ this.socketReceiveBufferBytes = socketReceiveBufferBytes;
+ }
+
+ public Integer getFetchMessageMaxBytes() {
+ return fetchMessageMaxBytes;
+ }
+
+ public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
+ this.fetchMessageMaxBytes = fetchMessageMaxBytes;
+ }
+
+ public Boolean isAutoCommitEnable() {
+ return autoCommitEnable;
+ }
+
+ public void setAutoCommitEnable(Boolean autoCommitEnable) {
+ this.autoCommitEnable = autoCommitEnable;
+ }
+
+ public Integer getAutoCommitIntervalMs() {
+ return autoCommitIntervalMs;
+ }
+
+ public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+ this.autoCommitIntervalMs = autoCommitIntervalMs;
+ }
+
+ public Integer getQueuedMaxMessages() {
+ return queuedMaxMessages;
+ }
+
+ public void setQueuedMaxMessages(Integer queuedMaxMessages) {
+ this.queuedMaxMessages = queuedMaxMessages;
+ }
+
+ public Integer getRebalanceMaxRetries() {
+ return rebalanceMaxRetries;
+ }
+
+ public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
+ this.rebalanceMaxRetries = rebalanceMaxRetries;
+ }
+
+ public Integer getFetchMinBytes() {
+ return fetchMinBytes;
+ }
+
+ public void setFetchMinBytes(Integer fetchMinBytes) {
+ this.fetchMinBytes = fetchMinBytes;
+ }
+
+ public Integer getFetchWaitMaxMs() {
+ return fetchWaitMaxMs;
+ }
+
+ public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+ this.fetchWaitMaxMs = fetchWaitMaxMs;
+ }
+
+ public Integer getRebalanceBackoffMs() {
+ return rebalanceBackoffMs;
+ }
+
+ public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
+ this.rebalanceBackoffMs = rebalanceBackoffMs;
+ }
+
+ public Integer getRefreshLeaderBackoffMs() {
+ return refreshLeaderBackoffMs;
+ }
+
+ public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
+ this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
+ }
+
+ public String getAutoOffsetReset() {
+ return autoOffsetReset;
+ }
+
+ public void setAutoOffsetReset(String autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
+ }
+
+ public Integer getConsumerTimeoutMs() {
+ return consumerTimeoutMs;
+ }
+
+ public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
+ this.consumerTimeoutMs = consumerTimeoutMs;
+ }
+
+ public Integer getZookeeperSessionTimeoutMs() {
+ return zookeeperSessionTimeoutMs;
+ }
+
+ public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
+ this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
+ }
+
+ public Integer getZookeeperConnectionTimeoutMs() {
+ return zookeeperConnectionTimeoutMs;
+ }
+
+ public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
+ this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
+ }
+
+ public Integer getZookeeperSyncTimeMs() {
+ return zookeeperSyncTimeMs;
+ }
+
+ public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
+ this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
+ }
+
+ public String getProducerType() {
+ return producerType;
+ }
+
+ public void setProducerType(String producerType) {
+ this.producerType = producerType;
+ }
+
+ public String getCompressionCodec() {
+ return compressionCodec;
+ }
+
+ public void setCompressionCodec(String compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ }
+
+ public String getCompressedTopics() {
+ return compressedTopics;
+ }
+
+ public void setCompressedTopics(String compressedTopics) {
+ this.compressedTopics = compressedTopics;
+ }
+
+ public Integer getMessageSendMaxRetries() {
+ return messageSendMaxRetries;
+ }
+
+ public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
+ this.messageSendMaxRetries = messageSendMaxRetries;
+ }
+
+ public Integer getRetryBackoffMs() {
+ return retryBackoffMs;
+ }
+
+ public void setRetryBackoffMs(Integer retryBackoffMs) {
+ this.retryBackoffMs = retryBackoffMs;
+ }
+
+ public Integer getTopicMetadataRefreshIntervalMs() {
+ return topicMetadataRefreshIntervalMs;
+ }
+
+ public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
+ this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
+ }
+
+ public Integer getSendBufferBytes() {
+ return sendBufferBytes;
+ }
+
+ public void setSendBufferBytes(Integer sendBufferBytes) {
+ this.sendBufferBytes = sendBufferBytes;
+ }
+
+ public short getRequestRequiredAcks() {
+ return requestRequiredAcks;
+ }
+
+ public void setRequestRequiredAcks(short requestRequiredAcks) {
+ this.requestRequiredAcks = requestRequiredAcks;
+ }
+
+ public Integer getRequestTimeoutMs() {
+ return requestTimeoutMs;
+ }
+
+ public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+ this.requestTimeoutMs = requestTimeoutMs;
+ }
+
+ public Integer getQueueBufferingMaxMs() {
+ return queueBufferingMaxMs;
+ }
+
+ public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
+ this.queueBufferingMaxMs = queueBufferingMaxMs;
+ }
+
+ public Integer getQueueBufferingMaxMessages() {
+ return queueBufferingMaxMessages;
+ }
+
+ public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+ this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+ }
+
+ public Integer getQueueEnqueueTimeoutMs() {
+ return queueEnqueueTimeoutMs;
+ }
+
+ public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
+ this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
+ }
+
+ public Integer getBatchNumMessages() {
+ return batchNumMessages;
+ }
+
+ public void setBatchNumMessages(Integer batchNumMessages) {
+ this.batchNumMessages = batchNumMessages;
+ }
+
+ public String getSerializerClass() {
+ return serializerClass;
+ }
+
+ public void setSerializerClass(String serializerClass) {
+ this.serializerClass = serializerClass;
+ }
+
+ public String getKeySerializerClass() {
+ return keySerializerClass;
+ }
+
+ public void setKeySerializerClass(String keySerializerClass) {
+ this.keySerializerClass = keySerializerClass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8db7c54..990e942 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -63,12 +63,9 @@ public class KafkaConsumer extends DefaultConsumer {
}
Properties getProps() {
- Properties props = new Properties();
+ Properties props = endpoint.getConfiguration().createConsumerProperties();
props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort());
props.put("group.id", endpoint.getGroupId());
- props.put("zookeeper.session.timeout.ms", "400");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
return props;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index f88e3d6..9bdb100 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -35,12 +35,7 @@ import org.apache.camel.impl.DefaultMessage;
public class KafkaEndpoint extends DefaultEndpoint {
private String brokers;
- private String zookeeperHost;
- private int zookeeperPort;
- private String groupId;
- private int consumerStreams = 10;
- private String partitioner;
- private String topic;
+ private KafkaConfiguration configuration = new KafkaConfiguration();
public KafkaEndpoint() {
}
@@ -52,44 +47,97 @@ public class KafkaEndpoint extends DefaultEndpoint {
this.brokers = remaining.split("\\?")[0];
}
+ public KafkaConfiguration getConfiguration() {
+ if (configuration == null) {
+ configuration = createConfiguration();
+ }
+ return configuration;
+ }
+
+ public void setConfiguration(KafkaConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ protected KafkaConfiguration createConfiguration() {
+ return new KafkaConfiguration();
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ KafkaConsumer consumer = new KafkaConsumer(this, processor);
+ configureConsumer(consumer);
+ return consumer;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new KafkaProducer(this);
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public ExecutorService createExecutor() {
+ return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+ }
+
+ public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
+ Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
+
+ Message message = new DefaultMessage();
+ message.setHeader(KafkaConstants.PARTITION, mm.partition());
+ message.setHeader(KafkaConstants.TOPIC, mm.topic());
+ message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+ message.setBody(mm.message());
+ exchange.setIn(message);
+
+ return exchange;
+ }
+
+
+ // Delegated properties from the configuration
+ //-------------------------------------------------------------------------
+
public String getZookeeperHost() {
- return zookeeperHost;
+ return configuration.getZookeeperHost();
}
public void setZookeeperHost(String zookeeperHost) {
- this.zookeeperHost = zookeeperHost;
+ configuration.setZookeeperHost(zookeeperHost);
}
public int getZookeeperPort() {
- return zookeeperPort;
+ return configuration.getZookeeperPort();
}
public void setZookeeperPort(int zookeeperPort) {
- this.zookeeperPort = zookeeperPort;
+ configuration.setZookeeperPort(zookeeperPort);
}
public String getGroupId() {
- return groupId;
+ return configuration.getGroupId();
}
public void setGroupId(String groupId) {
- this.groupId = groupId;
+ configuration.setGroupId(groupId);
}
public String getPartitioner() {
- return partitioner;
+ return configuration.getPartitioner();
}
public void setPartitioner(String partitioner) {
- this.partitioner = partitioner;
+ configuration.setPartitioner(partitioner);
}
public String getTopic() {
- return topic;
+ return configuration.getTopic();
}
public void setTopic(String topic) {
- this.topic = topic;
+ configuration.setTopic(topic);
}
public String getBrokers() {
@@ -97,45 +145,274 @@ public class KafkaEndpoint extends DefaultEndpoint {
}
public int getConsumerStreams() {
- return consumerStreams;
+ return configuration.getConsumerStreams();
}
public void setConsumerStreams(int consumerStreams) {
- this.consumerStreams = consumerStreams;
+ configuration.setConsumerStreams(consumerStreams);
}
- public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
- Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
+ public void setConsumerTimeoutMs(int consumerTimeoutMs) {
+ configuration.setConsumerTimeoutMs(consumerTimeoutMs);
+ }
- Message message = new DefaultMessage();
- message.setHeader(KafkaConstants.PARTITION, mm.partition());
- message.setHeader(KafkaConstants.TOPIC, mm.topic());
- message.setHeader(KafkaConstants.KEY, new String(mm.key()));
- message.setBody(mm.message());
- exchange.setIn(message);
+ public void setSerializerClass(String serializerClass) {
+ configuration.setSerializerClass(serializerClass);
+ }
- return exchange;
+ public void setQueueBufferingMaxMessages(int queueBufferingMaxMessages) {
+ configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
}
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- KafkaConsumer consumer = new KafkaConsumer(this, processor);
- configureConsumer(consumer);
- return consumer;
+ public int getFetchWaitMaxMs() {
+ return configuration.getFetchWaitMaxMs();
}
- @Override
- public Producer createProducer() throws Exception {
- return new KafkaProducer(this);
+ public Integer getZookeeperConnectionTimeoutMs() {
+ return configuration.getZookeeperConnectionTimeoutMs();
}
- @Override
- public boolean isSingleton() {
- return true;
+ public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
+ configuration.setZookeeperConnectionTimeoutMs(zookeeperConnectionTimeoutMs);
}
- public ExecutorService createExecutor() {
- return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + getTopic() + "]", getConsumerStreams());
+ public void setMessageSendMaxRetries(int messageSendMaxRetries) {
+ configuration.setMessageSendMaxRetries(messageSendMaxRetries);
}
+ public int getQueueBufferingMaxMs() {
+ return configuration.getQueueBufferingMaxMs();
+ }
+
+ public void setRequestRequiredAcks(short requestRequiredAcks) {
+ configuration.setRequestRequiredAcks(requestRequiredAcks);
+ }
+
+ public Integer getRebalanceBackoffMs() {
+ return configuration.getRebalanceBackoffMs();
+ }
+
+ public void setQueueEnqueueTimeoutMs(int queueEnqueueTimeoutMs) {
+ configuration.setQueueEnqueueTimeoutMs(queueEnqueueTimeoutMs);
+ }
+
+ public int getFetchMessageMaxBytes() {
+ return configuration.getFetchMessageMaxBytes();
+ }
+
+ public int getQueuedMaxMessages() {
+ return configuration.getQueuedMaxMessages();
+ }
+
+ public int getAutoCommitIntervalMs() {
+ return configuration.getAutoCommitIntervalMs();
+ }
+
+ public void setSocketTimeoutMs(int socketTimeoutMs) {
+ configuration.setSocketTimeoutMs(socketTimeoutMs);
+ }
+
+ public void setAutoCommitIntervalMs(int autoCommitIntervalMs) {
+ configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
+ }
+
+ public void setRequestTimeoutMs(int requestTimeoutMs) {
+ configuration.setRequestTimeoutMs(requestTimeoutMs);
+ }
+
+ public void setCompressedTopics(String compressedTopics) {
+ configuration.setCompressedTopics(compressedTopics);
+ }
+
+ public int getSocketReceiveBufferBytes() {
+ return configuration.getSocketReceiveBufferBytes();
+ }
+
+ public void setSendBufferBytes(int sendBufferBytes) {
+ configuration.setSendBufferBytes(sendBufferBytes);
+ }
+
+ public void setFetchMessageMaxBytes(int fetchMessageMaxBytes) {
+ configuration.setFetchMessageMaxBytes(fetchMessageMaxBytes);
+ }
+
+ public int getRefreshLeaderBackoffMs() {
+ return configuration.getRefreshLeaderBackoffMs();
+ }
+
+ public void setFetchWaitMaxMs(int fetchWaitMaxMs) {
+ configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
+ }
+
+ public int getTopicMetadataRefreshIntervalMs() {
+ return configuration.getTopicMetadataRefreshIntervalMs();
+ }
+
+ public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
+ configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
+ }
+
+ public int getConsumerTimeoutMs() {
+ return configuration.getConsumerTimeoutMs();
+ }
+
+ public void setAutoCommitEnable(boolean autoCommitEnable) {
+ configuration.setAutoCommitEnable(autoCommitEnable);
+ }
+
+ public String getCompressionCodec() {
+ return configuration.getCompressionCodec();
+ }
+
+ public void setProducerType(String producerType) {
+ configuration.setProducerType(producerType);
+ }
+
+ public String getClientId() {
+ return configuration.getClientId();
+ }
+
+ public int getFetchMinBytes() {
+ return configuration.getFetchMinBytes();
+ }
+
+ public String getAutoOffsetReset() {
+ return configuration.getAutoOffsetReset();
+ }
+
+ public void setRefreshLeaderBackoffMs(int refreshLeaderBackoffMs) {
+ configuration.setRefreshLeaderBackoffMs(refreshLeaderBackoffMs);
+ }
+
+ public void setAutoOffsetReset(String autoOffsetReset) {
+ configuration.setAutoOffsetReset(autoOffsetReset);
+ }
+
+ public void setConsumerId(String consumerId) {
+ configuration.setConsumerId(consumerId);
+ }
+
+ public int getRetryBackoffMs() {
+ return configuration.getRetryBackoffMs();
+ }
+
+ public int getRebalanceMaxRetries() {
+ return configuration.getRebalanceMaxRetries();
+ }
+
+ public boolean isAutoCommitEnable() {
+ return configuration.isAutoCommitEnable();
+ }
+
+ public void setQueueBufferingMaxMs(int queueBufferingMaxMs) {
+ configuration.setQueueBufferingMaxMs(queueBufferingMaxMs);
+ }
+
+ public void setRebalanceMaxRetries(int rebalanceMaxRetries) {
+ configuration.setRebalanceMaxRetries(rebalanceMaxRetries);
+ }
+
+ public int getZookeeperSessionTimeoutMs() {
+ return configuration.getZookeeperSessionTimeoutMs();
+ }
+
+ public void setKeySerializerClass(String keySerializerClass) {
+ configuration.setKeySerializerClass(keySerializerClass);
+ }
+
+ public void setCompressionCodec(String compressionCodec) {
+ configuration.setCompressionCodec(compressionCodec);
+ }
+
+ public void setClientId(String clientId) {
+ configuration.setClientId(clientId);
+ }
+
+ public int getSocketTimeoutMs() {
+ return configuration.getSocketTimeoutMs();
+ }
+
+ public String getCompressedTopics() {
+ return configuration.getCompressedTopics();
+ }
+
+ public int getZookeeperSyncTimeMs() {
+ return configuration.getZookeeperSyncTimeMs();
+ }
+
+ public void setSocketReceiveBufferBytes(int socketReceiveBufferBytes) {
+ configuration.setSocketReceiveBufferBytes(socketReceiveBufferBytes);
+ }
+
+ public int getQueueEnqueueTimeoutMs() {
+ return configuration.getQueueEnqueueTimeoutMs();
+ }
+
+ public int getQueueBufferingMaxMessages() {
+ return configuration.getQueueBufferingMaxMessages();
+ }
+
+ public void setZookeeperSyncTimeMs(int zookeeperSyncTimeMs) {
+ configuration.setZookeeperSyncTimeMs(zookeeperSyncTimeMs);
+ }
+
+ public String getKeySerializerClass() {
+ return configuration.getKeySerializerClass();
+ }
+
+ public void setTopicMetadataRefreshIntervalMs(int topicMetadataRefreshIntervalMs) {
+ configuration.setTopicMetadataRefreshIntervalMs(topicMetadataRefreshIntervalMs);
+ }
+
+ public void setBatchNumMessages(int batchNumMessages) {
+ configuration.setBatchNumMessages(batchNumMessages);
+ }
+
+ public int getSendBufferBytes() {
+ return configuration.getSendBufferBytes();
+ }
+
+ public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
+ configuration.setRebalanceBackoffMs(rebalanceBackoffMs);
+ }
+
+ public void setQueuedMaxMessages(int queuedMaxMessages) {
+ configuration.setQueuedMaxMessages(queuedMaxMessages);
+ }
+
+ public void setRetryBackoffMs(int retryBackoffMs) {
+ configuration.setRetryBackoffMs(retryBackoffMs);
+ }
+
+ public int getBatchNumMessages() {
+ return configuration.getBatchNumMessages();
+ }
+
+ public short getRequestRequiredAcks() {
+ return configuration.getRequestRequiredAcks();
+ }
+
+ public String getProducerType() {
+ return configuration.getProducerType();
+ }
+
+ public String getConsumerId() {
+ return configuration.getConsumerId();
+ }
+
+ public int getMessageSendMaxRetries() {
+ return configuration.getMessageSendMaxRetries();
+ }
+
+ public void setFetchMinBytes(int fetchMinBytes) {
+ configuration.setFetchMinBytes(fetchMinBytes);
+ }
+
+ public String getSerializerClass() {
+ return configuration.getSerializerClass();
+ }
+
+ public int getRequestTimeoutMs() {
+ return configuration.getRequestTimeoutMs();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6c2d167..4d681f1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -48,11 +48,8 @@ public class KafkaProducer extends DefaultProducer {
}
Properties getProps() {
- Properties props = new Properties();
+ Properties props = endpoint.getConfiguration().createProducerProperties();
props.put("metadata.broker.list", endpoint.getBrokers());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", endpoint.getPartitioner());
- props.put("request.required.acks", "1");
return props;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 85fa272..36bb6c4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -50,7 +50,8 @@ public class KafkaProducerIT extends CamelTestSupport {
public static final String TOPIC = "test";
public static final String TOPIC_IN_HEADER = "testHeader";
- @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner")
+ @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC
+ + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder&requestRequiredAcks=1")
private Endpoint to;
@Produce(uri = "direct:start")
[2/2] git commit: Fixe bunch of eclipse warning of camel-kafka
component
Posted by ni...@apache.org.
Fixe bunch of eclipse warning of camel-kafka component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd733ddf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd733ddf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd733ddf
Branch: refs/heads/master
Commit: cd733ddf00fb228ed1163049c7f81e32da179feb
Parents: e2a680e
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Apr 1 15:53:55 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Apr 1 15:53:55 2014 +0800
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 11 +++-----
.../camel/component/kafka/KafkaProducerIT.java | 15 +++++++----
.../component/kafka/KafkaProducerTest.java | 28 +++++++++++---------
3 files changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cd733ddf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 990e942..3087a14 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,20 +27,15 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
-
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*
*/
public class KafkaConsumer extends DefaultConsumer {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
-
protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private final Processor processor;
@@ -82,7 +77,7 @@ public class KafkaConsumer extends DefaultConsumer {
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
executor = endpoint.createExecutor();
- for (final KafkaStream stream : streams) {
+ for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new ConsumerTask(stream));
}
}
@@ -107,9 +102,9 @@ public class KafkaConsumer extends DefaultConsumer {
class ConsumerTask implements Runnable {
- private KafkaStream stream;
+ private KafkaStream<byte[], byte[]> stream;
- public ConsumerTask(KafkaStream stream) {
+ public ConsumerTask(KafkaStream<byte[], byte[]> stream) {
this.stream = stream;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cd733ddf/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 36bb6c4..f77d91a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -41,15 +41,19 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance running on 2181.
* The broker must have a topic called test created.
*/
public class KafkaProducerIT extends CamelTestSupport {
-
+
public static final String TOPIC = "test";
public static final String TOPIC_IN_HEADER = "testHeader";
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerIT.class);
+
@EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC
+ "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder&requestRequiredAcks=1")
private Endpoint to;
@@ -111,10 +115,10 @@ public class KafkaProducerIT extends CamelTestSupport {
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
ExecutorService executor = Executors.newFixedThreadPool(10);
- for (final KafkaStream stream : consumerMap.get(TOPIC)) {
+ for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC)) {
executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
}
- for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) {
+ for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC_IN_HEADER)) {
executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
}
}
@@ -131,10 +135,10 @@ public class KafkaProducerIT extends CamelTestSupport {
}
private static class KakfaTopicConsumer implements Runnable {
- private final KafkaStream stream;
+ private final KafkaStream<byte[], byte[]> stream;
private final CountDownLatch latch;
- public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) {
+ public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
this.stream = stream;
this.latch = latch;
}
@@ -144,6 +148,7 @@ public class KafkaProducerIT extends CamelTestSupport {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
String msg = new String(it.next().message());
+ LOG.info("Get the message" + msg);
latch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cd733ddf/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index acdfc60..3c71417 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -42,6 +42,7 @@ public class KafkaProducerTest {
private Exchange exchange = Mockito.mock(Exchange.class);
private Message in = new DefaultMessage();
+ @SuppressWarnings({"unchecked"})
public KafkaProducerTest() throws IllegalAccessException, InstantiationException, ClassNotFoundException,
URISyntaxException {
endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
@@ -59,6 +60,7 @@ public class KafkaProducerTest {
}
@Test
+ @SuppressWarnings({"unchecked"})
public void processSendsMesssage() throws Exception {
endpoint.setTopic("sometopic");
@@ -80,10 +82,7 @@ public class KafkaProducerTest {
producer.process(exchange);
- ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
- Mockito.verify(producer.producer).send(captor.capture());
- assertEquals("4", captor.getValue().key());
- assertEquals("anotherTopic", captor.getValue().topic());
+ verifySendMessage("4", "anotherTopic");
}
@Test
@@ -96,10 +95,8 @@ public class KafkaProducerTest {
producer.process(exchange);
- ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
- Mockito.verify(producer.producer).send(captor.capture());
- assertEquals("4", captor.getValue().key());
- assertEquals("anotherTopic", captor.getValue().topic());
+ verifySendMessage("4", "anotherTopic");
+
}
@Test(expected = CamelException.class)
@@ -116,19 +113,26 @@ public class KafkaProducerTest {
Mockito.when(exchange.getIn()).thenReturn(in);
producer.process(exchange);
}
-
+
@Test
+
public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
- endpoint.setTopic("sometopic");
+ endpoint.setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
producer.process(exchange);
+ verifySendMessage("4", "someTopic");
+
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected void verifySendMessage(String key, String topic) {
ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
Mockito.verify(producer.producer).send(captor.capture());
- assertEquals("4", captor.getValue().key());
- assertEquals("sometopic", captor.getValue().topic());
+ assertEquals(key, captor.getValue().key());
+ assertEquals(topic, captor.getValue().topic());
}
}