You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/03/27 08:01:57 UTC

[1/8] git commit: Update to Kafka 0.8.1

Repository: camel
Updated Branches:
  refs/heads/master 8bfd1232b -> e135e2efc


Update to Kafka 0.8.1


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0fe4a3d7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0fe4a3d7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0fe4a3d7

Branch: refs/heads/master
Commit: 0fe4a3d7a1f15dea89ecd6cecd34d74a3fc93f6a
Parents: 06ffb5d
Author: Fabien Chaillou <fa...@gmail.com>
Authored: Wed Mar 26 14:11:41 2014 -0400
Committer: Fabien Chaillou <fa...@gmail.com>
Committed: Wed Mar 26 14:11:41 2014 -0400

----------------------------------------------------------------------
 .../org/apache/camel/component/kafka/KafkaEndpointTest.java    | 6 +++++-
 .../org/apache/camel/component/kafka/SimplePartitioner.java    | 4 ++--
 parent/pom.xml                                                 | 4 ++--
 3 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0fe4a3d7/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index 3a59e3e..6ac6f81 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -18,8 +18,10 @@ package org.apache.camel.component.kafka;
 
 import java.net.URISyntaxException;
 
+import kafka.message.Message;
 import kafka.message.MessageAndMetadata;
 
+import kafka.serializer.DefaultDecoder;
 import org.apache.camel.Exchange;
 import org.junit.Test;
 
@@ -32,8 +34,10 @@ public class KafkaEndpointTest {
     public void testCreatingKafkaExchangeSetsHeaders() throws URISyntaxException {
         KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent());
 
+        Message message = new Message("mymessage".getBytes(), "somekey".getBytes());
+        DefaultDecoder decoder = new DefaultDecoder(null);
         MessageAndMetadata<byte[], byte[]> mm =
-                new MessageAndMetadata<byte[], byte[]>("somekey".getBytes(), "mymessage".getBytes(), "topic", 4, 56);
+                new MessageAndMetadata<byte[], byte[]>("topic", 4, message, 56, decoder, decoder);
 
         Exchange exchange = endpoint.createKafkaExchange(mm);
         assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY));

http://git-wip-us.apache.org/repos/asf/camel/blob/0fe4a3d7/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
index 05eb1e5..039a2e7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-public class SimplePartitioner implements Partitioner<String> {
+public class SimplePartitioner implements Partitioner {
 
     public SimplePartitioner(VerifiableProperties props) {
     }
@@ -31,7 +31,7 @@ public class SimplePartitioner implements Partitioner<String> {
      * @return an integer between 0 and numPartitions-1
      */
     @Override
-    public int partition(String key, int numPartitions) {
+    public int partition(Object key, int numPartitions) {
         return key.hashCode() % numPartitions;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0fe4a3d7/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 61cdb05..a8cf7c5 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -253,8 +253,8 @@
     <junit-bundle-version>4.11_1</junit-bundle-version>
     <junit-version>4.11</junit-version>
     <jython-version>2.5.3</jython-version>
-    <kafka-version>0.8.0</kafka-version>
-    <kafka-bundle-version>0.8.0_1</kafka-bundle-version>
+    <kafka-version>0.8.1</kafka-version>
+    <kafka-bundle-version>0.8.1_1</kafka-bundle-version>
     <karaf-version>2.3.3</karaf-version>
     <kie-version>6.0.0.Final</kie-version>
     <krati-version>0.4.9</krati-version>


[2/8] git commit: Fix the KafkaConsumer to put the message in the body Right now, the consumer would create an exchange for each received message. However, it didn't filled the exchange body with the received message content.

Posted by da...@apache.org.
Fix the KafkaConsumer to put the message in the body
Right now, the consumer would create an exchange for each received message.
However, it didn't filled the exchange body with the received message content.

Right now, it is set as an array of bytes but in the future we could use the Consumer decoder class to convert the content in the right type.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a2fd504a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a2fd504a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a2fd504a

Branch: refs/heads/master
Commit: a2fd504ac48a9f4759f59b7d2f028df1dc7087c4
Parents: 06ffb5d
Author: Fabien Chaillou <fa...@gmail.com>
Authored: Wed Mar 26 15:14:10 2014 -0400
Committer: Fabien Chaillou <fa...@gmail.com>
Committed: Wed Mar 26 15:14:10 2014 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 1 +
 .../test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a2fd504a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index b700850..f88e3d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -111,6 +111,7 @@ public class KafkaEndpoint extends DefaultEndpoint {
         message.setHeader(KafkaConstants.PARTITION, mm.partition());
         message.setHeader(KafkaConstants.TOPIC, mm.topic());
         message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        message.setBody(mm.message());
         exchange.setIn(message);
 
         return exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/a2fd504a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
index cb9be59..a8ca6c3 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -79,6 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport {
     @Test
     public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
         to.expectedMessageCount(5);
+        to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" );
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);


[6/8] git commit: Merge branch 'consumerFix' of https://github.com/fchaillou/camel

Posted by da...@apache.org.
Merge branch 'consumerFix' of https://github.com/fchaillou/camel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ab67a2e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ab67a2e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ab67a2e

Branch: refs/heads/master
Commit: 5ab67a2e88f8051a5289407a491886f9278fc782
Parents: 92013da a2fd504
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 27 07:59:27 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 27 07:59:27 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 1 +
 .../test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------



[5/8] git commit: Merge branch 'kafka0.8.1' of https://github.com/fchaillou/camel

Posted by da...@apache.org.
Merge branch 'kafka0.8.1' of https://github.com/fchaillou/camel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92013da5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92013da5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92013da5

Branch: refs/heads/master
Commit: 92013da5e361145b6d85b08440c1c20913cab585
Parents: 8bfd123 0fe4a3d
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 27 07:58:24 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 27 07:58:24 2014 +0100

----------------------------------------------------------------------
 .../org/apache/camel/component/kafka/KafkaEndpointTest.java    | 6 +++++-
 .../org/apache/camel/component/kafka/SimplePartitioner.java    | 4 ++--
 parent/pom.xml                                                 | 4 ++--
 3 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[8/8] git commit: Merge branch 'CAMEL-7326-2' of https://github.com/gzurowski/camel

Posted by da...@apache.org.
Merge branch 'CAMEL-7326-2' of https://github.com/gzurowski/camel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e135e2ef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e135e2ef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e135e2ef

Branch: refs/heads/master
Commit: e135e2efc4c58017fdd6aaa5969598a90565b934
Parents: 8914459 2d84828
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 27 08:04:49 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 27 08:04:49 2014 +0100

----------------------------------------------------------------------
 components/camel-hdfs2/pom.xml | 6 ++++++
 components/camel-solr/pom.xml  | 4 ++++
 2 files changed, 10 insertions(+)
----------------------------------------------------------------------



[7/8] git commit: #121: KafkaProducer: lookup the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS.

Posted by da...@apache.org.
#121: KafkaProducer: lookup the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8914459f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8914459f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8914459f

Branch: refs/heads/master
Commit: 8914459ffc493b28ca512389075f200964ad6981
Parents: 5ab67a2
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 27 08:04:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 27 08:04:08 2014 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    |  8 ++-
 .../camel/component/kafka/KafkaConsumerIT.java  |  2 +-
 .../camel/component/kafka/KafkaProducerIT.java  | 75 +++++++++++++-------
 .../component/kafka/KafkaProducerTest.java      | 40 +++++++++++
 4 files changed, 98 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8914459f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 66440f3..6c2d167 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -69,9 +69,15 @@ public class KafkaProducer extends DefaultProducer {
         if (partitionKey == null) {
             throw new CamelExchangeException("No partition key set", exchange);
         }
+
+        String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+        if (topic == null) {
+            throw new CamelExchangeException("No topic key set", exchange);
+        }
+
         String msg = exchange.getIn().getBody(String.class);
 
-        KeyedMessage<String, String> data = new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg);
+        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
         producer.send(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8914459f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
index a8ca6c3..5a4baf7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -79,7 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport {
     @Test
     public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
         to.expectedMessageCount(5);
-        to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" );
+        to.expectedBodiesReceived("message-0", "message-1", "message-2", "message-3", "message-4");
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);

http://git-wip-us.apache.org/repos/asf/camel/blob/8914459f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 5805666..85fa272 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -17,13 +17,14 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -47,6 +48,7 @@ import org.junit.Test;
 public class KafkaProducerIT extends CamelTestSupport {
 
     public static final String TOPIC = "test";
+    public static final String TOPIC_IN_HEADER = "testHeader";
 
     @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner")
     private Endpoint to;
@@ -86,40 +88,63 @@ public class KafkaProducerIT extends CamelTestSupport {
 
     @Test
     public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException {
+        int messageInTopic = 10;
+        int messageInOtherTopic = 5;
 
-        final List<String> messages = new ArrayList<String>();
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
         topicCountMap.put(TOPIC, 5);
+        topicCountMap.put(TOPIC_IN_HEADER, 5);
+        createKafkaMessageConsumer(messagesLatch, topicCountMap);
+
+        sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics", allMessagesReceived);
+    }
+
+    private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
-        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
-
-        ExecutorService executor = Executors.newFixedThreadPool(5);
-        for (final KafkaStream stream : streams) {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
-                    while (it.hasNext()) {
-                        String msg = new String(it.next().message());
-                        messages.add(msg);
-                    }
-                }
-            });
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        for (final KafkaStream stream : consumerMap.get(TOPIC)) {
+            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        }
+        for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) {
+            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        }
+    }
+
+    private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... headersWithValue) {
+        Map<String, Object> headerMap = new HashMap<String, Object>();
+        for (int i = 0; i < headersWithValue.length; i = i + 2) {
+            headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
         }
 
-        for (int k = 0; k < 10; k++) {
-            template.sendBodyAndHeader("IT test message", KafkaConstants.PARTITION_KEY, "1");
+        for (int k = 0; k < messageInOtherTopic; k++) {
+            template.sendBodyAndHeaders(bodyOther, headerMap);
         }
+    }
 
-        for (int k = 0; k < 20; k++) {
-            if (messages.size() == 10) {
-                return;
-            }
-            Thread.sleep(200);
+    private static class KakfaTopicConsumer implements Runnable {
+        private final KafkaStream stream;
+        private final CountDownLatch latch;
+
+        public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) {
+            this.stream = stream;
+            this.latch = latch;
         }
 
-        fail();
+        @Override
+        public void run() {
+            ConsumerIterator<byte[], byte[]> it = stream.iterator();
+            while (it.hasNext()) {
+                String msg = new String(it.next().message());
+                latch.countDown();
+            }
+        }
     }
 }
-

http://git-wip-us.apache.org/repos/asf/camel/blob/8914459f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index ccaaab5..acdfc60 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -70,6 +70,46 @@ public class KafkaProducerTest {
         Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
     }
 
+    @Test
+    public void processSendsMesssageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
+
+        endpoint.setTopic(null);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+
+        producer.process(exchange);
+
+        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
+        Mockito.verify(producer.producer).send(captor.capture());
+        assertEquals("4", captor.getValue().key());
+        assertEquals("anotherTopic", captor.getValue().topic());
+    }
+
+    @Test
+    public void processSendsMesssageWithTopicHeaderAndEndPoint() throws Exception {
+
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+
+        producer.process(exchange);
+
+        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
+        Mockito.verify(producer.producer).send(captor.capture());
+        assertEquals("4", captor.getValue().key());
+        assertEquals("anotherTopic", captor.getValue().topic());
+    }
+
+    @Test(expected = CamelException.class)
+    public void processRequiresTopicInEndpointOrInHeader() throws Exception {
+        endpoint.setTopic(null);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        producer.process(exchange);
+    }
+
     @Test(expected = CamelException.class)
     public void processRequiresPartitionHeader() throws Exception {
         endpoint.setTopic("sometopic");


[3/8] git commit: CAMEL-7326: Exclude unused dependency

Posted by da...@apache.org.
CAMEL-7326: Exclude unused dependency

Exclude unused dependency on jdk.tools (tools.jar) to avoid problems
when importing and working with Eclipse m2e integration.

Signed-off-by: Gregor Zurowski <gr...@zurowski.org>

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/942902c1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/942902c1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/942902c1

Branch: refs/heads/master
Commit: 942902c101c7309f86c914e65c1e9a05d95af24b
Parents: 06ffb5d
Author: Gregor Zurowski <gr...@zurowski.org>
Authored: Wed Mar 26 21:47:34 2014 -0400
Committer: Gregor Zurowski <gr...@zurowski.org>
Committed: Wed Mar 26 21:47:34 2014 -0400

----------------------------------------------------------------------
 components/camel-solr/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/942902c1/components/camel-solr/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-solr/pom.xml b/components/camel-solr/pom.xml
index 1995fd2..02aa439 100644
--- a/components/camel-solr/pom.xml
+++ b/components/camel-solr/pom.xml
@@ -81,6 +81,10 @@
           <groupId>rome</groupId>
           <artifactId>rome</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>


[4/8] git commit: CAMEL-7326: Exclude unused dependency

Posted by da...@apache.org.
CAMEL-7326: Exclude unused dependency

Exclude unused dependency on jdk.tools (tools.jar) to avoid problems
when importing and working with Eclipse m2e integration.

Signed-off-by: Gregor Zurowski <gr...@zurowski.org>

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d84828d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d84828d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d84828d

Branch: refs/heads/master
Commit: 2d84828d8f8089af93e64c0021e604dd6ca4b944
Parents: 942902c
Author: Gregor Zurowski <gr...@zurowski.org>
Authored: Wed Mar 26 23:27:58 2014 -0400
Committer: Gregor Zurowski <gr...@zurowski.org>
Committed: Wed Mar 26 23:27:58 2014 -0400

----------------------------------------------------------------------
 components/camel-hdfs2/pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2d84828d/components/camel-hdfs2/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/pom.xml b/components/camel-hdfs2/pom.xml
index 6dbb62c..8f78f99 100644
--- a/components/camel-hdfs2/pom.xml
+++ b/components/camel-hdfs2/pom.xml
@@ -60,6 +60,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop2-version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>