You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/06/08 09:50:02 UTC
camel git commit: Upgrade Kafka and related bundle to version 0.10.0.0
Repository: camel
Updated Branches:
refs/heads/master 27664e8f6 -> 3938344aa
Upgrade Kafka and related bundle to version 0.10.0.0
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3938344a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3938344a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3938344a
Branch: refs/heads/master
Commit: 3938344aa682e8ba2f1766e272b8feb6bf91d682
Parents: 27664e8
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Jun 8 11:48:54 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Jun 8 11:48:54 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 4 +++-
.../camel/component/kafka/embedded/EmbeddedKafkaCluster.java | 3 ++-
parent/pom.xml | 4 ++--
3 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8649a46..a317b54 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.kafka;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
@@ -113,7 +115,7 @@ public class KafkaConsumer extends DefaultConsumer {
LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
// This poll ensures we have an assigned partition; otherwise seek won't work
consumer.poll(100);
- consumer.seekToBeginning();
+ consumer.seekToBeginning(consumer.assignment());
}
while (isRunAllowed() && !isSuspendingOrSuspended()) {
ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index 42403c2..69de777 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Properties;
import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZkUtils;
@@ -66,7 +67,7 @@ public class EmbeddedKafkaCluster {
public void createTopics(String... topics) {
for (String topic : topics) {
- AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties());
+ AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 286d537..7224373 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -352,8 +352,8 @@
<jython-version>2.5.3</jython-version>
<jzlib-version>1.1.3</jzlib-version>
<jzlib-bundle-version>1.1.3_2</jzlib-bundle-version>
- <kafka-version>0.9.0.1</kafka-version>
- <kafka-bundle-version>0.9.0.1_1</kafka-bundle-version>
+ <kafka-version>0.10.0.0</kafka-version>
+ <kafka-bundle-version>0.10.0.0_1</kafka-bundle-version>
<karaf-version>2.4.4</karaf-version>
<karaf3-version>3.0.6</karaf3-version>
<karaf4-version>4.0.5</karaf4-version>