You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:56 UTC
[13/15] git commit: STREAMS-83 | Updated webhdfs provider with running method
STREAMS-83 | Updated webhdfs provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8d874a91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8d874a91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8d874a91
Branch: refs/heads/master
Commit: 8d874a913624bc9344109cf07cbaddbfc9510089
Parents: c9e80f5
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:42:18 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:42:18 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/streams/hdfs/WebHdfsPersistReader.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d874a91/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 7bc7f61..1baddc3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -67,6 +68,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
protected DatumStatusCounter countersTotal = new DatumStatusCounter();
protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private Future<?> task;
public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
this.hdfsConfiguration = hdfsConfiguration;
@@ -165,7 +167,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public void startStream() {
LOGGER.debug("startStream");
- executor.submit(new WebHdfsPersistReaderTask(this));
+ task = executor.submit(new WebHdfsPersistReaderTask(this));
}
@Override
@@ -196,6 +198,11 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
}
@Override
+ public boolean isRunning() {
+ return !task.isDone() && !task.isCancelled();
+ }
+
+ @Override
public DatumStatusCounter getDatumStatusCounter() {
return countersTotal;
}