You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/08/07 16:49:38 UTC
[1/4] camel git commit: Polished
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 990c6b579 -> 654db0d39
refs/heads/master 91ff7d0b6 -> 805db35b4
Polished
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e09c51d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e09c51d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e09c51d2
Branch: refs/heads/master
Commit: e09c51d2d9d7a1d71bde9a5a626f3e86153748f2
Parents: 91ff7d0
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:39:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:39:42 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e09c51d2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1657fa7..cb2dc9d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -121,11 +121,11 @@ public class KafkaConsumer extends DefaultConsumer {
class BatchingConsumerTask implements Runnable {
private KafkaStream<byte[], byte[]> stream;
- private CyclicBarrier berrier;
+ private CyclicBarrier barrier;
- public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier berrier) {
+ public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
this.stream = stream;
- this.berrier = berrier;
+ this.barrier = barrier;
}
public void run() {
@@ -160,7 +160,7 @@ public class KafkaConsumer extends DefaultConsumer {
if (processed >= endpoint.getBatchSize() || consumerTimeout
|| (processed > 0 && !hasNext)) { // Need to commit the offset for the last round
try {
- berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
+ barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
if (!consumerTimeout) {
processed = 0;
}
[3/4] camel git commit: Polished
Posted by da...@apache.org.
Polished
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38f9d3bc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38f9d3bc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38f9d3bc
Branch: refs/heads/camel-2.15.x
Commit: 38f9d3bc8955fcee6010d12e093b63461b1ed504
Parents: 990c6b5
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:39:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:57:07 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/38f9d3bc/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1657fa7..cb2dc9d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -121,11 +121,11 @@ public class KafkaConsumer extends DefaultConsumer {
class BatchingConsumerTask implements Runnable {
private KafkaStream<byte[], byte[]> stream;
- private CyclicBarrier berrier;
+ private CyclicBarrier barrier;
- public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier berrier) {
+ public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
this.stream = stream;
- this.berrier = berrier;
+ this.barrier = barrier;
}
public void run() {
@@ -160,7 +160,7 @@ public class KafkaConsumer extends DefaultConsumer {
if (processed >= endpoint.getBatchSize() || consumerTimeout
|| (processed > 0 && !hasNext)) { // Need to commit the offset for the last round
try {
- berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
+ barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
if (!consumerTimeout) {
processed = 0;
}
[4/4] camel git commit: CAMEL-8653: camel-kafka consumer should
commit offset on consumer when it's no longer running such as
suspending/shutting down.
Posted by da...@apache.org.
CAMEL-8653: camel-kafka consumer should commit offset on consumer when it's no longer running, such as suspending/shutting down.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/654db0d3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/654db0d3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/654db0d3
Branch: refs/heads/camel-2.15.x
Commit: 654db0d39c755101afe81e0169c386b7f4f029c3
Parents: 38f9d3b
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:56:36 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:57:13 2015 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 24 ++++++++++++++++----
1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/654db0d3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index cb2dc9d..94893fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import kafka.message.MessageAndMetadata;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public class KafkaConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
log.info("Starting Kafka consumer");
+
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConsumersCount(); i++) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
@@ -79,8 +81,10 @@ public class KafkaConsumer extends DefaultConsumer {
topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
+
+ // commit periodically
if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
- if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0)
+ if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
&& endpoint.getConsumerStreams() > 1) {
LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
}
@@ -90,8 +94,9 @@ public class KafkaConsumer extends DefaultConsumer {
}
consumerBarriers.put(consumer, barrier);
} else {
+ // auto commit
for (final KafkaStream<byte[], byte[]> stream : streams) {
- executor.submit(new AutoCommitConsumerTask(stream));
+ executor.submit(new AutoCommitConsumerTask(consumer, stream));
}
consumerBarriers.put(consumer, null);
}
@@ -103,11 +108,14 @@ public class KafkaConsumer extends DefaultConsumer {
protected void doStop() throws Exception {
super.doStop();
log.info("Stopping Kafka consumer");
+
for (ConsumerConnector consumer : consumerBarriers.keySet()) {
if (consumer != null) {
consumer.shutdown();
}
}
+ consumerBarriers.clear();
+
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -175,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer {
class CommitOffsetTask implements Runnable {
- private ConsumerConnector consumer;
+ private final ConsumerConnector consumer;
public CommitOffsetTask(ConsumerConnector consumer) {
this.consumer = consumer;
@@ -183,22 +191,25 @@ public class KafkaConsumer extends DefaultConsumer {
@Override
public void run() {
+ LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
consumer.commitOffsets();
}
}
class AutoCommitConsumerTask implements Runnable {
+ private final ConsumerConnector consumer;
private KafkaStream<byte[], byte[]> stream;
- public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
+ public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
+ this.consumer = consumer;
this.stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// only poll the next message if we are allowed to run and are not suspending
- while (isRunAllowed() && it.hasNext()) {
+ while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
MessageAndMetadata<byte[], byte[]> mm = it.next();
Exchange exchange = endpoint.createKafkaExchange(mm);
try {
@@ -207,6 +218,9 @@ public class KafkaConsumer extends DefaultConsumer {
getExceptionHandler().handleException("Error during processing", exchange, e);
}
}
+ // no more data so commit offset
+ LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
+ consumer.commitOffsets();
}
}
}
[2/4] camel git commit: CAMEL-8653: camel-kafka consumer should
commit offset on consumer when it's no longer running such as
suspending/shutting down.
Posted by da...@apache.org.
CAMEL-8653: camel-kafka consumer should commit offset on consumer when it's no longer running, such as suspending/shutting down.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/805db35b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/805db35b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/805db35b
Branch: refs/heads/master
Commit: 805db35b482b0f3b65df80906e1d42dcea1b3c9e
Parents: e09c51d
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 7 16:56:36 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 16:56:36 2015 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 24 ++++++++++++++++----
1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/805db35b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index cb2dc9d..94893fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import kafka.message.MessageAndMetadata;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public class KafkaConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
log.info("Starting Kafka consumer");
+
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConsumersCount(); i++) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
@@ -79,8 +81,10 @@ public class KafkaConsumer extends DefaultConsumer {
topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
+
+ // commit periodically
if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
- if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0)
+ if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
&& endpoint.getConsumerStreams() > 1) {
LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
}
@@ -90,8 +94,9 @@ public class KafkaConsumer extends DefaultConsumer {
}
consumerBarriers.put(consumer, barrier);
} else {
+ // auto commit
for (final KafkaStream<byte[], byte[]> stream : streams) {
- executor.submit(new AutoCommitConsumerTask(stream));
+ executor.submit(new AutoCommitConsumerTask(consumer, stream));
}
consumerBarriers.put(consumer, null);
}
@@ -103,11 +108,14 @@ public class KafkaConsumer extends DefaultConsumer {
protected void doStop() throws Exception {
super.doStop();
log.info("Stopping Kafka consumer");
+
for (ConsumerConnector consumer : consumerBarriers.keySet()) {
if (consumer != null) {
consumer.shutdown();
}
}
+ consumerBarriers.clear();
+
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -175,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer {
class CommitOffsetTask implements Runnable {
- private ConsumerConnector consumer;
+ private final ConsumerConnector consumer;
public CommitOffsetTask(ConsumerConnector consumer) {
this.consumer = consumer;
@@ -183,22 +191,25 @@ public class KafkaConsumer extends DefaultConsumer {
@Override
public void run() {
+ LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
consumer.commitOffsets();
}
}
class AutoCommitConsumerTask implements Runnable {
+ private final ConsumerConnector consumer;
private KafkaStream<byte[], byte[]> stream;
- public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
+ public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
+ this.consumer = consumer;
this.stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// only poll the next message if we are allowed to run and are not suspending
- while (isRunAllowed() && it.hasNext()) {
+ while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
MessageAndMetadata<byte[], byte[]> mm = it.next();
Exchange exchange = endpoint.createKafkaExchange(mm);
try {
@@ -207,6 +218,9 @@ public class KafkaConsumer extends DefaultConsumer {
getExceptionHandler().handleException("Error during processing", exchange, e);
}
}
+ // no more data so commit offset
+ LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
+ consumer.commitOffsets();
}
}
}