You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/04/01 09:54:35 UTC

[1/2] git commit: CAMEL-7335 Expose kafka configuration properties to the camel component with thanks to Fabien

Repository: camel
Updated Branches:
  refs/heads/master c4fee06c6 -> cd733ddf0


CAMEL-7335 Expose kafka configuration properties to the camel component with thanks to Fabien


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2a680eb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2a680eb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2a680eb

Branch: refs/heads/master
Commit: e2a680eb947820ccd4fe20949366df3f1a3c3a33
Parents: c4fee06
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Apr 1 15:30:07 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Apr 1 15:30:07 2014 +0800

----------------------------------------------------------------------
 .../component/kafka/KafkaConfiguration.java     | 442 +++++++++++++++++++
 .../camel/component/kafka/KafkaConsumer.java    |   5 +-
 .../camel/component/kafka/KafkaEndpoint.java    | 357 +++++++++++++--
 .../camel/component/kafka/KafkaProducer.java    |   5 +-
 .../camel/component/kafka/KafkaProducerIT.java  |   3 +-
 5 files changed, 763 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
new file mode 100644
index 0000000..88d5017
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import kafka.producer.DefaultPartitioner;
+
+public class KafkaConfiguration {
+    private String zookeeperHost;
+    private int zookeeperPort = 2181;
+    private String topic;
+    private String groupId;
+    private String partitioner = DefaultPartitioner.class.getCanonicalName();
+    private int consumerStreams = 10;
+
+    //Common configuration properties
+    private String clientId;
+
+    //Consumer configuration properties
+    private String consumerId;
+    private Integer socketTimeoutMs;
+    private Integer socketReceiveBufferBytes;
+    private Integer fetchMessageMaxBytes;
+    private Boolean autoCommitEnable;
+    private Integer autoCommitIntervalMs;
+    private Integer queuedMaxMessages;
+    private Integer rebalanceMaxRetries;
+    private Integer fetchMinBytes;
+    private Integer fetchWaitMaxMs;
+    private Integer rebalanceBackoffMs;
+    private Integer refreshLeaderBackoffMs;
+    private String autoOffsetReset;
+    private Integer consumerTimeoutMs;
+
+    //Zookeeper configuration properties
+    private Integer zookeeperSessionTimeoutMs;
+    private Integer zookeeperConnectionTimeoutMs;
+    private Integer zookeeperSyncTimeMs;
+
+    //Producer configuration properties
+    private String producerType;
+    private String compressionCodec;
+    private String compressedTopics;
+    private Integer messageSendMaxRetries;
+    private Integer retryBackoffMs;
+    private Integer topicMetadataRefreshIntervalMs;
+
+    //Sync producer config
+    private Integer sendBufferBytes;
+    private short requestRequiredAcks;
+    private Integer requestTimeoutMs;
+
+    //Async producer config
+    private Integer queueBufferingMaxMs;
+    private Integer queueBufferingMaxMessages;
+    private Integer queueEnqueueTimeoutMs;
+    private Integer batchNumMessages;
+    private String serializerClass;
+    private String keySerializerClass;
+
+    public KafkaConfiguration() {
+    }
+
+    /**
+     * Assembles the properties handed to the Kafka producer.
+     * <p/>
+     * Every option is copied through {@code addPropertyIfNotNull}, so options
+     * that were never set (and are therefore {@code null}) are left out.
+     *
+     * @return a freshly created {@link Properties} instance
+     */
+    public Properties createProducerProperties() {
+        Properties producerProps = new Properties();
+
+        // common options
+        addPropertyIfNotNull(producerProps, "client.id", getClientId());
+        addPropertyIfNotNull(producerProps, "partitioner.class", getPartitioner());
+
+        // general producer options
+        addPropertyIfNotNull(producerProps, "producer.type", getProducerType());
+        addPropertyIfNotNull(producerProps, "compression.codec", getCompressionCodec());
+        addPropertyIfNotNull(producerProps, "compressed.topics", getCompressedTopics());
+        addPropertyIfNotNull(producerProps, "message.send.max.retries", getMessageSendMaxRetries());
+        addPropertyIfNotNull(producerProps, "retry.backoff.ms", getRetryBackoffMs());
+        addPropertyIfNotNull(producerProps, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs());
+
+        // sync producer options
+        addPropertyIfNotNull(producerProps, "send.buffer.bytes", getSendBufferBytes());
+        // requestRequiredAcks is a primitive short, so this entry is always written
+        addPropertyIfNotNull(producerProps, "request.required.acks", getRequestRequiredAcks());
+        addPropertyIfNotNull(producerProps, "request.timeout.ms", getRequestTimeoutMs());
+
+        // async producer options
+        addPropertyIfNotNull(producerProps, "queue.buffering.max.ms", getQueueBufferingMaxMs());
+        addPropertyIfNotNull(producerProps, "queue.buffering.max.messages", getQueueBufferingMaxMessages());
+        addPropertyIfNotNull(producerProps, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs());
+        addPropertyIfNotNull(producerProps, "batch.num.messages", getBatchNumMessages());
+        addPropertyIfNotNull(producerProps, "serializer.class", getSerializerClass());
+        addPropertyIfNotNull(producerProps, "key.serializer.class", getKeySerializerClass());
+
+        return producerProps;
+    }
+
+    /**
+     * Builds the properties handed to the Kafka consumer from the options set
+     * on this configuration.
+     * <p/>
+     * Options that are {@code null} are left out of the result.
+     *
+     * @return a new {@link Properties} holding every non-null consumer,
+     *         client and ZooKeeper option
+     */
+    public Properties createConsumerProperties() {
+        Properties props = new Properties();
+        addPropertyIfNotNull(props, "consumer.id", getConsumerId());
+        addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs());
+        addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes());
+        addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes());
+        addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable());
+        addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs());
+        // use the consumer-side queuedMaxMessages option here, not the async
+        // producer's queueBufferingMaxMessages
+        addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessages());
+        addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
+        addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
+        addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries());
+        addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs());
+        addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs());
+        addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset());
+        addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs());
+        addPropertyIfNotNull(props, "client.id", getClientId());
+        // property keys are matched exactly, so they must not carry trailing
+        // whitespace; a key with a trailing space is a different key
+        addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
+        addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
+        addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
+        return props;
+    }
+
+    /**
+     * Stores the string form of {@code value} under {@code key}; does nothing
+     * when {@code value} is {@code null}.
+     *
+     * @param props the properties to add the entry to
+     * @param key   the property name
+     * @param value the option value, possibly {@code null}
+     */
+    private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
+        if (value == null) {
+            return;
+        }
+        // Kafka expects all properties as String
+        String text = value.toString();
+        props.put(key, text);
+    }
+
+    public String getZookeeperHost() {
+        return zookeeperHost;
+    }
+
+    public void setZookeeperHost(String zookeeperHost) {
+        this.zookeeperHost = zookeeperHost;
+    }
+
+    public int getZookeeperPort() {
+        return zookeeperPort;
+    }
+
+    public void setZookeeperPort(int zookeeperPort) {
+        this.zookeeperPort = zookeeperPort;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getPartitioner() {
+        return partitioner;
+    }
+
+    public void setPartitioner(String partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getConsumerStreams() {
+        return consumerStreams;
+    }
+
+    public void setConsumerStreams(int consumerStreams) {
+        this.consumerStreams = consumerStreams;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    public Integer getSocketTimeoutMs() {
+        return socketTimeoutMs;
+    }
+
+    public void setSocketTimeoutMs(Integer socketTimeoutMs) {
+        this.socketTimeoutMs = socketTimeoutMs;
+    }
+
+    public Integer getSocketReceiveBufferBytes() {
+        return socketReceiveBufferBytes;
+    }
+
+    public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
+        this.socketReceiveBufferBytes = socketReceiveBufferBytes;
+    }
+
+    public Integer getFetchMessageMaxBytes() {
+        return fetchMessageMaxBytes;
+    }
+
+    public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
+        this.fetchMessageMaxBytes = fetchMessageMaxBytes;
+    }
+
+    public Boolean isAutoCommitEnable() {
+        return autoCommitEnable;
+    }
+
+    public void setAutoCommitEnable(Boolean autoCommitEnable) {
+        this.autoCommitEnable = autoCommitEnable;
+    }
+
+    public Integer getAutoCommitIntervalMs() {
+        return autoCommitIntervalMs;
+    }
+
+    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+        this.autoCommitIntervalMs = autoCommitIntervalMs;
+    }
+
+    public Integer getQueuedMaxMessages() {
+        return queuedMaxMessages;
+    }
+
+    public void setQueuedMaxMessages(Integer queuedMaxMessages) {
+        this.queuedMaxMessages = queuedMaxMessages;
+    }
+
+    public Integer getRebalanceMaxRetries() {
+        return rebalanceMaxRetries;
+    }
+
+    public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
+        this.rebalanceMaxRetries = rebalanceMaxRetries;
+    }
+
+    public Integer getFetchMinBytes() {
+        return fetchMinBytes;
+    }
+
+    public void setFetchMinBytes(Integer fetchMinBytes) {
+        this.fetchMinBytes = fetchMinBytes;
+    }
+
+    public Integer getFetchWaitMaxMs() {
+        return fetchWaitMaxMs;
+    }
+
+    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+        this.fetchWaitMaxMs = fetchWaitMaxMs;
+    }
+
+    public Integer getRebalanceBackoffMs() {
+        return rebalanceBackoffMs;
+    }
+
+    public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
+        this.rebalanceBackoffMs = rebalanceBackoffMs;
+    }
+
+    public Integer getRefreshLeaderBackoffMs() {
+        return refreshLeaderBackoffMs;
+    }
+
+    public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
+        this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
+    }
+
+    public String getAutoOffsetReset() {
+        return autoOffsetReset;
+    }
+
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        this.autoOffsetReset = autoOffsetReset;
+    }
+
+    public Integer getConsumerTimeoutMs() {
+        return consumerTimeoutMs;
+    }
+
+    public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
+        this.consumerTimeoutMs = consumerTimeoutMs;
+    }
+
+    public Integer getZookeeperSessionTimeoutMs() {
+        return zookeeperSessionTimeoutMs;
+    }
+
+    public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
+        this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
+    }
+
+    public Integer getZookeeperConnectionTimeoutMs() {
+        return zookeeperConnectionTimeoutMs;
+    }
+
+    public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
+        this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
+    }
+
+    public Integer getZookeeperSyncTimeMs() {
+        return zookeeperSyncTimeMs;
+    }
+
+    public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
+        this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
+    }
+
+    public String getProducerType() {
+        return producerType;
+    }
+
+    public void setProducerType(String producerType) {
+        this.producerType = producerType;
+    }
+
+    public String getCompressionCodec() {
+        return compressionCodec;
+    }
+
+    public void setCompressionCodec(String compressionCodec) {
+        this.compressionCodec = compressionCodec;
+    }
+
+    public String getCompressedTopics() {
+        return compressedTopics;
+    }
+
+    public void setCompressedTopics(String compressedTopics) {
+        this.compressedTopics = compressedTopics;
+    }
+
+    public Integer getMessageSendMaxRetries() {
+        return messageSendMaxRetries;
+    }
+
+    public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
+        this.messageSendMaxRetries = messageSendMaxRetries;
+    }
+
+    public Integer getRetryBackoffMs() {
+        return retryBackoffMs;
+    }
+
+    public void setRetryBackoffMs(Integer retryBackoffMs) {
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    public Integer getTopicMetadataRefreshIntervalMs() {
+        return topicMetadataRefreshIntervalMs;
+    }
+
+    public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
+        this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
+    }
+
+    public Integer getSendBufferBytes() {
+        return sendBufferBytes;
+    }
+
+    public void setSendBufferBytes(Integer sendBufferBytes) {
+        this.sendBufferBytes = sendBufferBytes;
+    }
+
+    public short getRequestRequiredAcks() {
+        return requestRequiredAcks;
+    }
+
+    public void setRequestRequiredAcks(short requestRequiredAcks) {
+        this.requestRequiredAcks = requestRequiredAcks;
+    }
+
+    public Integer getRequestTimeoutMs() {
+        return requestTimeoutMs;
+    }
+
+    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
+    public Integer getQueueBufferingMaxMs() {
+        return queueBufferingMaxMs;
+    }
+
+    public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
+        this.queueBufferingMaxMs = queueBufferingMaxMs;
+    }
+
+    public Integer getQueueBufferingMaxMessages() {
+        return queueBufferingMaxMessages;
+    }
+
+    public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+        this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+    }
+
+    public Integer getQueueEnqueueTimeoutMs() {
+        return queueEnqueueTimeoutMs;
+    }
+
+    public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
+        this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
+    }
+
+    public Integer getBatchNumMessages() {
+        return batchNumMessages;
+    }
+
+    public void setBatchNumMessages(Integer batchNumMessages) {
+        this.batchNumMessages = batchNumMessages;
+    }
+
+    public String getSerializerClass() {
+        return serializerClass;
+    }
+
+    public void setSerializerClass(String serializerClass) {
+        this.serializerClass = serializerClass;
+    }
+
+    public String getKeySerializerClass() {
+        return keySerializerClass;
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        this.keySerializerClass = keySerializerClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8db7c54..990e942 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -63,12 +63,9 @@ public class KafkaConsumer extends DefaultConsumer {
     }
 
     Properties getProps() {
-        Properties props = new Properties();
+        Properties props = endpoint.getConfiguration().createConsumerProperties();
         props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort());
         props.put("group.id", endpoint.getGroupId());
-        props.put("zookeeper.session.timeout.ms", "400");
-        props.put("zookeeper.sync.time.ms", "200");
-        props.put("auto.commit.interval.ms", "1000");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index f88e3d6..9bdb100 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -35,12 +35,7 @@ import org.apache.camel.impl.DefaultMessage;
 public class KafkaEndpoint extends DefaultEndpoint {
 
     private String brokers;
-    private String zookeeperHost;
-    private int zookeeperPort;
-    private String groupId;
-    private int consumerStreams = 10;
-    private String partitioner;
-    private String topic;
+    private KafkaConfiguration configuration = new KafkaConfiguration();
 
     public KafkaEndpoint() {
     }
@@ -52,44 +47,97 @@ public class KafkaEndpoint extends DefaultEndpoint {
         this.brokers = remaining.split("\\?")[0];
     }
 
+    public KafkaConfiguration getConfiguration() {
+        if (configuration == null) {
+            configuration = createConfiguration();
+        }
+        return configuration;
+    }
+
+    public void setConfiguration(KafkaConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    protected KafkaConfiguration createConfiguration() {
+        return new KafkaConfiguration();
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        KafkaConsumer consumer = new KafkaConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KafkaProducer(this);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public ExecutorService createExecutor() {
+        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+    }
+
+    public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
+        Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
+
+        Message message = new DefaultMessage();
+        message.setHeader(KafkaConstants.PARTITION, mm.partition());
+        message.setHeader(KafkaConstants.TOPIC, mm.topic());
+        message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        message.setBody(mm.message());
+        exchange.setIn(message);
+
+        return exchange;
+    }
+
+
+    // Delegated properties from the configuration
+    //-------------------------------------------------------------------------
+
     public String getZookeeperHost() {
-        return zookeeperHost;
+        return configuration.getZookeeperHost();
     }
 
     public void setZookeeperHost(String zookeeperHost) {
-        this.zookeeperHost = zookeeperHost;
+        configuration.setZookeeperHost(zookeeperHost);
     }
 
     public int getZookeeperPort() {
-        return zookeeperPort;
+        return configuration.getZookeeperPort();
     }
 
     public void setZookeeperPort(int zookeeperPort) {
-        this.zookeeperPort = zookeeperPort;
+        configuration.setZookeeperPort(zookeeperPort);
     }
 
     public String getGroupId() {
-        return groupId;
+        return configuration.getGroupId();
     }
 
     public void setGroupId(String groupId) {
-        this.groupId = groupId;
+        configuration.setGroupId(groupId);
     }
 
     public String getPartitioner() {
-        return partitioner;
+        return configuration.getPartitioner();
     }
 
     public void setPartitioner(String partitioner) {
-        this.partitioner = partitioner;
+        configuration.setPartitioner(partitioner);
     }
 
     public String getTopic() {
-        return topic;
+        return configuration.getTopic();
     }
 
     public void setTopic(String topic) {
-        this.topic = topic;
+        configuration.setTopic(topic);
     }
 
     public String getBrokers() {
@@ -97,45 +145,274 @@ public class KafkaEndpoint extends DefaultEndpoint {
     }
 
     public int getConsumerStreams() {
-        return consumerStreams;
+        return configuration.getConsumerStreams();
     }
 
     public void setConsumerStreams(int consumerStreams) {
-        this.consumerStreams = consumerStreams;
+        configuration.setConsumerStreams(consumerStreams);
     }
 
-    public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
-        Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
+    public void setConsumerTimeoutMs(int consumerTimeoutMs) {
+        configuration.setConsumerTimeoutMs(consumerTimeoutMs);
+    }
 
-        Message message = new DefaultMessage();
-        message.setHeader(KafkaConstants.PARTITION, mm.partition());
-        message.setHeader(KafkaConstants.TOPIC, mm.topic());
-        message.setHeader(KafkaConstants.KEY, new String(mm.key()));
-        message.setBody(mm.message());
-        exchange.setIn(message);
+    public void setSerializerClass(String serializerClass) {
+        configuration.setSerializerClass(serializerClass);
+    }
 
-        return exchange;
+    public void setQueueBufferingMaxMessages(int queueBufferingMaxMessages) {
+        configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
     }
 
-    @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        KafkaConsumer consumer = new KafkaConsumer(this, processor);
-        configureConsumer(consumer);
-        return consumer;
+    public int getFetchWaitMaxMs() {
+        return configuration.getFetchWaitMaxMs();
     }
 
-    @Override
-    public Producer createProducer() throws Exception {
-        return new KafkaProducer(this);
+    public Integer getZookeeperConnectionTimeoutMs() {
+        return configuration.getZookeeperConnectionTimeoutMs();
     }
 
-    @Override
-    public boolean isSingleton() {
-        return true;
+    public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
+        configuration.setZookeeperConnectionTimeoutMs(zookeeperConnectionTimeoutMs);
     }
 
-    public ExecutorService createExecutor() {
-        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + getTopic() + "]", getConsumerStreams());
+    public void setMessageSendMaxRetries(int messageSendMaxRetries) {
+        configuration.setMessageSendMaxRetries(messageSendMaxRetries);
     }
 
+    public int getQueueBufferingMaxMs() {
+        return configuration.getQueueBufferingMaxMs();
+    }
+
+    public void setRequestRequiredAcks(short requestRequiredAcks) {
+        configuration.setRequestRequiredAcks(requestRequiredAcks);
+    }
+
+    public Integer getRebalanceBackoffMs() {
+        return configuration.getRebalanceBackoffMs();
+    }
+
+    public void setQueueEnqueueTimeoutMs(int queueEnqueueTimeoutMs) {
+        configuration.setQueueEnqueueTimeoutMs(queueEnqueueTimeoutMs);
+    }
+
+    public int getFetchMessageMaxBytes() {
+        return configuration.getFetchMessageMaxBytes();
+    }
+
+    public int getQueuedMaxMessages() {
+        return configuration.getQueuedMaxMessages();
+    }
+
+    public int getAutoCommitIntervalMs() {
+        return configuration.getAutoCommitIntervalMs();
+    }
+
+    public void setSocketTimeoutMs(int socketTimeoutMs) {
+        configuration.setSocketTimeoutMs(socketTimeoutMs);
+    }
+
+    public void setAutoCommitIntervalMs(int autoCommitIntervalMs) {
+        configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
+    }
+
+    public void setRequestTimeoutMs(int requestTimeoutMs) {
+        configuration.setRequestTimeoutMs(requestTimeoutMs);
+    }
+
+    public void setCompressedTopics(String compressedTopics) {
+        configuration.setCompressedTopics(compressedTopics);
+    }
+
+    public int getSocketReceiveBufferBytes() {
+        return configuration.getSocketReceiveBufferBytes();
+    }
+
+    public void setSendBufferBytes(int sendBufferBytes) {
+        configuration.setSendBufferBytes(sendBufferBytes);
+    }
+
+    public void setFetchMessageMaxBytes(int fetchMessageMaxBytes) {
+        configuration.setFetchMessageMaxBytes(fetchMessageMaxBytes);
+    }
+
+    public int getRefreshLeaderBackoffMs() {
+        return configuration.getRefreshLeaderBackoffMs();
+    }
+
+    public void setFetchWaitMaxMs(int fetchWaitMaxMs) {
+        configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
+    }
+
+    public int getTopicMetadataRefreshIntervalMs() {
+        return configuration.getTopicMetadataRefreshIntervalMs();
+    }
+
+    public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
+        configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
+    }
+
+    public int getConsumerTimeoutMs() {
+        return configuration.getConsumerTimeoutMs();
+    }
+
+    public void setAutoCommitEnable(boolean autoCommitEnable) {
+        configuration.setAutoCommitEnable(autoCommitEnable);
+    }
+
+    public String getCompressionCodec() {
+        return configuration.getCompressionCodec();
+    }
+
+    public void setProducerType(String producerType) {
+        configuration.setProducerType(producerType);
+    }
+
+    public String getClientId() {
+        return configuration.getClientId();
+    }
+
+    public int getFetchMinBytes() {
+        return configuration.getFetchMinBytes();
+    }
+
+    public String getAutoOffsetReset() {
+        return configuration.getAutoOffsetReset();
+    }
+
+    public void setRefreshLeaderBackoffMs(int refreshLeaderBackoffMs) {
+        configuration.setRefreshLeaderBackoffMs(refreshLeaderBackoffMs);
+    }
+
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        configuration.setAutoOffsetReset(autoOffsetReset);
+    }
+
+    public void setConsumerId(String consumerId) {
+        configuration.setConsumerId(consumerId);
+    }
+
+    public int getRetryBackoffMs() {
+        return configuration.getRetryBackoffMs();
+    }
+
+    public int getRebalanceMaxRetries() {
+        return configuration.getRebalanceMaxRetries();
+    }
+
+    public boolean isAutoCommitEnable() {
+        return configuration.isAutoCommitEnable();
+    }
+
+    public void setQueueBufferingMaxMs(int queueBufferingMaxMs) {
+        configuration.setQueueBufferingMaxMs(queueBufferingMaxMs);
+    }
+
+    public void setRebalanceMaxRetries(int rebalanceMaxRetries) {
+        configuration.setRebalanceMaxRetries(rebalanceMaxRetries);
+    }
+
+    public int getZookeeperSessionTimeoutMs() {
+        return configuration.getZookeeperSessionTimeoutMs();
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        configuration.setKeySerializerClass(keySerializerClass);
+    }
+
+    public void setCompressionCodec(String compressionCodec) {
+        configuration.setCompressionCodec(compressionCodec);
+    }
+
+    public void setClientId(String clientId) {
+        configuration.setClientId(clientId);
+    }
+
+    public int getSocketTimeoutMs() {
+        return configuration.getSocketTimeoutMs();
+    }
+
+    public String getCompressedTopics() {
+        return configuration.getCompressedTopics();
+    }
+
+    public int getZookeeperSyncTimeMs() {
+        return configuration.getZookeeperSyncTimeMs();
+    }
+
+    public void setSocketReceiveBufferBytes(int socketReceiveBufferBytes) {
+        configuration.setSocketReceiveBufferBytes(socketReceiveBufferBytes);
+    }
+
+    public int getQueueEnqueueTimeoutMs() {
+        return configuration.getQueueEnqueueTimeoutMs();
+    }
+
+    public int getQueueBufferingMaxMessages() {
+        return configuration.getQueueBufferingMaxMessages();
+    }
+
+    public void setZookeeperSyncTimeMs(int zookeeperSyncTimeMs) {
+        configuration.setZookeeperSyncTimeMs(zookeeperSyncTimeMs);
+    }
+
+    public String getKeySerializerClass() {
+        return configuration.getKeySerializerClass();
+    }
+
+    public void setTopicMetadataRefreshIntervalMs(int topicMetadataRefreshIntervalMs) {
+        configuration.setTopicMetadataRefreshIntervalMs(topicMetadataRefreshIntervalMs);
+    }
+
+    public void setBatchNumMessages(int batchNumMessages) {
+        configuration.setBatchNumMessages(batchNumMessages);
+    }
+
+    public int getSendBufferBytes() {
+        return configuration.getSendBufferBytes();
+    }
+
+    public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
+        configuration.setRebalanceBackoffMs(rebalanceBackoffMs);
+    }
+
+    public void setQueuedMaxMessages(int queuedMaxMessages) {
+        configuration.setQueuedMaxMessages(queuedMaxMessages);
+    }
+
+    public void setRetryBackoffMs(int retryBackoffMs) {
+        configuration.setRetryBackoffMs(retryBackoffMs);
+    }
+
+    public int getBatchNumMessages() {
+        return configuration.getBatchNumMessages();
+    }
+
+    public short getRequestRequiredAcks() {
+        return configuration.getRequestRequiredAcks();
+    }
+
+    public String getProducerType() {
+        return configuration.getProducerType();
+    }
+
+    public String getConsumerId() {
+        return configuration.getConsumerId();
+    }
+
+    public int getMessageSendMaxRetries() {
+        return configuration.getMessageSendMaxRetries();
+    }
+
+    public void setFetchMinBytes(int fetchMinBytes) {
+        configuration.setFetchMinBytes(fetchMinBytes);
+    }
+
+    public String getSerializerClass() {
+        return configuration.getSerializerClass();
+    }
+
+    public int getRequestTimeoutMs() {
+        return configuration.getRequestTimeoutMs();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6c2d167..4d681f1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -48,11 +48,8 @@ public class KafkaProducer extends DefaultProducer {
     }
 
     Properties getProps() {
-        Properties props = new Properties();
+        Properties props = endpoint.getConfiguration().createProducerProperties();
         props.put("metadata.broker.list", endpoint.getBrokers());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", endpoint.getPartitioner());
-        props.put("request.required.acks", "1");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e2a680eb/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 85fa272..36bb6c4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -50,7 +50,8 @@ public class KafkaProducerIT extends CamelTestSupport {
     public static final String TOPIC = "test";
     public static final String TOPIC_IN_HEADER = "testHeader";
 
-    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner")
+    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC 
+        + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder&requestRequiredAcks=1")
     private Endpoint to;
 
     @Produce(uri = "direct:start")


[2/2] git commit: Fix a bunch of Eclipse warnings in the camel-kafka component

Posted by ni...@apache.org.
Fix a bunch of Eclipse warnings in the camel-kafka component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd733ddf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd733ddf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd733ddf

Branch: refs/heads/master
Commit: cd733ddf00fb228ed1163049c7f81e32da179feb
Parents: e2a680e
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Apr 1 15:53:55 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Apr 1 15:53:55 2014 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 11 +++-----
 .../camel/component/kafka/KafkaProducerIT.java  | 15 +++++++----
 .../component/kafka/KafkaProducerTest.java      | 28 +++++++++++---------
 3 files changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cd733ddf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 990e942..3087a14 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,20 +27,15 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  *
  */
 public class KafkaConsumer extends DefaultConsumer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
-
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
@@ -82,7 +77,7 @@ public class KafkaConsumer extends DefaultConsumer {
         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
 
         executor = endpoint.createExecutor();
-        for (final KafkaStream stream : streams) {
+        for (final KafkaStream<byte[], byte[]> stream : streams) {
             executor.submit(new ConsumerTask(stream));
         }
     }
@@ -107,9 +102,9 @@ public class KafkaConsumer extends DefaultConsumer {
 
     class ConsumerTask implements Runnable {
 
-        private KafkaStream stream;
+        private KafkaStream<byte[], byte[]> stream;
 
-        public ConsumerTask(KafkaStream stream) {
+        public ConsumerTask(KafkaStream<byte[], byte[]> stream) {
             this.stream = stream;
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cd733ddf/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 36bb6c4..f77d91a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -41,15 +41,19 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance running on 2181.
  * The broker must have a topic called test created.
  */
 public class KafkaProducerIT extends CamelTestSupport {
-
+    
     public static final String TOPIC = "test";
     public static final String TOPIC_IN_HEADER = "testHeader";
 
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerIT.class);
+
     @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC 
         + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder&requestRequiredAcks=1")
     private Endpoint to;
@@ -111,10 +115,10 @@ public class KafkaProducerIT extends CamelTestSupport {
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
 
         ExecutorService executor = Executors.newFixedThreadPool(10);
-        for (final KafkaStream stream : consumerMap.get(TOPIC)) {
+        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC)) {
             executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
         }
-        for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) {
+        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC_IN_HEADER)) {
             executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
         }
     }
@@ -131,10 +135,10 @@ public class KafkaProducerIT extends CamelTestSupport {
     }
 
     private static class KakfaTopicConsumer implements Runnable {
-        private final KafkaStream stream;
+        private final KafkaStream<byte[], byte[]> stream;
         private final CountDownLatch latch;
 
-        public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) {
+        public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
             this.stream = stream;
             this.latch = latch;
         }
@@ -144,6 +148,7 @@ public class KafkaProducerIT extends CamelTestSupport {
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
             while (it.hasNext()) {
                 String msg = new String(it.next().message());
+                LOG.info("Get the message" + msg);
                 latch.countDown();
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/cd733ddf/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index acdfc60..3c71417 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -42,6 +42,7 @@ public class KafkaProducerTest {
     private Exchange exchange = Mockito.mock(Exchange.class);
     private Message in = new DefaultMessage();
 
+    @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws IllegalAccessException, InstantiationException, ClassNotFoundException,
             URISyntaxException {
         endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
@@ -59,6 +60,7 @@ public class KafkaProducerTest {
     }
 
     @Test
+    @SuppressWarnings({"unchecked"})
     public void processSendsMesssage() throws Exception {
 
         endpoint.setTopic("sometopic");
@@ -80,10 +82,7 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals("4", captor.getValue().key());
-        assertEquals("anotherTopic", captor.getValue().topic());
+        verifySendMessage("4", "anotherTopic");
     }
 
     @Test
@@ -96,10 +95,8 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals("4", captor.getValue().key());
-        assertEquals("anotherTopic", captor.getValue().topic());
+        verifySendMessage("4", "anotherTopic");
+      
     }
 
     @Test(expected = CamelException.class)
@@ -116,19 +113,26 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         producer.process(exchange);
     }
-
+    
     @Test
+    
     public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
 
-        endpoint.setTopic("sometopic");
+        endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
 
+        verifySendMessage("4", "someTopic");
+        
+    }
+    
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String key, String topic) {
         ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
         Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals("4", captor.getValue().key());
-        assertEquals("sometopic", captor.getValue().topic());
+        assertEquals(key, captor.getValue().key());
+        assertEquals(topic, captor.getValue().topic());
     }
 }