You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/19 20:44:22 UTC

[1/6] incubator-streams git commit: STREAMS-216 | Updated the broadcast object to contain a streams ID which will make it easier to identify which Stream the beans belong to

Repository: incubator-streams
Updated Branches:
  refs/heads/master 91dd9a3c5 -> 6f2acaa21


STREAMS-216 | Updated the broadcast object to contain a streams ID which will make it easier to identify which Stream the beans belong to


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b9f3519b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b9f3519b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b9f3519b

Branch: refs/heads/master
Commit: b9f3519bf18213e5a9b343b1fba25a4b13f2ce3b
Parents: b03b1b4
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Tue Nov 11 16:26:23 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Tue Nov 11 16:26:23 2014 -0600

----------------------------------------------------------------------
 .../monitoring/tasks/BroadcastMonitorThread.java     | 15 +++++++++++++++
 .../org/apache/streams/pojo/json/Broadcast.json      |  4 ++++
 .../streams/local/builders/LocalStreamBuilder.java   |  1 +
 3 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b9f3519b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 6e07619..40e4cfe 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -45,6 +45,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
     private ObjectMapper objectMapper;
     private Map<String, Object> streamConfig;
     private String broadcastURI = null;
+    private String streamName = null;
     private MessagePersister messagePersister;
     private volatile boolean keepRunning;
 
@@ -54,6 +55,8 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         server = ManagementFactory.getPlatformMBeanServer();
 
         setBroadcastURI();
+        setStreamName();
+
         messagePersister = new BroadcastMessagePersister(broadcastURI);
 
         initializeObjectMapper();
@@ -103,6 +106,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
                         }
 
                         if(broadcast != null) {
+                            broadcast.setStreamIdentifier(streamName);
                             messages.add(objectMapper.writeValueAsString(broadcast));
                         }
                     }
@@ -130,6 +134,17 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         }
     }
 
+    private void setStreamName() {
+        if(streamConfig != null &&
+                streamConfig.containsKey("streamsID") &&
+                streamConfig.get("streamsID") != null &&
+                streamConfig.get("streamsID") instanceof String) {
+            streamName = streamConfig.get("streamsID").toString();
+        } else {
+            streamName = "{\"streamName\":\"Unknown Stream\"}";
+        }
+    }
+
     public void shutdown() {
         this.keepRunning = false;
         LOGGER.debug("Shutting down BroadcastMonitor Thread");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b9f3519b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
index 4d7f87b..94ec147 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@ -8,6 +8,10 @@
     "name": {
       "type": "string",
       "description": "Name of the MBean"
+    },
+    "streamIdentifier": {
+      "type": "string",
+      "description": "The name of the Stream that is currently executing"
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b9f3519b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 47cb08f..26e1b27 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -49,6 +49,7 @@ public class LocalStreamBuilder implements StreamBuilder {
 
     public static final String TIMEOUT_KEY = "TIMEOUT";
     public static final String BROADCAST_KEY = "broadcastURI";
+    public static final String STREAM_IDENTIFIER_KEY = "streamsID";
 
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;


[5/6] incubator-streams git commit: STREAMS-216 | Fixed unit tests and hardened LocalStreamBuilder, BaseStreamsTask, and BroadcastMonitorThread against NPEs

Posted by sb...@apache.org.
STREAMS-216 | Fixed unit tests and hardened LocalStreamBuilder, BaseStreamsTask, and BroadcastMonitorThread against NPEs


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a20f01ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a20f01ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a20f01ae

Branch: refs/heads/master
Commit: a20f01aefc5904ecb857fdb9a344023cf6a05100
Parents: fb8f9d2
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Tue Nov 18 11:24:04 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Tue Nov 18 11:24:04 2014 -0600

----------------------------------------------------------------------
 .../tasks/BroadcastMonitorThread.java           |  4 ++--
 .../local/builders/LocalStreamBuilder.java      |  3 ++-
 .../streams/local/tasks/BaseStreamsTask.java    |  6 ++++--
 .../local/builders/LocalStreamBuilderTest.java  |  9 +++++---
 .../local/counters/DatumStatusCounterTest.java  | 22 ++++++++++----------
 .../queues/ThroughputQueueSingleThreadTest.java | 15 ++++++-------
 6 files changed, 33 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 10b60b1..fd9354a 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -141,8 +141,8 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
             if (streamConfig != null &&
                     streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
                     streamConfig.get("monitoring_broadcast_interval_ms") != null &&
-                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
-                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+                    (streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer)) {
                 waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
             } else {
                 waitTime = DEFAULT_WAIT_TIME;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 19d50e1..a9afc3c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -420,7 +420,8 @@ public class LocalStreamBuilder implements StreamBuilder {
     }
 
     private void setStreamIdentifier() {
-        if(streamConfig.containsKey(STREAM_IDENTIFIER_KEY) &&
+        if(streamConfig != null &&
+                streamConfig.containsKey(STREAM_IDENTIFIER_KEY) &&
                 streamConfig.get(STREAM_IDENTIFIER_KEY) != null &&
                 streamConfig.get(STREAM_IDENTIFIER_KEY).toString().length() > 0) {
             this.streamIdentifier = streamConfig.get(STREAM_IDENTIFIER_KEY).toString();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index cfb231d..9726963 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -205,7 +205,8 @@ public abstract class BaseStreamsTask implements StreamsTask {
     }
 
     public void setStartedAt() {
-        if(streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) &&
+        if(streamConfig != null &&
+                streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) &&
                 streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) != null &&
                 streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) instanceof Long) {
             this.startedAt = Long.parseLong(streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY).toString());
@@ -219,7 +220,8 @@ public abstract class BaseStreamsTask implements StreamsTask {
     }
 
     public void setStreamIdentifier() {
-        if(streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) &&
+        if(streamConfig != null &&
+                streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) &&
                 streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null &&
                 streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString().length() > 0) {
             this.streamIdentifier = streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index a675d87..ed67003 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -52,6 +52,7 @@ import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DatumCounterWriter;
 import org.apache.streams.local.test.writer.SystemOutWriter;
 import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -74,7 +75,9 @@ import javax.management.*;
  *
  */
 public class LocalStreamBuilderTest extends RandomizedTest {
-
+    private static final String MBEAN_ID = "test_id";
+    private static final String STREAM_ID = "test_stream";
+    private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
     @After
     public void removeLocalMBeans() {
@@ -90,12 +93,12 @@ public class LocalStreamBuilderTest extends RandomizedTest {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         for(String id : ids) {
             try {
-                mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+                mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)));
             } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
                 //No-op
             }
             try {
-                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id))));
+                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))));
             } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
                 //No-op
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
index 3a9a8dc..9775c6f 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -19,6 +19,7 @@ package org.apache.streams.local.counters;
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
@@ -32,7 +33,8 @@ import java.lang.management.ManagementFactory;
 public class DatumStatusCounterTest extends RandomizedTest {
 
     private static final String MBEAN_ID = "test_id";
-
+    private static final String STREAM_ID = "test_stream";
+    private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
 
     /**
@@ -42,7 +44,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @After
     public void unregisterMXBean() throws Exception {
         try {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID)));
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
         } catch (InstanceNotFoundException ife) {
             //No-op
         }
@@ -54,7 +56,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     public void testConstructor() {
         try {
-            new DatumStatusCounter(MBEAN_ID);
+            new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         } catch (Throwable t) {
             fail("Constructor Threw Exception : "+t.getMessage());
         }
@@ -67,7 +69,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testPassed() throws Exception {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementPassedCount();
@@ -76,7 +78,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new DatumStatusCounter(MBEAN_ID);
+        counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -94,7 +96,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testFailed() throws Exception {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementFailedCount();
@@ -103,7 +105,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new DatumStatusCounter(MBEAN_ID);
+        counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -121,7 +123,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testFailureRate() {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         assertEquals(0.0, counter.getFailRate(), 0);
         int failures = randomIntBetween(0, 100000);
         int passes = randomIntBetween(0, 100000);
@@ -129,6 +131,4 @@ public class DatumStatusCounterTest extends RandomizedTest {
         counter.incrementFailedCount(failures);
         assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
     }
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2492161..ef669f4 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.queues;
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
@@ -35,7 +36,9 @@ import static org.junit.Assert.assertEquals;
  * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
  */
 public class ThroughputQueueSingleThreadTest extends RandomizedTest {
-
+    private static final String MBEAN_ID = "test_id";
+    private static final String STREAM_ID = "test_stream";
+    private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
     @After
     public void removeLocalMBeans() {
@@ -208,10 +211,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             Integer beanCount = mbs.getMBeanCount();
-            String id = "testQueue";
-            ThroughputQueue queue = new ThroughputQueue(id);
+            ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
             assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
-            ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+            ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
             assertNotNull(mBean);
         } catch (Exception e) {
             fail("Failed to register MXBean : "+e.getMessage());
@@ -226,12 +228,11 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             Integer beanCount = mbs.getMBeanCount();
-            String id = "testQueue";
             int numReg = randomIntBetween(2, 100);
             for(int i=0; i < numReg; ++i) {
-                ThroughputQueue queue = new ThroughputQueue(id+i);
+                ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME);
                 assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount());
-                ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+                ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME)));
                 assertNotNull(mBean);
             }
         } catch (Exception e) {


[2/6] incubator-streams git commit: Merge branch 'master' into STREAMS-216

Posted by sb...@apache.org.
Merge branch 'master' into STREAMS-216


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/966b12b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/966b12b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/966b12b3

Branch: refs/heads/master
Commit: 966b12b388b2a95a143a89094a7298bceb3539a4
Parents: b9f3519 0ffc40e
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Thu Nov 13 10:14:17 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Thu Nov 13 10:14:17 2014 -0600

----------------------------------------------------------------------
 .../streams/jackson/TypeConverterProcessor.java | 112 ++++++++
 .../test/TypeConverterProcessorTest.java        |  95 +++++++
 .../DatasiftActivitySerializerProcessor.java    |  89 ++++++
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |  12 +-
 .../DatasiftDefaultActivitySerializer.java      | 214 ---------------
 .../serializer/DatasiftEventClassifier.java     |  53 ++++
 .../DatasiftInstagramActivitySerializer.java    |   8 +-
 .../DatasiftInteractionActivitySerializer.java  | 222 +++++++++++++++
 .../DatasiftTweetActivitySerializer.java        | 266 ------------------
 .../DatasiftTwitterActivitySerializer.java      | 272 +++++++++++++++++++
 .../datasift/util/StreamsDatasiftMapper.java    |  11 +-
 .../apache/streams/datasift/DatasiftPush.json   |  30 ++
 .../com/datasift/test/DatasiftSerDeTest.java    |  18 +-
 .../DatasiftTypeConverterProcessorTest.java     |  72 -----
 .../DatasiftActivitySerializerTest.java         |  82 +++---
 .../serializer/DatasiftEventClassifierTest.java |  64 +++++
 ...DatasiftInstagramActivitySerializerTest.java |  43 +++
 ...tasiftInteractionActivitySerializerTest.java |  48 ++++
 .../DatasiftTwitterActivitySerializerTest.java  |  43 +++
 .../streams-provider-twitter/pom.xml            |   6 +
 .../processor/TwitterEventProcessor.java        | 194 -------------
 .../twitter/processor/TwitterTypeConverter.java | 209 --------------
 .../provider/TwitterEventClassifier.java        |  58 ++--
 .../serializer/StreamsTwitterMapper.java        |  11 +-
 .../TwitterJsonActivitySerializer.java          |  24 +-
 .../TwitterJsonDeleteActivitySerializer.java    |   6 +
 .../TwitterJsonRetweetActivitySerializer.java   |   6 +
 .../TwitterJsonTweetActivitySerializer.java     |   6 +
 .../TwitterJsonUserActivitySerializer.java      |   6 +
 ...erJsonUserstreameventActivitySerializer.java |   6 +
 .../streams/twitter/test/SimpleTweetTest.java   |  11 +-
 .../twitter/test/TweetActivitySerDeTest.java    |   6 +-
 .../streams/twitter/test/TweetSerDeTest.java    |   6 +-
 .../test/TwitterEventClassifierTest.java        |  34 +++
 .../streams/pig/test/PigProcessDatumTest.java   |   2 -
 .../pig/test/PigProcessDocumentTest.java        |   2 -
 .../streams/pig/test/PigSerializerTest.java     |   3 +-
 38 files changed, 1278 insertions(+), 1079 deletions(-)
----------------------------------------------------------------------



[6/6] incubator-streams git commit: Merge branch 'STREAMS-216'

Posted by sb...@apache.org.
Merge branch 'STREAMS-216'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6f2acaa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6f2acaa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6f2acaa2

Branch: refs/heads/master
Commit: 6f2acaa21d4fc7052fbab84bf9dd52b9c1b2ec4d
Parents: 91dd9a3 a20f01a
Author: sblackmon <sb...@apache.org>
Authored: Wed Nov 19 12:57:50 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Wed Nov 19 12:57:50 2014 -0600

----------------------------------------------------------------------
 .../tasks/BroadcastMonitorThread.java           |  5 +-
 .../org/apache/streams/pojo/json/Broadcast.json |  8 +++
 .../local/builders/LocalStreamBuilder.java      | 37 +++++++++++---
 .../streams/local/builders/StreamComponent.java | 31 +++++++-----
 .../local/counters/DatumStatusCounter.java      |  9 +++-
 .../local/counters/StreamsTaskCounter.java      | 16 ++++--
 .../streams/local/queues/ThroughputQueue.java   | 52 +++++++++++++++++---
 .../streams/local/tasks/BaseStreamsTask.java    | 41 ++++++++++++++-
 .../streams/local/tasks/StreamsMergeTask.java   |  7 ++-
 .../local/tasks/StreamsPersistWriterTask.java   | 11 +++--
 .../local/tasks/StreamsProcessorTask.java       | 14 ++++--
 .../local/tasks/StreamsProviderTask.java        | 11 +++--
 .../local/builders/LocalStreamBuilderTest.java  |  9 ++--
 .../local/counters/DatumStatusCounterTest.java  | 22 ++++-----
 .../local/counters/StreamsTaskCounterTest.java  | 16 +++---
 .../queues/ThroughputQueueSingleThreadTest.java | 15 +++---
 .../streams/local/tasks/BasicTasksTest.java     |  6 +--
 .../local/tasks/StreamsProviderTaskTest.java    | 10 ++--
 18 files changed, 238 insertions(+), 82 deletions(-)
----------------------------------------------------------------------



[3/6] incubator-streams git commit: Merge branch 'master' into STREAMS-216

Posted by sb...@apache.org.
Merge branch 'master' into STREAMS-216

Conflicts:
	streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
	streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a77a8f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a77a8f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a77a8f7

Branch: refs/heads/master
Commit: 9a77a8f7c29ccffc8a71b1a4d7974e7904d98651
Parents: 966b12b dc7ba80
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Thu Nov 13 12:46:56 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Thu Nov 13 12:46:56 2014 -0600

----------------------------------------------------------------------
 streams-components/streams-http/pom.xml         |  2 +-
 streams-monitoring/pom.xml                      | 29 +++++++
 .../jackson/DatumStatusCounterDeserializer.java | 76 +++++++++++++++++
 .../jackson/MemoryUsageDeserializer.java        | 79 ++++++++++++++++++
 .../jackson/StreamsTaskCounterDeserializer.java | 88 ++++++++++++++++++++
 .../jackson/ThroughputQueueDeserializer.java    | 87 +++++++++++++++++++
 .../tasks/BroadcastMonitorThread.java           | 30 ++++++-
 .../org/apache/streams/pojo/json/Broadcast.json | 17 ++++
 .../pojo/json/DatumStatusCounterBroadcast.json  | 22 +++++
 .../streams/pojo/json/MemoryUsageBroadcast.json | 30 +++++++
 .../pojo/json/StreamsTaskCounterBroadcast.json  | 38 +++++++++
 .../pojo/json/ThroughputQueueBroadcast.json     | 38 +++++++++
 .../jackson/MemoryUsageDeserializerTest.java    | 77 +++++++++++++++++
 .../src/test/resources/MemoryUsageObjects.json  |  1 +
 streams-pojo/pom.xml                            | 10 ---
 .../jackson/DatumStatusCounterDeserializer.java | 76 -----------------
 .../jackson/MemoryUsageDeserializer.java        | 79 ------------------
 .../jackson/StreamsTaskCounterDeserializer.java | 88 --------------------
 .../jackson/ThroughputQueueDeserializer.java    | 87 -------------------
 .../org/apache/streams/pojo/json/Broadcast.json | 17 ----
 .../pojo/json/DatumStatusCounterBroadcast.json  | 22 -----
 .../streams/pojo/json/MemoryUsageBroadcast.json | 30 -------
 .../pojo/json/StreamsTaskCounterBroadcast.json  | 38 ---------
 .../pojo/json/ThroughputQueueBroadcast.json     | 38 ---------
 .../jackson/MemoryUsageDeserializerTest.java    | 77 -----------------
 .../src/test/resources/MemoryUsageObjects.json  |  1 -
 .../local/builders/LocalStreamBuilder.java      |  1 +
 27 files changed, 612 insertions(+), 566 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 40e4cfe,cbae7ee..25a5030
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@@ -55,8 -55,7 +56,9 @@@ public class BroadcastMonitorThread ext
          server = ManagementFactory.getPlatformMBeanServer();
  
          setBroadcastURI();
 +        setStreamName();
+         setWaitTime();
 +
          messagePersister = new BroadcastMessagePersister(broadcastURI);
  
          initializeObjectMapper();
@@@ -134,17 -132,30 +136,41 @@@
          }
      }
  
 +    private void setStreamName() {
-         if(streamConfig != null &&
++        if (streamConfig != null &&
 +                streamConfig.containsKey("streamsID") &&
 +                streamConfig.get("streamsID") != null &&
 +                streamConfig.get("streamsID") instanceof String) {
 +            streamName = streamConfig.get("streamsID").toString();
 +        } else {
 +            streamName = "{\"streamName\":\"Unknown Stream\"}";
 +        }
 +    }
 +
+     /**
+      * Go through streams config and set the thread's wait time (if present)
+      */
+     private void setWaitTime() {
+         try {
+             if (streamConfig != null &&
+                     streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
+                     streamConfig.get("monitoring_broadcast_interval_ms") != null &&
+                     streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+                     streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+                 waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+             } else {
+                 waitTime = DEFAULT_WAIT_TIME;
+             }
+ 
+             //Shutdown
+             if(waitTime == -1) {
+                 this.keepRunning = false;
+             }
+         } catch (Exception e) {
+             LOGGER.error("Exception while trying to set default broadcast thread wait time: {}", e);
+         }
+     }
+ 
      public void shutdown() {
          this.keepRunning = false;
          LOGGER.debug("Shutting down BroadcastMonitor Thread");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --cc streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
index 0000000,4d7f87b..94ec147
mode 000000,100644..100644
--- a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@@ -1,0 -1,13 +1,17 @@@
+ {
+   "type" : "object",
+   "title" : "object",
+   "javaType": "org.apache.streams.pojo.json.Broadcast",
+   "javaInterfaces": ["java.io.Serializable"],
+   "description" : "Base Broadcast class",
+   "properties" : {
+     "name": {
+       "type": "string",
+       "description": "Name of the MBean"
++    },
++    "streamIdentifier": {
++      "type": "string",
++      "description": "The name of the Stream that is currently executing"
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a77a8f7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 26e1b27,856b815..e3e42ff
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@@ -49,7 -49,7 +49,8 @@@ public class LocalStreamBuilder impleme
  
      public static final String TIMEOUT_KEY = "TIMEOUT";
      public static final String BROADCAST_KEY = "broadcastURI";
 +    public static final String STREAM_IDENTIFIER_KEY = "streamsID";
+     public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
  
      private Map<String, StreamComponent> providers;
      private Map<String, StreamComponent> components;


[4/6] incubator-streams git commit: STREAMS-216 | All JMX monitoring beans now include identifying information and the time that the stream was started

Posted by sb...@apache.org.
STREAMS-216 | All JMX monitoring beans now include identifying information and the time that the stream was started


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb8f9d20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb8f9d20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb8f9d20

Branch: refs/heads/master
Commit: fb8f9d209bf604a3b2127e22441e54b8e3259ad7
Parents: 9a77a8f
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Fri Nov 14 11:17:23 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Fri Nov 14 11:17:23 2014 -0600

----------------------------------------------------------------------
 .../tasks/BroadcastMonitorThread.java           | 14 ------
 .../org/apache/streams/pojo/json/Broadcast.json |  4 ++
 .../local/builders/LocalStreamBuilder.java      | 35 ++++++++++---
 .../streams/local/builders/StreamComponent.java | 31 +++++++-----
 .../local/counters/DatumStatusCounter.java      |  9 +++-
 .../local/counters/StreamsTaskCounter.java      | 16 ++++--
 .../streams/local/queues/ThroughputQueue.java   | 52 +++++++++++++++++---
 .../streams/local/tasks/BaseStreamsTask.java    | 39 ++++++++++++++-
 .../streams/local/tasks/StreamsMergeTask.java   |  7 ++-
 .../local/tasks/StreamsPersistWriterTask.java   | 11 +++--
 .../local/tasks/StreamsProcessorTask.java       | 14 ++++--
 .../local/tasks/StreamsProviderTask.java        | 11 +++--
 .../local/counters/StreamsTaskCounterTest.java  | 16 +++---
 .../streams/local/tasks/BasicTasksTest.java     |  6 +--
 .../local/tasks/StreamsProviderTaskTest.java    | 10 ++--
 15 files changed, 202 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 25a5030..10b60b1 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -46,7 +46,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
     private ObjectMapper objectMapper;
     private Map<String, Object> streamConfig;
     private String broadcastURI = null;
-    private String streamName = null;
     private MessagePersister messagePersister;
     private volatile boolean keepRunning;
 
@@ -56,7 +55,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         server = ManagementFactory.getPlatformMBeanServer();
 
         setBroadcastURI();
-        setStreamName();
         setWaitTime();
 
         messagePersister = new BroadcastMessagePersister(broadcastURI);
@@ -108,7 +106,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
                         }
 
                         if(broadcast != null) {
-                            broadcast.setStreamIdentifier(streamName);
                             messages.add(objectMapper.writeValueAsString(broadcast));
                         }
                     }
@@ -136,17 +133,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         }
     }
 
-    private void setStreamName() {
-        if (streamConfig != null &&
-                streamConfig.containsKey("streamsID") &&
-                streamConfig.get("streamsID") != null &&
-                streamConfig.get("streamsID") instanceof String) {
-            streamName = streamConfig.get("streamsID").toString();
-        } else {
-            streamName = "{\"streamName\":\"Unknown Stream\"}";
-        }
-    }
-
     /**
      * Go through streams config and set the thread's wait time (if present)
      */

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
index 94ec147..687ef9c 100644
--- a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@ -12,6 +12,10 @@
     "streamIdentifier": {
       "type": "string",
       "description": "The name of the Stream that is currently executing"
+    },
+    "startedAt": {
+      "type": "integer",
+      "description": "Milliseconds since epoch when this Stream was started"
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index e3e42ff..19d50e1 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -51,6 +51,8 @@ public class LocalStreamBuilder implements StreamBuilder {
     public static final String BROADCAST_KEY = "broadcastURI";
     public static final String STREAM_IDENTIFIER_KEY = "streamsID";
     public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
+    public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream";
+    public static final String DEFAULT_STARTED_AT_KEY = "startedAt";
 
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;
@@ -65,6 +67,8 @@ public class LocalStreamBuilder implements StreamBuilder {
     private Thread shutdownHook;
     private BroadcastMonitorThread broadcastMonitor;
     private int maxQueueCapacity;
+    private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
+    private DateTime startedAt = new DateTime();
 
     /**
      * Creates a local stream builder with no config object and default maximum internal queue size of 500
@@ -112,6 +116,11 @@ public class LocalStreamBuilder implements StreamBuilder {
             }
         };
 
+        setStreamIdentifier();
+        if(this.streamConfig != null) {
+            this.streamConfig.put(DEFAULT_STARTED_AT_KEY, startedAt.getMillis());
+        }
+
         this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
 
         this.futures = new HashMap<>();
@@ -120,7 +129,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) {
         validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, true));
+        this.providers.put(id, new StreamComponent(id, provider, true, streamConfig));
         ++this.totalTasks;
         if( provider instanceof DatumStatusCountable )
             ++this.monitorTasks;
@@ -130,7 +139,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
         validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, false));
+        this.providers.put(id, new StreamComponent(id, provider, false, streamConfig));
         ++this.totalTasks;
         if( provider instanceof DatumStatusCountable )
             ++this.monitorTasks;
@@ -140,7 +149,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) {
         validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, sequence));
+        this.providers.put(id, new StreamComponent(id, provider, sequence, streamConfig));
         ++this.totalTasks;
         if( provider instanceof DatumStatusCountable )
             ++this.monitorTasks;
@@ -150,7 +159,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) {
         validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, start, end));
+        this.providers.put(id, new StreamComponent(id, provider, start, end, streamConfig));
         ++this.totalTasks;
         if( provider instanceof DatumStatusCountable )
             ++this.monitorTasks;
@@ -160,7 +169,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
         validateId(id);
-        StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks);
+        StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig);
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
@@ -172,7 +181,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
         validateId(id);
-        StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks);
+        StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig);
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
@@ -281,7 +290,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         for(StreamComponent prov : this.providers.values()) {
             StreamsTask task = prov.createConnectedTask(getTimeout());
             task.setStreamConfig(this.streamConfig);
-            StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId());
+            StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), streamIdentifier, startedAt.getMillis());
             task.setStreamsTaskCounter(counter);
             this.executor.submit(task);
             provTasks.put(prov.getId(), (StreamsProviderTask) task);
@@ -296,7 +305,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         for(StreamComponent comp : this.components.values()) {
             int tasks = comp.getNumTasks();
             List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
-            StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId());
+            StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), streamIdentifier, startedAt.getMillis());
             for(int i=0; i < tasks; ++i) {
                 StreamsTask task = comp.createConnectedTask(getTimeout());
                 task.setStreamsTaskCounter(counter);
@@ -410,4 +419,14 @@ public class LocalStreamBuilder implements StreamBuilder {
         return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1;
     }
 
+    private void setStreamIdentifier() {
+        if(streamConfig.containsKey(STREAM_IDENTIFIER_KEY) &&
+                streamConfig.get(STREAM_IDENTIFIER_KEY) != null &&
+                streamConfig.get(STREAM_IDENTIFIER_KEY).toString().length() > 0) {
+            this.streamIdentifier = streamConfig.get(STREAM_IDENTIFIER_KEY).toString();
+        } else {
+            this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 4b96c5b..0dcc4d0 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -51,15 +51,18 @@ public class StreamComponent {
     private int numTasks = 1;
     private boolean perpetual;
 
+    private Map<String, Object> streamConfig;
+
     /**
      *
      * @param id
      * @param provider
      */
-    public StreamComponent(String id, StreamsProvider provider, boolean perpetual) {
+    public StreamComponent(String id, StreamsProvider provider, boolean perpetual, Map<String, Object> streamConfig) {
         this.id = id;
         this.provider = provider;
         this.perpetual = perpetual;
+        this.streamConfig = streamConfig;
         initializePrivateVariables();
     }
 
@@ -70,12 +73,13 @@ public class StreamComponent {
      * @param start
      * @param end
      */
-    public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) {
+    public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end, Map<String, Object> streamConfig) {
         this.id = id;
         this.provider = provider;
         this.dateRange = new DateTime[2];
         this.dateRange[START] = start;
         this.dateRange[END] = end;
+        this.streamConfig = streamConfig;
         initializePrivateVariables();
     }
 
@@ -86,10 +90,11 @@ public class StreamComponent {
      * @param provider
      * @param sequence
      */
-    public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) {
+    public StreamComponent(String id, StreamsProvider provider, BigInteger sequence, Map<String, Object> streamConfig) {
         this.id = id;
         this.provider = provider;
         this.sequence = sequence;
+        this.streamConfig = streamConfig;
     }
 
     /**
@@ -99,11 +104,12 @@ public class StreamComponent {
      * @param inQueue
      * @param numTasks
      */
-    public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks) {
+    public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks, Map<String, Object> streamConfig) {
         this.id = id;
         this.processor = processor;
         this.inQueue = inQueue;
         this.numTasks = numTasks;
+        this.streamConfig = streamConfig;
         initializePrivateVariables();
     }
 
@@ -114,11 +120,12 @@ public class StreamComponent {
      * @param inQueue
      * @param numTasks
      */
-    public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks) {
+    public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks, Map<String, Object> streamConfig) {
         this.id = id;
         this.writer = writer;
         this.inQueue = inQueue;
         this.numTasks = numTasks;
+        this.streamConfig = streamConfig;
         initializePrivateVariables();
     }
 
@@ -187,13 +194,13 @@ public class StreamComponent {
         StreamsTask task;
         if(this.processor != null) {
             if(this.numTasks > 1) {
-                task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+                task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor), streamConfig);
                 task.addInputQueue(this.inQueue);
                 for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
                     task.addOutputQueue(q);
                 }
             } else {
-                task = new StreamsProcessorTask(this.processor);
+                task = new StreamsProcessorTask(this.processor, streamConfig);
                 task.addInputQueue(this.inQueue);
                 for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
                     task.addOutputQueue(q);
@@ -202,10 +209,10 @@ public class StreamComponent {
         }
         else if(this.writer != null) {
             if(this.numTasks > 1) {
-                task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+                task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer), streamConfig);
                 task.addInputQueue(this.inQueue);
             } else {
-                task = new StreamsPersistWriterTask(this.writer);
+                task = new StreamsPersistWriterTask(this.writer, streamConfig);
                 task.addInputQueue(this.inQueue);
             }
         }
@@ -217,11 +224,11 @@ public class StreamComponent {
                 prov = this.provider;
             }
             if(this.dateRange == null && this.sequence == null)
-                task = new StreamsProviderTask(prov, this.perpetual);
+                task = new StreamsProviderTask(prov, this.perpetual, streamConfig);
             else if(this.sequence != null)
-                task = new StreamsProviderTask(prov, this.sequence);
+                task = new StreamsProviderTask(prov, this.sequence, streamConfig);
             else
-                task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1]);
+                task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig);
             //Adjust the timeout if necessary
             if(timeout != 0) {
                 ((StreamsProviderTask)task).setTimeout(timeout);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
index acada71..34d2bcc 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -18,6 +18,7 @@
 package org.apache.streams.local.counters;
 
 import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,16 +33,20 @@ import java.util.concurrent.atomic.AtomicLong;
 @ThreadSafe
 public class DatumStatusCounter implements DatumStatusCounterMXBean{
 
-    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s";
     private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class);
 
     private AtomicLong failed;
     private AtomicLong passed;
 
     public DatumStatusCounter(String id) {
+        this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+    }
+
+    public DatumStatusCounter(String id, String streamIdentifier, long startedAt) {
         this.failed = new AtomicLong(0);
         this.passed = new AtomicLong(0);
-        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
+        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt), this);
     }
 
     public void incrementFailedCount() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index 68c6364..9bd5d49 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -19,7 +19,9 @@ package org.apache.streams.local.counters;
 
 import net.jcip.annotations.GuardedBy;
 import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,9 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
  *
  */
 @ThreadSafe
-public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
+public class StreamsTaskCounter implements StreamsTaskCounterMXBean {
 
-    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s";
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s";
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
 
     private AtomicLong emitted;
@@ -48,12 +50,20 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
      * @param id
      */
     public StreamsTaskCounter(String id) {
+        this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+    }
+
+    /**
+     *
+     * @param id
+     */
+    public StreamsTaskCounter(String id, String streamId, long startedAt) {
         this.emitted = new AtomicLong(0);
         this.received = new AtomicLong(0);
         this.errors = new AtomicLong(0);
         this.totalTime = new AtomicLong(0);
         this.maxTime = -1;
-        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
+        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamId, startedAt), this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index de1add3..e4a6ab9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streams.local.queues;
 
+import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -43,7 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean {
 
-    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s";
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
 
@@ -60,7 +61,16 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
      * Creates an unbounded, unregistered {@code ThroughputQueue}
      */
     public ThroughputQueue() {
-        this(-1, null);
+        this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+    }
+
+    /**
+     *
+     * @param streamIdentifier
+     * @param startedAt
+     */
+    public ThroughputQueue(String streamIdentifier, long startedAt) {
+        this(-1, null, streamIdentifier, startedAt);
     }
 
     /**
@@ -69,7 +79,17 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
      */
     public ThroughputQueue(int maxSize) {
-        this(maxSize, null);
+        this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+    }
+
+    /**
+     *
+     * @param maxSize
+     * @param streamIdentifier
+     * @param startedAt
+     */
+    public ThroughputQueue(int maxSize, String streamIdentifier, long startedAt) {
+        this(maxSize, null, streamIdentifier, startedAt);
     }
 
     /**
@@ -78,7 +98,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
      * @param id unique id for this queue to be registered with. if id == NULL then not registered
      */
     public ThroughputQueue(String id) {
-        this(-1, id);
+        this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+    }
+
+    /**
+     *
+     * @param id
+     * @param streamIdentifier
+     * @param startedAt
+     */
+    public ThroughputQueue(String id, String streamIdentifier, long startedAt) {
+        this(-1, id, streamIdentifier, startedAt);
+    }
+
+    /**
+     *
+     * @param maxSize
+     * @param id
+     */
+    public ThroughputQueue(int maxSize, String id) {
+        this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+
     }
 
     /**
@@ -87,7 +127,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
      * @param id      unique id for this queue to be registered with. if id == NULL then not registered
      */
-    public ThroughputQueue(int maxSize, String id) {
+    public ThroughputQueue(int maxSize, String id, String streamIdentifier, long startedAt) {
         if (maxSize < 1) {
             this.underlyingQueue = new LinkedBlockingQueue<>();
         } else {
@@ -102,7 +142,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         this.totalQueueTime = new AtomicLong(0);
         if (id != null) {
             try {
-                ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+                ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt));
                 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
                 mbs.registerMBean(this, name);
             } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 6755d77..cfb231d 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
@@ -45,10 +46,18 @@ public abstract class BaseStreamsTask implements StreamsTask {
     private List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>();
     private int inIndex = 0;
     private ObjectMapper mapper;
+    protected Map<String, Object> streamConfig;
 
-    public BaseStreamsTask() {
+    private long startedAt;
+    private String streamIdentifier;
+
+    public BaseStreamsTask(Map<String, Object> config) {
         this.mapper = new StreamsJacksonMapper();
         this.mapper.registerSubtypes(Activity.class);
+        this.streamConfig = config;
+
+        setStreamIdentifier();
+        setStartedAt();
     }
 
 
@@ -190,4 +199,50 @@ public abstract class BaseStreamsTask implements StreamsTask {
         }
         return copyTo;
     }
+
+    /**
+     * @return the stream start value resolved by {@link #setStartedAt()}; -1 when none was configured
+     */
+    public long getStartedAt() {
+        return startedAt;
+    }
+
+    /**
+     * Resolves the stream start value from the stream configuration.
+     * Falls back to -1 when the configuration is null (tasks built without a
+     * config pass null to the constructor), or when the entry stored under
+     * {@link LocalStreamBuilder#DEFAULT_STARTED_AT_KEY} is absent or not a {@link Long}.
+     */
+    public void setStartedAt() {
+        Object configured = streamConfig == null ? null : streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY);
+        if(configured instanceof Long) {
+            this.startedAt = (Long) configured;
+        } else {
+            this.startedAt = -1;
+        }
+    }
+
+    /**
+     * @return the stream identifier resolved by {@link #setStreamIdentifier()}
+     */
+    public String getStreamIdentifier() {
+        return streamIdentifier;
+    }
+
+    /**
+     * Resolves the stream identifier from the stream configuration.
+     * Falls back to {@link LocalStreamBuilder#DEFAULT_STREAM_IDENTIFIER} when the
+     * configuration is null (tasks built without a config pass null to the
+     * constructor), or when the entry stored under
+     * {@link LocalStreamBuilder#STREAM_IDENTIFIER_KEY} is absent, null or empty.
+     */
+    public void setStreamIdentifier() {
+        Object configured = streamConfig == null ? null : streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY);
+        String identifier = configured == null ? "" : configured.toString();
+        if(identifier.length() > 0) {
+            this.streamIdentifier = identifier;
+        } else {
+            this.streamIdentifier = LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 8280f29..27d1c6e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -37,10 +37,15 @@ public class StreamsMergeTask extends BaseStreamsTask {
     private long sleepTime;
 
     public StreamsMergeTask() {
-        this(DEFAULT_SLEEP_TIME_MS);
+        this(DEFAULT_SLEEP_TIME_MS, null);
     }
 
     public StreamsMergeTask(long sleepTime) {
+        this(sleepTime, null);
+    }
+
+    public StreamsMergeTask(long sleepTime, Map<String, Object> streamConfig) {
+        super(streamConfig);
         this.sleepTime = sleepTime;
         this.keepRunning = new AtomicBoolean(true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 003ab9e..235ee92 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -58,7 +58,11 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
      * @param writer writer to execute in task
      */
     public StreamsPersistWriterTask(StreamsPersistWriter writer) {
-        this(writer, DEFAULT_SLEEP_TIME_MS);
+        this(writer, DEFAULT_SLEEP_TIME_MS, null);
+    }
+
+    public StreamsPersistWriterTask(StreamsPersistWriter writer, Map<String, Object> streamConfig) {
+        this(writer, DEFAULT_SLEEP_TIME_MS, streamConfig);
     }
 
     /**
@@ -66,7 +70,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
      * @param writer writer to execute in task
      * @param sleepTime time to sleep when inbound queue is empty.
      */
-    public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) {
+    public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime, Map<String, Object> streamConfig) {
+        super(streamConfig);
         this.writer = writer;
         this.sleepTime = sleepTime;
         this.keepRunning = new AtomicBoolean(true);
@@ -99,7 +104,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
         try {
             this.writer.prepare(this.streamConfig);
             if(this.counter == null) {
-                this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString());
+                this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
             }
             while(this.keepRunning.get()) {
                 StreamsDatum datum = null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index b6ab498..c470d0b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -59,15 +59,23 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
      * @param processor process to run in task
      */
     public StreamsProcessorTask(StreamsProcessor processor) {
-        this(processor, DEFAULT_SLEEP_TIME_MS);
+        this(processor, DEFAULT_SLEEP_TIME_MS, null);
     }
 
     /**
      *
+     * @param processor process to run in task
+     * @param streamConfig stream configuration forwarded to the three-argument constructor, which uses DEFAULT_SLEEP_TIME_MS as the sleep time here
+     */
+    public StreamsProcessorTask(StreamsProcessor processor, Map<String, Object> streamConfig) { this(processor, DEFAULT_SLEEP_TIME_MS, streamConfig); }
+
+    /**
+     *
      * @param processor processor to run in task
      * @param sleepTime time to sleep when incoming queue is empty
      */
-    public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) {
+    public StreamsProcessorTask(StreamsProcessor processor, long sleepTime, Map<String, Object> streamConfig) {
+        super(streamConfig);
         this.processor = processor;
         this.sleepTime = sleepTime;
         this.keepRunning = new AtomicBoolean(true);
@@ -105,7 +113,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
         try {
             this.processor.prepare(this.streamConfig);
             if(this.counter == null) {
-                this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString());
+                this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
             }
             while(this.keepRunning.get()) {
                 StreamsDatum datum = null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 2475780..8c87d7a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -72,7 +72,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
      * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
      * @param provider
      */
-    public StreamsProviderTask(StreamsProvider provider, boolean perpetual) {
+    public StreamsProviderTask(StreamsProvider provider, boolean perpetual, Map<String, Object> streamConfig) {
+        super(streamConfig);
         this.provider = provider;
         if( perpetual )
             this.type = Type.PERPETUAL;
@@ -87,7 +88,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
      * @param provider
      * @param sequence
      */
-    public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) {
+    public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, Map<String, Object> streamConfig) {
+        super(streamConfig);
         this.provider = provider;
         this.type = Type.READ_NEW;
         this.sequence = sequence;
@@ -101,7 +103,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
      * @param start
      * @param end
      */
-    public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) {
+    public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, Map<String, Object> streamConfig) {
+        super(streamConfig);
         this.provider = provider;
         this.type = Type.READ_RANGE;
         this.dateRange = new DateTime[2];
@@ -149,7 +152,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             //Negative values mean we want to run forever
             long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
             if(this.counter == null) { //should never be null
-                this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString());
+                this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
             }
             switch(this.type) {
                 case PERPETUAL: {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
index a001845..da567fe 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -52,7 +52,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
     @Test
     public void testConstructor() {
         try {
-            new StreamsTaskCounter(MBEAN_ID);
+            new StreamsTaskCounter(MBEAN_ID, null, -1);
         } catch (Throwable t) {
             fail("Constructor threw error : "+t.getMessage());
         }
@@ -65,7 +65,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testEmitted() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementEmittedCount();
@@ -74,7 +74,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new StreamsTaskCounter(MBEAN_ID);
+        counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -92,7 +92,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testReceived() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementReceivedCount();
@@ -101,7 +101,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new StreamsTaskCounter(MBEAN_ID);
+        counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -119,7 +119,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testError() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementErrorCount();
@@ -128,7 +128,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new StreamsTaskCounter(MBEAN_ID);
+        counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -146,7 +146,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testErrorRate() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         assertEquals(0.0, counter.getErrorRate(), 0);
         int failures = randomIntBetween(0, 100000);
         int received = randomIntBetween(0, 100000);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index a0e28cd..d5efc41 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -57,7 +57,7 @@ public class BasicTasksTest {
     public void testProviderTask() {
         int numMessages = 100;
         NumericMessageProvider provider = new NumericMessageProvider(numMessages);
-        StreamsProviderTask task = new StreamsProviderTask(provider, false);
+        StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         task.addOutputQueue(outQueue);
         //Test that adding input queues to providers is not valid
@@ -101,7 +101,7 @@ public class BasicTasksTest {
         int numMessages = 100;
         PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
         StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         task.setStreamsTaskCounter(counter);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
@@ -145,7 +145,7 @@ public class BasicTasksTest {
         int numMessages = 100;
         DatumCounterWriter writer = new DatumCounterWriter("");
         StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
         task.setStreamsTaskCounter(counter);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
index 5e18650..222566d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
@@ -61,7 +61,7 @@ public class StreamsProviderTaskTest {
 
     @Test
     public void runPerpetual() {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
         when(mockProvider.isRunning()).thenReturn(true);
         when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
         task.setTimeout(500);
@@ -75,7 +75,7 @@ public class StreamsProviderTaskTest {
     @Test
     public void flushes() {
         BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
         when(mockProvider.isRunning()).thenReturn(true);
         when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
         task.setTimeout(100);
@@ -95,7 +95,7 @@ public class StreamsProviderTaskTest {
 
     @Test
     public void runNonPerpetual() {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, false);
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null);
         when(mockProvider.isRunning()).thenReturn(true);
         when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
         task.setTimeout(500);
@@ -108,7 +108,7 @@ public class StreamsProviderTaskTest {
 
     @Test
     public void stoppable() throws InterruptedException {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
         when(mockProvider.isRunning()).thenReturn(true);
         when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
         task.setTimeout(-1);
@@ -129,7 +129,7 @@ public class StreamsProviderTaskTest {
 
     @Test
     public void earlyException() throws InterruptedException {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
         when(mockProvider.isRunning()).thenReturn(true);
         doThrow(new RuntimeException()).when(mockProvider).prepare(null);
         task.setTimeout(-1);