You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:27:38 UTC

[05/53] [abbrv] git commit: adding platform-level status counters debugging data leak

adding platform-level status counters
debugging data leak


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab

Branch: refs/heads/master
Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7
Parents: adb43b2
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 15:21:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 15:21:14 2014 -0500

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistReader.java      | 33 ++++++++++++--------
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 23 +++++++++-----
 .../streams-provider-twitter/pom.xml            |  2 +-
 .../provider/TwitterStreamConfigurator.java     |  2 ++
 .../twitter/provider/TwitterStreamProvider.java | 22 ++++++++++---
 .../com/twitter/TwitterStreamConfiguration.json | 22 +++++++++++++
 .../src/main/resources/reference.conf           |  3 +-
 .../apache/streams/core/DatumStatusCounter.java | 25 ++++++++++-----
 streams-runtimes/streams-runtime-local/pom.xml  | 17 ++++++++++
 .../local/builders/LocalStreamBuilder.java      | 25 +++++++++++++--
 .../streams/local/builders/StreamComponent.java | 24 +++++++++++---
 .../streams/local/tasks/BaseStreamsTask.java    |  1 +
 .../tasks/LocalStreamProcessMonitorThread.java  |  2 +-
 .../local/tasks/StreamsProviderTask.java        | 16 +++++-----
 14 files changed, 170 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index b0d9904..3a6ff29 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -1,8 +1,6 @@
 package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,7 +19,6 @@ import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * Created by sblackmon on 2/28/14.
  */
-public class WebHdfsPersistReader implements StreamsPersistReader {
+public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable {
 
     public final static String STREAMS_ID = "WebHdfsPersistReader";
 
@@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
 
     private ExecutorService executor;
 
+    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
     public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
     }
@@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
         } catch (IOException e) {
             e.printStackTrace();
         }
-        persistQueue = new LinkedBlockingQueue<StreamsDatum>(10000);
+        persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+        //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
         executor = Executors.newSingleThreadExecutor();
     }
 
@@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
     @Override
     public StreamsResultSet readCurrent() {
 
-        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
-        Iterators.addAll(currentIterator, persistQueue.iterator());
-
-        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+        StreamsResultSet current;
 
-        persistQueue.clear();
+        synchronized( WebHdfsPersistReader.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            persistQueue.clear();
+        }
 
         return current;
     }
@@ -174,4 +177,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
         return null;
     }
 
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return countersTotal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 95a8ef6..b04350e 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -2,6 +2,7 @@ package org.apache.streams.hdfs;
 
 import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,19 +41,16 @@ public class WebHdfsPersistReaderTask implements Runnable {
                     try {
                         line = bufferedReader.readLine();
                         if( !Strings.isNullOrEmpty(line) ) {
+                            reader.countersCurrent.incrementAttempt();
                             String[] fields = line.split(Character.toString(reader.DELIMITER));
                             StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
-                            boolean success;
-                            do {
-                                success = reader.persistQueue.offer(entry);
-                                Thread.yield();
-                            }
-                            while( success == false );
-
+                            write( entry );
+                            reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                         }
                     } catch (Exception e) {
                         e.printStackTrace();
                         LOGGER.warn(e.getMessage());
+                        reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
                     }
                 } while( !Strings.isNullOrEmpty(line) );
                 LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
@@ -67,4 +65,15 @@ public class WebHdfsPersistReaderTask implements Runnable {
 
     }
 
+    private void write( StreamsDatum entry ) {
+        boolean success;
+        do {
+            synchronized( WebHdfsPersistReader.class ) {
+                success = reader.persistQueue.offer(entry);
+            }
+            Thread.yield();
+        }
+        while( !success );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 9a12bbc..aec397b 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -63,7 +63,7 @@
         <dependency>
             <groupId>com.twitter</groupId>
             <artifactId>hbc-core</artifactId>
-            <version>1.4.2</version>
+            <version>2.0.0</version>
         </dependency>
         <dependency>
             <groupId>org.twitter4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 7bb7048..2ae8d59 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -45,6 +45,8 @@ public class TwitterStreamConfigurator {
 
         twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
         twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
+        twitterStreamConfiguration.setWith(twitter.getString("with"));
+        twitterStreamConfiguration.setReplies(twitter.getString("replies"));
         twitterStreamConfiguration.setJsonStoreEnabled("true");
         twitterStreamConfiguration.setIncludeEntities("true");
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index b0b4cf4..e9ce10e 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -13,6 +13,7 @@ import com.twitter.hbc.core.endpoint.StreamingEndpoint;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
 import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.BasicAuth;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
@@ -143,10 +144,23 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
         else
             return;
 
-        Authentication auth = new OAuth1(config.getOauth().getConsumerKey(),
-                config.getOauth().getConsumerSecret(),
-                config.getOauth().getAccessToken(),
-                config.getOauth().getAccessTokenSecret());
+        Authentication auth;
+        if( config.getOauth() != null ) {
+            auth = new OAuth1(config.getOauth().getConsumerKey(),
+                    config.getOauth().getConsumerSecret(),
+                    config.getOauth().getAccessToken(),
+                    config.getOauth().getAccessTokenSecret());
+        } else if( config.getBasicauth() != null ) {
+            auth = new BasicAuth(
+                    config.getBasicauth().getUsername(),
+                    config.getBasicauth().getPassword()
+            );
+        } else {
+            return;
+        }
+
+        endpoint.addPostParameter("with", config.getWith());
+        endpoint.addPostParameter("replies", config.getReplies());
 
         client = new ClientBuilder()
                 .name("apache/streams/streams-contrib/streams-provider-twitter")

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
index 087f8fd..c1a0d0c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
@@ -38,6 +38,14 @@
             "type": "string",
             "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
         },
+        "with": {
+            "type": "string",
+            "description": "Typically following or user"
+        },
+        "replies": {
+            "type": "string",
+            "description": "Set to all, to see all @replies"
+        },
         "follow": {
             "type": "array",
             "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
@@ -74,6 +82,20 @@
                     "type": "string"
                 }
             }
+        },
+        "basicauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "username": {
+                    "type": "string"
+                },
+                "password": {
+                    "type": "string"
+                }
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
index 49555fc..d437db8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
@@ -8,5 +8,6 @@ twitter {
     oauth {
         appName = "Apache Streams"
     }
-
+    with = "user"
+    replies = "all"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 7798fcd..8730d73 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -2,40 +2,51 @@ package org.apache.streams.core;
 
 public class DatumStatusCounter
 {
+    private volatile int attempted = 0;
     private volatile int success = 0;
     private volatile int fail = 0;
     private volatile int partial = 0;
-    private volatile int recordsEmitted = 0;
+    private volatile int emitted = 0;
 
+    public int getAttempted()             { return this.attempted; }
     public int getSuccess()             { return this.success; }
     public int getFail()                { return  this.fail; }
     public int getPartial()             { return this.partial; }
-    public int getEmitted()             { return this.recordsEmitted; }
+    public int getEmitted()             { return this.emitted; }
+
+    public DatumStatusCounter() {
+    }
 
     public void add(DatumStatusCounter datumStatusCounter) {
+        this.attempted += datumStatusCounter.getAttempted();
         this.success += datumStatusCounter.getSuccess();
         this.partial = datumStatusCounter.getPartial();
         this.fail += datumStatusCounter.getFail();
-        this.recordsEmitted += datumStatusCounter.getEmitted();
+        this.emitted += datumStatusCounter.getEmitted();
+    }
+
+    public void incrementAttempt() {
+        this.attempted += 1;
     }
 
-    public void add(DatumStatus workStatus) {
+    public synchronized void incrementStatus(DatumStatus workStatus) {
         // add this to the record counter
         switch(workStatus) {
             case SUCCESS: this.success++; break;
             case PARTIAL: this.partial++; break;
             case FAIL: this.fail++; break;
         }
-        this.recordsEmitted += 1;
+        this.emitted += 1;
     }
 
     @Override
     public String toString() {
         return "DatumStatusCounter{" +
-                "success=" + success +
+                "attempted=" + attempted +
+                ", success=" + success +
                 ", fail=" + fail +
                 ", partial=" + partial +
-                ", recordsEmitted=" + recordsEmitted +
+                ", emitted=" + emitted +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index fa64225..d9f6d51 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -32,6 +32,18 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
             <version>0.1-SNAPSHOT</version>
@@ -41,6 +53,11 @@
             <artifactId>streams-util</artifactId>
             <version>0.1-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 444c2e1..d570573 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -2,6 +2,7 @@ package org.apache.streams.local.builders;
 
 import org.apache.streams.core.*;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
+import org.apache.streams.local.tasks.StatusCounterMonitorThread;
 import org.apache.streams.local.tasks.StreamsProviderTask;
 import org.apache.streams.local.tasks.StreamsTask;
 import org.apache.streams.util.SerializationUtil;
@@ -28,6 +29,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private ExecutorService executor;
     private ExecutorService monitor;
     private int totalTasks;
+    private int monitorTasks;
     private LocalStreamProcessMonitorThread monitorThread;
 
     /**
@@ -64,6 +66,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components = new HashMap<String, StreamComponent>();
         this.streamConfig = streamConfig;
         this.totalTasks = 0;
+        this.monitorTasks = 0;
     }
 
     @Override
@@ -71,6 +74,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, true));
         ++this.totalTasks;
+        if( provider instanceof DatumStatusCountable )
+            ++this.monitorTasks;
         return this;
     }
 
@@ -79,6 +84,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, false));
         ++this.totalTasks;
+        if( provider instanceof DatumStatusCountable )
+            ++this.monitorTasks;
         return this;
     }
 
@@ -87,6 +94,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, sequence));
         ++this.totalTasks;
+        if( provider instanceof DatumStatusCountable )
+            ++this.monitorTasks;
         return this;
     }
 
@@ -95,6 +104,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, start, end));
         ++this.totalTasks;
+        if( provider instanceof DatumStatusCountable )
+            ++this.monitorTasks;
         return this;
     }
 
@@ -105,6 +116,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
+        if( processor instanceof DatumStatusCountable )
+            ++this.monitorTasks;
         return this;
     }
 
@@ -115,6 +128,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
+        if( writer instanceof DatumStatusCountable )
+            ++this.monitorTasks;
         return this;
     }
 
@@ -125,11 +140,11 @@ public class LocalStreamBuilder implements StreamBuilder {
     public void start() {
         boolean isRunning = true;
         this.executor = Executors.newFixedThreadPool(this.totalTasks);
-        this.monitor = Executors.newSingleThreadExecutor();
+        this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
         Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
-        monitorThread = new LocalStreamProcessMonitorThread(this.monitor, 1000);
         try {
+            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
             this.monitor.submit(monitorThread);
             for(StreamComponent comp : this.components.values()) {
                 int tasks = comp.getNumTasks();
@@ -139,6 +154,8 @@ public class LocalStreamBuilder implements StreamBuilder {
                     task.setStreamConfig(this.streamConfig);
                     this.executor.submit(task);
                     compTasks.add(task);
+                    if( comp.isOperationCountable() )
+                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable)comp.getOperation(), 10));
                 }
                 streamsTasks.put(comp.getId(), compTasks);
             }
@@ -147,6 +164,10 @@ public class LocalStreamBuilder implements StreamBuilder {
                 task.setStreamConfig(this.streamConfig);
                 this.executor.submit(task);
                 provTasks.put(prov.getId(), (StreamsProviderTask) task);
+                if( prov.isOperationCountable() ) {
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+                }
             }
             while(isRunning) {
                 isRunning = false;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index ecfb22d..6319ba8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -1,9 +1,6 @@
 package org.apache.streams.local.builders;
 
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.*;
 import org.apache.streams.local.tasks.StreamsPersistWriterTask;
 import org.apache.streams.local.tasks.StreamsProcessorTask;
 import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -226,4 +223,23 @@ public class StreamComponent {
         else
             return false;
     }
+
+    protected StreamsOperation getOperation() {
+        if(this.processor != null) {
+            return (StreamsOperation) this.processor;
+        }
+        else if(this.writer != null) {
+            return (StreamsOperation) this.writer;
+        }
+        else if(this.provider != null) {
+            return (StreamsOperation) this.provider;
+        }
+        else {
+            throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
+        }
+    }
+
+    protected boolean isOperationCountable() {
+        return getOperation() instanceof DatumStatusCountable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 5f2620b..b9af0fd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -145,4 +145,5 @@ public abstract class BaseStreamsTask implements StreamsTask {
         }
         return this.inIndex;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
index 1325fd6..0b254b6 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
@@ -46,7 +46,7 @@ public class LocalStreamProcessMonitorThread implements Runnable
 
             String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true);
 
-            LOGGER.info("[monitor] Used Memory: {}, Max: {}",
+            LOGGER.debug("[monitor] Used Memory: {}, Max: {}",
                     usedMemory,
                     maxMemory);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index b4c929d..7b6792f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -1,9 +1,6 @@
 package org.apache.streams.local.tasks;
 
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,10 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  *
  */
-public class StreamsProviderTask extends BaseStreamsTask {
+public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
 
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return this.statusCounter;
+    }
+
     private static enum Type {
         PERPETUAL,
         READ_CURRENT,
@@ -41,7 +43,7 @@ public class StreamsProviderTask extends BaseStreamsTask {
     private AtomicBoolean isRunning;
 
     private int zeros = 0;
-    private DatumStatusCounter statusCounter;
+    private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
     /**
      * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
@@ -119,7 +121,7 @@ public class StreamsProviderTask extends BaseStreamsTask {
                                 zeros = 0;
                                 if( resultSet.getCounter() != null ) {
                                     LOGGER.debug(resultSet.getCounter().toString());
-                                    statusCounter.add(resultSet.getCounter());
+                                    this.statusCounter.add(resultSet.getCounter());
                                 }
                             }
                             flushResults(resultSet);