You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/19 19:58:14 UTC

[5/5] incubator-streams git commit: STREAMS-216 | Fixed unit tests and hardened LocalStreamBuilder, BaseStreamsTask, and BroadcastMonitorThread against NPEs

STREAMS-216 | Fixed unit tests and hardened LocalStreamBuilder, BaseStreamsTask, and BroadcastMonitorThread against NPEs


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a20f01ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a20f01ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a20f01ae

Branch: refs/heads/STREAMS-216
Commit: a20f01aefc5904ecb857fdb9a344023cf6a05100
Parents: fb8f9d2
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Tue Nov 18 11:24:04 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Tue Nov 18 11:24:04 2014 -0600

----------------------------------------------------------------------
 .../tasks/BroadcastMonitorThread.java           |  4 ++--
 .../local/builders/LocalStreamBuilder.java      |  3 ++-
 .../streams/local/tasks/BaseStreamsTask.java    |  6 ++++--
 .../local/builders/LocalStreamBuilderTest.java  |  9 +++++---
 .../local/counters/DatumStatusCounterTest.java  | 22 ++++++++++----------
 .../queues/ThroughputQueueSingleThreadTest.java | 15 ++++++-------
 6 files changed, 33 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 10b60b1..fd9354a 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -141,8 +141,8 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
             if (streamConfig != null &&
                     streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
                     streamConfig.get("monitoring_broadcast_interval_ms") != null &&
-                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
-                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) {
+                    (streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
+                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer)) {
                 waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
             } else {
                 waitTime = DEFAULT_WAIT_TIME;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 19d50e1..a9afc3c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -420,7 +420,8 @@ public class LocalStreamBuilder implements StreamBuilder {
     }
 
     private void setStreamIdentifier() {
-        if(streamConfig.containsKey(STREAM_IDENTIFIER_KEY) &&
+        if(streamConfig != null &&
+                streamConfig.containsKey(STREAM_IDENTIFIER_KEY) &&
                 streamConfig.get(STREAM_IDENTIFIER_KEY) != null &&
                 streamConfig.get(STREAM_IDENTIFIER_KEY).toString().length() > 0) {
             this.streamIdentifier = streamConfig.get(STREAM_IDENTIFIER_KEY).toString();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index cfb231d..9726963 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -205,7 +205,8 @@ public abstract class BaseStreamsTask implements StreamsTask {
     }
 
     public void setStartedAt() {
-        if(streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) &&
+        if(streamConfig != null &&
+                streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) &&
                 streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) != null &&
                 streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) instanceof Long) {
             this.startedAt = Long.parseLong(streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY).toString());
@@ -219,7 +220,8 @@ public abstract class BaseStreamsTask implements StreamsTask {
     }
 
     public void setStreamIdentifier() {
-        if(streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) &&
+        if(streamConfig != null &&
+                streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) &&
                 streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null &&
                 streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString().length() > 0) {
             this.streamIdentifier = streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index a675d87..ed67003 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -52,6 +52,7 @@ import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DatumCounterWriter;
 import org.apache.streams.local.test.writer.SystemOutWriter;
 import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -74,7 +75,9 @@ import javax.management.*;
  *
  */
 public class LocalStreamBuilderTest extends RandomizedTest {
-
+    private static final String MBEAN_ID = "test_id";
+    private static final String STREAM_ID = "test_stream";
+    private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
     @After
     public void removeLocalMBeans() {
@@ -90,12 +93,12 @@ public class LocalStreamBuilderTest extends RandomizedTest {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         for(String id : ids) {
             try {
-                mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+                mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)));
             } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
                 //No-op
             }
             try {
-                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id))));
+                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))));
             } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
                 //No-op
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
index 3a9a8dc..9775c6f 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -19,6 +19,7 @@ package org.apache.streams.local.counters;
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
@@ -32,7 +33,8 @@ import java.lang.management.ManagementFactory;
 public class DatumStatusCounterTest extends RandomizedTest {
 
     private static final String MBEAN_ID = "test_id";
-
+    private static final String STREAM_ID = "test_stream";
+    private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
 
     /**
@@ -42,7 +44,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @After
     public void unregisterMXBean() throws Exception {
         try {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID)));
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
         } catch (InstanceNotFoundException ife) {
             //No-op
         }
@@ -54,7 +56,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     public void testConstructor() {
         try {
-            new DatumStatusCounter(MBEAN_ID);
+            new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         } catch (Throwable t) {
             fail("Constructor Threw Exception : "+t.getMessage());
         }
@@ -67,7 +69,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testPassed() throws Exception {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementPassedCount();
@@ -76,7 +78,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new DatumStatusCounter(MBEAN_ID);
+        counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -94,7 +96,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testFailed() throws Exception {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         int numIncrements = randomIntBetween(1, 100000);
         for(int i=0; i < numIncrements; ++i) {
             counter.incrementFailedCount();
@@ -103,7 +105,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
 
         unregisterMXBean();
 
-        counter = new DatumStatusCounter(MBEAN_ID);
+        counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         numIncrements = randomIntBetween(1, 100000);
         long total = 0;
         for(int i=0; i < numIncrements; ++i) {
@@ -121,7 +123,7 @@ public class DatumStatusCounterTest extends RandomizedTest {
     @Test
     @Repeat(iterations = 3)
     public void testFailureRate() {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
         assertEquals(0.0, counter.getFailRate(), 0);
         int failures = randomIntBetween(0, 100000);
         int passes = randomIntBetween(0, 100000);
@@ -129,6 +131,4 @@ public class DatumStatusCounterTest extends RandomizedTest {
         counter.incrementFailedCount(failures);
         assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
     }
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2492161..ef669f4 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.queues;
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
@@ -35,7 +36,9 @@ import static org.junit.Assert.assertEquals;
  * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
  */
 public class ThroughputQueueSingleThreadTest extends RandomizedTest {
-
+    private static final String MBEAN_ID = "test_id";
+    private static final String STREAM_ID = "test_stream";
+    private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
     @After
     public void removeLocalMBeans() {
@@ -208,10 +211,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             Integer beanCount = mbs.getMBeanCount();
-            String id = "testQueue";
-            ThroughputQueue queue = new ThroughputQueue(id);
+            ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
             assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
-            ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+            ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
             assertNotNull(mBean);
         } catch (Exception e) {
             fail("Failed to register MXBean : "+e.getMessage());
@@ -226,12 +228,11 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             Integer beanCount = mbs.getMBeanCount();
-            String id = "testQueue";
             int numReg = randomIntBetween(2, 100);
             for(int i=0; i < numReg; ++i) {
-                ThroughputQueue queue = new ThroughputQueue(id+i);
+                ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME);
                 assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount());
-                ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+                ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME)));
                 assertNotNull(mBean);
             }
         } catch (Exception e) {