You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:55 UTC

[12/15] git commit: STREAMS-83 | Updated kafka provider with running method

STREAMS-83 | Updated kafka provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c9e80f5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c9e80f5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c9e80f5d

Branch: refs/heads/master
Commit: c9e80f5de11579fe9356aa2bb60a8b6aefdc483b
Parents: b206852
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:37:20 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:37:20 2014 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/streams/kafka/KafkaPersistReader.java  | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c9e80f5d/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index fd49b1d..a7810b1 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -121,6 +121,11 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
         return null;
     }
 
+    @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
         Properties props = new Properties();
         props.put("zookeeper.connect", a_zookeeper);