You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:27:38 UTC
[05/53] [abbrv] git commit: adding platform-level status counters
debugging data leak
adding platform-level status counters
debugging data leak
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab
Branch: refs/heads/master
Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7
Parents: adb43b2
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 15:21:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 15:21:14 2014 -0500
----------------------------------------------------------------------
.../streams/hdfs/WebHdfsPersistReader.java | 33 ++++++++++++--------
.../streams/hdfs/WebHdfsPersistReaderTask.java | 23 +++++++++-----
.../streams-provider-twitter/pom.xml | 2 +-
.../provider/TwitterStreamConfigurator.java | 2 ++
.../twitter/provider/TwitterStreamProvider.java | 22 ++++++++++---
.../com/twitter/TwitterStreamConfiguration.json | 22 +++++++++++++
.../src/main/resources/reference.conf | 3 +-
.../apache/streams/core/DatumStatusCounter.java | 25 ++++++++++-----
streams-runtimes/streams-runtime-local/pom.xml | 17 ++++++++++
.../local/builders/LocalStreamBuilder.java | 25 +++++++++++++--
.../streams/local/builders/StreamComponent.java | 24 +++++++++++---
.../streams/local/tasks/BaseStreamsTask.java | 1 +
.../tasks/LocalStreamProcessMonitorThread.java | 2 +-
.../local/tasks/StreamsProviderTask.java | 16 +++++-----
14 files changed, 170 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index b0d9904..3a6ff29 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -1,8 +1,6 @@
package org.apache.streams.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,7 +19,6 @@ import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by sblackmon on 2/28/14.
*/
-public class WebHdfsPersistReader implements StreamsPersistReader {
+public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable {
public final static String STREAMS_ID = "WebHdfsPersistReader";
@@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
private ExecutorService executor;
+ protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+ protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
this.hdfsConfiguration = hdfsConfiguration;
}
@@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
} catch (IOException e) {
e.printStackTrace();
}
- persistQueue = new LinkedBlockingQueue<StreamsDatum>(10000);
+ persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
executor = Executors.newSingleThreadExecutor();
}
@@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
@Override
public StreamsResultSet readCurrent() {
- Collection<StreamsDatum> currentIterator = Lists.newArrayList();
- Iterators.addAll(currentIterator, persistQueue.iterator());
-
- StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+ StreamsResultSet current;
- persistQueue.clear();
+ synchronized( WebHdfsPersistReader.class ) {
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ current.setCounter(new DatumStatusCounter());
+ current.getCounter().add(countersCurrent);
+ countersTotal.add(countersCurrent);
+ countersCurrent = new DatumStatusCounter();
+ persistQueue.clear();
+ }
return current;
}
@@ -174,4 +177,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
return null;
}
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ return countersTotal;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 95a8ef6..b04350e 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -2,6 +2,7 @@ package org.apache.streams.hdfs;
import com.google.common.base.Strings;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,19 +41,16 @@ public class WebHdfsPersistReaderTask implements Runnable {
try {
line = bufferedReader.readLine();
if( !Strings.isNullOrEmpty(line) ) {
+ reader.countersCurrent.incrementAttempt();
String[] fields = line.split(Character.toString(reader.DELIMITER));
StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
- boolean success;
- do {
- success = reader.persistQueue.offer(entry);
- Thread.yield();
- }
- while( success == false );
-
+ write( entry );
+ reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn(e.getMessage());
+ reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
}
} while( !Strings.isNullOrEmpty(line) );
LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
@@ -67,4 +65,15 @@ public class WebHdfsPersistReaderTask implements Runnable {
}
+ private void write( StreamsDatum entry ) {
+ boolean success;
+ do {
+ synchronized( WebHdfsPersistReader.class ) {
+ success = reader.persistQueue.offer(entry);
+ }
+ Thread.yield();
+ }
+ while( !success );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 9a12bbc..aec397b 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -63,7 +63,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
- <version>1.4.2</version>
+ <version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 7bb7048..2ae8d59 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -45,6 +45,8 @@ public class TwitterStreamConfigurator {
twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
+ twitterStreamConfiguration.setWith(twitter.getString("with"));
+ twitterStreamConfiguration.setReplies(twitter.getString("replies"));
twitterStreamConfiguration.setJsonStoreEnabled("true");
twitterStreamConfiguration.setIncludeEntities("true");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index b0b4cf4..e9ce10e 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -13,6 +13,7 @@ import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.BasicAuth;
import com.twitter.hbc.httpclient.auth.OAuth1;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
@@ -143,10 +144,23 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
else
return;
- Authentication auth = new OAuth1(config.getOauth().getConsumerKey(),
- config.getOauth().getConsumerSecret(),
- config.getOauth().getAccessToken(),
- config.getOauth().getAccessTokenSecret());
+ Authentication auth;
+ if( config.getOauth() != null ) {
+ auth = new OAuth1(config.getOauth().getConsumerKey(),
+ config.getOauth().getConsumerSecret(),
+ config.getOauth().getAccessToken(),
+ config.getOauth().getAccessTokenSecret());
+ } else if( config.getBasicauth() != null ) {
+ auth = new BasicAuth(
+ config.getBasicauth().getUsername(),
+ config.getBasicauth().getPassword()
+ );
+ } else {
+ return;
+ }
+
+ endpoint.addPostParameter("with", config.getWith());
+ endpoint.addPostParameter("replies", config.getReplies());
client = new ClientBuilder()
.name("apache/streams/streams-contrib/streams-provider-twitter")
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
index 087f8fd..c1a0d0c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
@@ -38,6 +38,14 @@
"type": "string",
"description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
},
+ "with": {
+ "type": "string",
+ "description": "Typically following or user"
+ },
+ "replies": {
+ "type": "string",
+ "description": "Set to all, to see all @replies"
+ },
"follow": {
"type": "array",
"description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
@@ -74,6 +82,20 @@
"type": "string"
}
}
+ },
+ "basicauth": {
+ "type": "object",
+ "dynamic": "true",
+ "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "username": {
+ "type": "string"
+ },
+ "password": {
+ "type": "string"
+ }
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
index 49555fc..d437db8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
@@ -8,5 +8,6 @@ twitter {
oauth {
appName = "Apache Streams"
}
-
+ with = "user"
+ replies = "all"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 7798fcd..8730d73 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -2,40 +2,51 @@ package org.apache.streams.core;
public class DatumStatusCounter
{
+ private volatile int attempted = 0;
private volatile int success = 0;
private volatile int fail = 0;
private volatile int partial = 0;
- private volatile int recordsEmitted = 0;
+ private volatile int emitted = 0;
+ public int getAttempted() { return this.attempted; }
public int getSuccess() { return this.success; }
public int getFail() { return this.fail; }
public int getPartial() { return this.partial; }
- public int getEmitted() { return this.recordsEmitted; }
+ public int getEmitted() { return this.emitted; }
+
+ public DatumStatusCounter() {
+ }
public void add(DatumStatusCounter datumStatusCounter) {
+ this.attempted += datumStatusCounter.getAttempted();
this.success += datumStatusCounter.getSuccess();
this.partial = datumStatusCounter.getPartial();
this.fail += datumStatusCounter.getFail();
- this.recordsEmitted += datumStatusCounter.getEmitted();
+ this.emitted += datumStatusCounter.getEmitted();
+ }
+
+ public void incrementAttempt() {
+ this.attempted += 1;
}
- public void add(DatumStatus workStatus) {
+ public synchronized void incrementStatus(DatumStatus workStatus) {
// add this to the record counter
switch(workStatus) {
case SUCCESS: this.success++; break;
case PARTIAL: this.partial++; break;
case FAIL: this.fail++; break;
}
- this.recordsEmitted += 1;
+ this.emitted += 1;
}
@Override
public String toString() {
return "DatumStatusCounter{" +
- "success=" + success +
+ "attempted=" + attempted +
+ ", success=" + success +
", fail=" + fail +
", partial=" + partial +
- ", recordsEmitted=" + recordsEmitted +
+ ", emitted=" + emitted +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index fa64225..d9f6d51 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -32,6 +32,18 @@
<dependencies>
<dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
<version>0.1-SNAPSHOT</version>
@@ -41,6 +53,11 @@
<artifactId>streams-util</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 444c2e1..d570573 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -2,6 +2,7 @@ package org.apache.streams.local.builders;
import org.apache.streams.core.*;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
+import org.apache.streams.local.tasks.StatusCounterMonitorThread;
import org.apache.streams.local.tasks.StreamsProviderTask;
import org.apache.streams.local.tasks.StreamsTask;
import org.apache.streams.util.SerializationUtil;
@@ -28,6 +29,7 @@ public class LocalStreamBuilder implements StreamBuilder {
private ExecutorService executor;
private ExecutorService monitor;
private int totalTasks;
+ private int monitorTasks;
private LocalStreamProcessMonitorThread monitorThread;
/**
@@ -64,6 +66,7 @@ public class LocalStreamBuilder implements StreamBuilder {
this.components = new HashMap<String, StreamComponent>();
this.streamConfig = streamConfig;
this.totalTasks = 0;
+ this.monitorTasks = 0;
}
@Override
@@ -71,6 +74,8 @@ public class LocalStreamBuilder implements StreamBuilder {
validateId(id);
this.providers.put(id, new StreamComponent(id, provider, true));
++this.totalTasks;
+ if( provider instanceof DatumStatusCountable )
+ ++this.monitorTasks;
return this;
}
@@ -79,6 +84,8 @@ public class LocalStreamBuilder implements StreamBuilder {
validateId(id);
this.providers.put(id, new StreamComponent(id, provider, false));
++this.totalTasks;
+ if( provider instanceof DatumStatusCountable )
+ ++this.monitorTasks;
return this;
}
@@ -87,6 +94,8 @@ public class LocalStreamBuilder implements StreamBuilder {
validateId(id);
this.providers.put(id, new StreamComponent(id, provider, sequence));
++this.totalTasks;
+ if( provider instanceof DatumStatusCountable )
+ ++this.monitorTasks;
return this;
}
@@ -95,6 +104,8 @@ public class LocalStreamBuilder implements StreamBuilder {
validateId(id);
this.providers.put(id, new StreamComponent(id, provider, start, end));
++this.totalTasks;
+ if( provider instanceof DatumStatusCountable )
+ ++this.monitorTasks;
return this;
}
@@ -105,6 +116,8 @@ public class LocalStreamBuilder implements StreamBuilder {
this.components.put(id, comp);
connectToOtherComponents(inBoundIds, comp);
this.totalTasks += numTasks;
+ if( processor instanceof DatumStatusCountable )
+ ++this.monitorTasks;
return this;
}
@@ -115,6 +128,8 @@ public class LocalStreamBuilder implements StreamBuilder {
this.components.put(id, comp);
connectToOtherComponents(inBoundIds, comp);
this.totalTasks += numTasks;
+ if( writer instanceof DatumStatusCountable )
+ ++this.monitorTasks;
return this;
}
@@ -125,11 +140,11 @@ public class LocalStreamBuilder implements StreamBuilder {
public void start() {
boolean isRunning = true;
this.executor = Executors.newFixedThreadPool(this.totalTasks);
- this.monitor = Executors.newSingleThreadExecutor();
+ this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
- monitorThread = new LocalStreamProcessMonitorThread(this.monitor, 1000);
try {
+ monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
this.monitor.submit(monitorThread);
for(StreamComponent comp : this.components.values()) {
int tasks = comp.getNumTasks();
@@ -139,6 +154,8 @@ public class LocalStreamBuilder implements StreamBuilder {
task.setStreamConfig(this.streamConfig);
this.executor.submit(task);
compTasks.add(task);
+ if( comp.isOperationCountable() )
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable)comp.getOperation(), 10));
}
streamsTasks.put(comp.getId(), compTasks);
}
@@ -147,6 +164,10 @@ public class LocalStreamBuilder implements StreamBuilder {
task.setStreamConfig(this.streamConfig);
this.executor.submit(task);
provTasks.put(prov.getId(), (StreamsProviderTask) task);
+ if( prov.isOperationCountable() ) {
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+ }
}
while(isRunning) {
isRunning = false;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index ecfb22d..6319ba8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -1,9 +1,6 @@
package org.apache.streams.local.builders;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.*;
import org.apache.streams.local.tasks.StreamsPersistWriterTask;
import org.apache.streams.local.tasks.StreamsProcessorTask;
import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -226,4 +223,23 @@ public class StreamComponent {
else
return false;
}
+
+ protected StreamsOperation getOperation() {
+ if(this.processor != null) {
+ return (StreamsOperation) this.processor;
+ }
+ else if(this.writer != null) {
+ return (StreamsOperation) this.writer;
+ }
+ else if(this.provider != null) {
+ return (StreamsOperation) this.provider;
+ }
+ else {
+ throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
+ }
+ }
+
+ protected boolean isOperationCountable() {
+ return getOperation() instanceof DatumStatusCountable;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 5f2620b..b9af0fd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -145,4 +145,5 @@ public abstract class BaseStreamsTask implements StreamsTask {
}
return this.inIndex;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
index 1325fd6..0b254b6 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
@@ -46,7 +46,7 @@ public class LocalStreamProcessMonitorThread implements Runnable
String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true);
- LOGGER.info("[monitor] Used Memory: {}, Max: {}",
+ LOGGER.debug("[monitor] Used Memory: {}, Max: {}",
usedMemory,
maxMemory);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index b4c929d..7b6792f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -1,9 +1,6 @@
package org.apache.streams.local.tasks;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,10 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
-public class StreamsProviderTask extends BaseStreamsTask {
+public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable {
private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ return this.statusCounter;
+ }
+
private static enum Type {
PERPETUAL,
READ_CURRENT,
@@ -41,7 +43,7 @@ public class StreamsProviderTask extends BaseStreamsTask {
private AtomicBoolean isRunning;
private int zeros = 0;
- private DatumStatusCounter statusCounter;
+ private DatumStatusCounter statusCounter = new DatumStatusCounter();
/**
* Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
@@ -119,7 +121,7 @@ public class StreamsProviderTask extends BaseStreamsTask {
zeros = 0;
if( resultSet.getCounter() != null ) {
LOGGER.debug(resultSet.getCounter().toString());
- statusCounter.add(resultSet.getCounter());
+ this.statusCounter.add(resultSet.getCounter());
}
}
flushResults(resultSet);