You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/06/08 09:50:02 UTC

camel git commit: Upgrade Kafka and related bundle to version 0.10.0.0

Repository: camel
Updated Branches:
  refs/heads/master 27664e8f6 -> 3938344aa


Upgrade Kafka and related bundle to version 0.10.0.0


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3938344a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3938344a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3938344a

Branch: refs/heads/master
Commit: 3938344aa682e8ba2f1766e272b8feb6bf91d682
Parents: 27664e8
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Jun 8 11:48:54 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Jun 8 11:48:54 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java     | 4 +++-
 .../camel/component/kafka/embedded/EmbeddedKafkaCluster.java     | 3 ++-
 parent/pom.xml                                                   | 4 ++--
 3 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8649a46..a317b54 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
@@ -113,7 +115,7 @@ public class KafkaConsumer extends DefaultConsumer {
                     LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
                     // This poll to ensures we have an assigned partition otherwise seek won't work
                     consumer.poll(100);
-                    consumer.seekToBeginning();
+                    consumer.seekToBeginning(consumer.assignment());
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index 42403c2..69de777 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Properties;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.ZkUtils;
@@ -66,7 +67,7 @@ public class EmbeddedKafkaCluster {
 
     public void createTopics(String... topics) {
         for (String topic : topics) {
-            AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties());
+            AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 286d537..7224373 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -352,8 +352,8 @@
     <jython-version>2.5.3</jython-version>
     <jzlib-version>1.1.3</jzlib-version>
     <jzlib-bundle-version>1.1.3_2</jzlib-bundle-version>
-    <kafka-version>0.9.0.1</kafka-version>
-    <kafka-bundle-version>0.9.0.1_1</kafka-bundle-version>
+    <kafka-version>0.10.0.0</kafka-version>
+    <kafka-bundle-version>0.10.0.0_1</kafka-bundle-version>
     <karaf-version>2.4.4</karaf-version>
     <karaf3-version>3.0.6</karaf3-version>
     <karaf4-version>4.0.5</karaf4-version>