You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:56 UTC

[13/15] git commit: STREAMS-83 | Updated webhdfs provider with running method

STREAMS-83 | Updated webhdfs provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8d874a91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8d874a91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8d874a91

Branch: refs/heads/master
Commit: 8d874a913624bc9344109cf07cbaddbfc9510089
Parents: c9e80f5
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:42:18 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:42:18 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/hdfs/WebHdfsPersistReader.java  | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d874a91/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 7bc7f61..1baddc3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -67,6 +68,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
 
     protected DatumStatusCounter countersTotal = new DatumStatusCounter();
     protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private Future<?> task;
 
     public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
@@ -165,7 +167,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     @Override
     public void startStream() {
         LOGGER.debug("startStream");
-        executor.submit(new WebHdfsPersistReaderTask(this));
+        task = executor.submit(new WebHdfsPersistReaderTask(this));
     }
 
     @Override
@@ -196,6 +198,11 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     }
 
     @Override
+    public boolean isRunning() {
+        return !task.isDone() && !task.isCancelled();
+    }
+
+    @Override
     public DatumStatusCounter getDatumStatusCounter() {
         return countersTotal;
     }