You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/26 18:43:34 UTC
[20/20] incubator-ignite git commit: ignite-428 Retry message consuming in case of error
ignite-428 Retry message consuming in case of error
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ad5e99eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ad5e99eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ad5e99eb
Branch: refs/heads/ignite-428
Commit: ad5e99eb7678134ccae8af68db82f05a8b495b42
Parents: 6ca6a0c
Author: agura <ag...@gridgain.com>
Authored: Fri Jun 26 19:39:41 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jun 26 19:39:41 2015 +0300
----------------------------------------------------------------------
.../ignite/stream/kafka/KafkaStreamer.java | 37 ++++++++++++++++++--
1 file changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad5e99eb/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index 5761209..bc618e3 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -39,6 +39,9 @@ import java.util.concurrent.*;
* Example</a>
*/
public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+ /** Delay in milliseconds to wait before retrying after message consumption from a stream fails. */
+ private static final int RETRY_TIMEOUT = 10000;
+
/** Logger. */
private IgniteLogger log;
@@ -63,6 +66,9 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
/** Kafka consumer connector. */
private ConsumerConnector consumer;
+ /** Stop flag: set by {@code stop()}, reset before consumers are submitted, and checked by the consumer loops. */
+ private volatile boolean stopped;
+
/**
* Sets the topic name.
*
@@ -138,12 +144,37 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
// Now launch all the consumer threads.
executor = Executors.newFixedThreadPool(threads);
+ stopped = false;
+
// Now create an object to consume the messages.
for (final KafkaStream<K, V> stream : streams) {
executor.submit(new Runnable() {
@Override public void run() {
- for (MessageAndMetadata<K, V> messageAndMetadata : stream)
- getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+ while (!stopped) {
+ try {
+ for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
+ MessageAndMetadata<K, V> msg = it.next();
+
+ try {
+ getStreamer().addData(msg.key(), msg.message());
+ }
+ catch (Exception e) {
+ log.warning("Message is ignored due to an error, msg = [" + msg + ']', e);
+ }
+ }
+ }
+ catch (Exception e) {
+ log.warning("Message can't be consumed from stream. Retry after " +
+ RETRY_TIMEOUT + " ms.", e);
+
+ try {
+ Thread.sleep(RETRY_TIMEOUT);
+ }
+ catch (InterruptedException ie) {
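+ // Interrupt is ignored; the enclosing loop re-checks the stopped flag. NOTE(review): interrupt status is not restored - confirm this is intended.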
+ }
+ }
+ }
}
});
}
@@ -153,6 +184,8 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* Stops streamer.
*/
public void stop() {
+ stopped = true;
+
if (consumer != null)
consumer.shutdown();