You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/13 06:10:07 UTC

[40/43] incubator-streams git commit: STREAMS-210 | Removed accidental changes to runtime files

STREAMS-210 | Removed accidental changes to runtime files


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/48ad6009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/48ad6009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/48ad6009

Branch: refs/heads/master
Commit: 48ad60093dfbbc528e4b06f80c0ad35c9e977b92
Parents: 743f3bd
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Mon Nov 10 09:42:07 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Mon Nov 10 09:42:07 2014 -0600

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 16 +---------
 .../streams/local/tasks/StreamsMergeTask.java   |  7 +++++
 .../local/tasks/StreamsPersistWriterTask.java   | 18 ++++++++---
 .../local/tasks/StreamsProcessorTask.java       | 20 +++++++++---
 .../local/tasks/StreamsProviderTask.java        | 15 +++++++++
 .../apache/streams/local/tasks/StreamsTask.java |  4 +++
 .../local/builders/LocalStreamBuilderTest.java  | 32 +++++++++++++++++---
 .../streams/local/tasks/BasicTasksTest.java     | 26 +++++++++++++---
 8 files changed, 106 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index d3fc71e..de1add3 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * Only the necessary methods for the local streams runtime are implemented.  All other methods throw a
  * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
  */
-public class ThroughputQueue<E> extends NotificationBroadcasterSupport implements BlockingQueue<E>, ThroughputQueueMXBean {
+public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean {
 
     public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s";
 
@@ -105,14 +105,6 @@ public class ThroughputQueue<E> extends NotificationBroadcasterSupport implement
                 ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
                 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
                 mbs.registerMBean(this, name);
-
-                /*addNotificationListener(new NotificationListener() {
-                    @Override
-                    public void handleNotification(Notification notification, Object handback) {
-                        LOGGER.debug("Notification!");
-                    }
-                }, null, null);*/
-
             } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
                 LOGGER.error("Failed to register MXBean : {}", e);
                 throw new RuntimeException(e);
@@ -124,10 +116,6 @@ public class ThroughputQueue<E> extends NotificationBroadcasterSupport implement
     public boolean add(E e) {
         if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
             internalAddElement();
-
-            Notification n = new AttributeChangeNotification(this, 1, System.currentTimeMillis(), "Added element to queue", "Added", "String", null, e);
-            sendNotification(n);
-
             return true;
         }
         return false;
@@ -151,8 +139,6 @@ public class ThroughputQueue<E> extends NotificationBroadcasterSupport implement
     @Override
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
         if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
-            Notification n = new AttributeChangeNotification(this, 1, System.currentTimeMillis(), "Added element to queue", "Added", "String", null, e);
-            sendNotification(n);
             internalAddElement();
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 7a4c806..8280f29 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -19,6 +19,8 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,4 +80,9 @@ public class StreamsMergeTask extends BaseStreamsTask {
             }
         }
     }
+
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        throw new NotImplementedException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index cab46b8..003ab9e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -20,13 +20,11 @@ package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.*;
 import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +43,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     private BlockingQueue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
     private AtomicBoolean blocked;
+    private StreamsTaskCounter counter;
 
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
@@ -99,6 +98,9 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     public void run() {
         try {
             this.writer.prepare(this.streamConfig);
+            if(this.counter == null) {
+                this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString());
+            }
             while(this.keepRunning.get()) {
                 StreamsDatum datum = null;
                 try {
@@ -111,14 +113,18 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
                     Thread.currentThread().interrupt();
                 }
                 if(datum != null) {
+                    this.counter.incrementReceivedCount();
                     try {
+                        long startTime = System.currentTimeMillis();
                         this.writer.write(datum);
+                        this.counter.addTime(System.currentTimeMillis() - startTime);
                         statusCounter.incrementStatus(DatumStatus.SUCCESS);
                     } catch (Exception e) {
                         LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
                         this.keepRunning.set(false); // why do we shutdown on a failed write ?
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                         DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
+                        this.counter.incrementErrorCount();
                     }
                 } else { //datums should never be null
                     LOGGER.debug("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
@@ -151,4 +157,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
         return queues;
     }
 
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        this.counter = counter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 1bb565d..b6ab498 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -21,13 +21,11 @@ package org.apache.streams.local.tasks;
 import com.google.common.collect.Maps;
 import org.apache.streams.core.*;
 import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,6 +45,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
     private BlockingQueue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
     private AtomicBoolean blocked;
+    private StreamsTaskCounter counter;
 
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
@@ -105,6 +104,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
     public void run() {
         try {
             this.processor.prepare(this.streamConfig);
+            if(this.counter == null) {
+                this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString());
+            }
             while(this.keepRunning.get()) {
                 StreamsDatum datum = null;
                 try {
@@ -117,11 +119,15 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
                     Thread.currentThread().interrupt();
                 }
                 if(datum != null) {
+                    this.counter.incrementReceivedCount();
                     try {
+                        long startTime = System.currentTimeMillis();
                         List<StreamsDatum> output = this.processor.process(datum);
+                        this.counter.addTime(System.currentTimeMillis() - startTime);
                         if(output != null) {
                             for(StreamsDatum outDatum : output) {
                                 super.addToOutgoingQueue(outDatum);
+                                this.counter.incrementEmittedCount();
                                 statusCounter.incrementStatus(DatumStatus.SUCCESS);
                             }
                         }
@@ -130,6 +136,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
                         this.keepRunning.set(false);
                         Thread.currentThread().interrupt();
                     } catch (Throwable t) {
+                        this.counter.incrementErrorCount();
                         LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.getMessage());
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                         //Add the error to the metadata, but keep processing
@@ -151,4 +158,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
         queues.add(this.inQueue);
         return queues;
     }
+
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        this.counter = counter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index c16f64d..2475780 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.*;
 import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.math.BigInteger;
 import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -64,6 +66,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private long sleepTime;
     private int zeros = 0;
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
+    private StreamsTaskCounter counter;
 
     /**
      * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
@@ -145,13 +148,18 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             StreamsResultSet resultSet = null;
             //Negative values mean we want to run forever
             long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
+            if(this.counter == null) { //should never be null
+                this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString());
+            }
             switch(this.type) {
                 case PERPETUAL: {
                     provider.startStream();
                     this.started.set(true);
                     while(this.isRunning()) {
                         try {
+                            long startTime = System.currentTimeMillis();
                             resultSet = provider.readCurrent();
+                            this.counter.addTime(System.currentTimeMillis() - startTime);
                             if( resultSet.size() == 0 )
                                 zeros++;
                             else {
@@ -164,6 +172,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                             if(zeros > 0)
                                 Thread.sleep(sleepTime);
                         } catch (InterruptedException e) {
+                            this.counter.incrementErrorCount();
                             LOGGER.warn("Thread interrupted");
                             this.keepRunning.set(false);
                         }
@@ -219,8 +228,10 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             if(datum != null) {
                 try {
                     super.addToOutgoingQueue(datum);
+                    this.counter.incrementEmittedCount();
                     statusCounter.incrementStatus(DatumStatus.SUCCESS);
                 } catch( Exception e ) {
+                    this.counter.incrementErrorCount();
                     statusCounter.incrementStatus(DatumStatus.FAIL);
                     DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
                 }
@@ -229,4 +240,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.flushing.set(false);
     }
 
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        this.counter = counter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 7513631..8423095 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -19,6 +19,7 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 
 import java.util.List;
 import java.util.Map;
@@ -87,4 +88,7 @@ public interface StreamsTask extends Runnable{
      */
     public List<BlockingQueue<StreamsDatum>> getOutputQueues();
 
+
+    public void setStreamsTaskCounter(StreamsTaskCounter counter);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 6513032..a675d87 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -43,6 +43,7 @@ import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.local.test.processors.SlowProcessor;
@@ -93,10 +94,17 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
                 //No-op
             }
+            try {
+                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id))));
+            } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+                //No-op
+            }
         }
     }
 
 
+
+
     @Test
     public void testStreamIdValidations() {
         StreamBuilder builder = new LocalStreamBuilder();
@@ -116,7 +124,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             exp = e;
         }
         assertNotNull(exp);
-        removeRegisteredMBeans("1", "2");
+        removeRegisteredMBeans("1", "2", "id");
     }
 
     @Test
@@ -172,9 +180,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             }
         } finally {
             for(int i=0; i < numProcessors; ++i) {
-                removeRegisteredMBeans(processorId+i);
+                removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
             }
-            removeRegisteredMBeans("writer");
+            removeRegisteredMBeans("writer", "numeric_provider");
         }
     }
 
@@ -211,7 +219,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             for(int i=0; i < numProcessors; ++i) {
                 removeRegisteredMBeans(processorId+i);
             }
-            removeRegisteredMBeans("writer");
+            removeRegisteredMBeans("writer", "numeric_provider");
         }
     }
 
@@ -233,7 +241,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
             assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
         } finally {
-            removeRegisteredMBeans("proc1", "proc2", "writer1");
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+            removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
         }
     }
 
@@ -251,6 +261,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
             assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
         }
     }
@@ -269,6 +282,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             builder.start();
             assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "w1");
         }
     }
@@ -290,6 +306,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
             assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
         }
     }
@@ -317,6 +336,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             service.awaitTermination(30000, TimeUnit.MILLISECONDS);
             assertThat(Thread.activeCount(), is(equalTo(before)));
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "w1");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index 2d28602..a0e28cd 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -19,6 +19,8 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.DatumStatusCounter;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.local.test.providers.NumericMessageProvider;
@@ -27,6 +29,9 @@ import org.apache.streams.util.ComponentUtils;
 import org.junit.After;
 import org.junit.Test;
 
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
 import java.util.Queue;
 import java.util.concurrent.*;
 
@@ -38,6 +43,7 @@ import static org.junit.Assert.*;
 public class BasicTasksTest {
 
 
+    private static final String MBEAN_ID = "test_bean";
     @After
     public void removeLocalMBeans() {
         try {
@@ -87,7 +93,7 @@ public class BasicTasksTest {
             assertTrue("Task should have completed running in aloted time.", service.isTerminated());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-        };
+        }
     }
 
     @Test
@@ -95,6 +101,8 @@ public class BasicTasksTest {
         int numMessages = 100;
         PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
         StreamsProcessorTask task = new StreamsProcessorTask(processor);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        task.setStreamsTaskCounter(counter);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
         task.addOutputQueue(outQueue);
@@ -114,8 +122,7 @@ public class BasicTasksTest {
                 fail("Processor task failed to output "+numMessages+" in a timely fashion.");
             }
         }
-        task.stopTask();
-        assertEquals(numMessages, processor.getMessageCount());
+        task.stopTask();;
         service.shutdown();
         try {
             if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -126,6 +133,11 @@ public class BasicTasksTest {
         } catch (InterruptedException e) {
             fail("Test Interupted.");
         }
+        assertEquals(numMessages, processor.getMessageCount());
+        assertEquals(numMessages, counter.getNumReceived());
+        assertEquals(numMessages, counter.getNumEmitted());
+        assertEquals(0, counter.getNumUnhandledErrors());
+        assertEquals(0.0, counter.getErrorRate(), 0.0);
     }
 
     @Test
@@ -133,6 +145,8 @@ public class BasicTasksTest {
         int numMessages = 100;
         DatumCounterWriter writer = new DatumCounterWriter("");
         StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        task.setStreamsTaskCounter(counter);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
 
@@ -160,7 +174,6 @@ public class BasicTasksTest {
             }
         }
         task.stopTask();
-        assertEquals(numMessages, writer.getDatumsCounted());
         service.shutdown();
         try {
             if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -171,6 +184,11 @@ public class BasicTasksTest {
         } catch (InterruptedException e) {
             fail("Test Interupted.");
         }
+        assertEquals(numMessages, writer.getDatumsCounted());
+        assertEquals(numMessages, counter.getNumReceived());
+        assertEquals(0, counter.getNumEmitted());
+        assertEquals(0, counter.getNumUnhandledErrors());
+        assertEquals(0.0, counter.getErrorRate(), 0.0);
     }
 
     @Test