You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/11/07 20:25:44 UTC

[2/8] incubator-streams git commit: Fixed tests by unregistering previously registered MBeans

Fixed tests by unregistering previously registered MBeans


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d71568dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d71568dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d71568dc

Branch: refs/heads/master
Commit: d71568dc72716d8d6b10ef0de74df5325f3d1336
Parents: 7e65a42
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 15:30:28 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 15:30:28 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      |  5 +++
 .../local/builders/LocalStreamBuilderTest.java  | 32 +++++++++++++++++---
 2 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d71568dc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 57f3aa4..2161638 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
 
 import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
@@ -271,6 +272,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         for(StreamComponent prov : this.providers.values()) {
             StreamsTask task = prov.createConnectedTask(getTimeout());
             task.setStreamConfig(this.streamConfig);
+            StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId());
+            task.setStreamsTaskCounter(counter);
             this.executor.submit(task);
             provTasks.put(prov.getId(), (StreamsProviderTask) task);
             if( prov.isOperationCountable() ) {
@@ -284,8 +287,10 @@ public class LocalStreamBuilder implements StreamBuilder {
         for(StreamComponent comp : this.components.values()) {
             int tasks = comp.getNumTasks();
             List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+            StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId());
             for(int i=0; i < tasks; ++i) {
                 StreamsTask task = comp.createConnectedTask(getTimeout());
+                task.setStreamsTaskCounter(counter);
                 task.setStreamConfig(this.streamConfig);
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d71568dc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index e602181..fea7e53 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -43,6 +43,7 @@ import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.local.test.processors.SlowProcessor;
@@ -81,10 +82,17 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
                 //No-op
             }
+            try {
+                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id))));
+            } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+                //No-op
+            }
         }
     }
 
 
+
+
     @Test
     public void testStreamIdValidations() {
         StreamBuilder builder = new LocalStreamBuilder();
@@ -104,7 +112,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             exp = e;
         }
         assertNotNull(exp);
-        removeRegisteredMBeans("1", "2");
+        removeRegisteredMBeans("1", "2", "id");
     }
 
     @Test
@@ -160,9 +168,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             }
         } finally {
             for(int i=0; i < numProcessors; ++i) {
-                removeRegisteredMBeans(processorId+i);
+                removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
             }
-            removeRegisteredMBeans("writer");
+            removeRegisteredMBeans("writer", "numeric_provider");
         }
     }
 
@@ -199,7 +207,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             for(int i=0; i < numProcessors; ++i) {
                 removeRegisteredMBeans(processorId+i);
             }
-            removeRegisteredMBeans("writer");
+            removeRegisteredMBeans("writer", "numeric_provider");
         }
     }
 
@@ -221,7 +229,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
             assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
         } finally {
-            removeRegisteredMBeans("proc1", "proc2", "writer1");
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+            removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
         }
     }
 
@@ -239,6 +249,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
             assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
         }
     }
@@ -257,6 +270,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             builder.start();
             assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "w1");
         }
     }
@@ -278,6 +294,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
             assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
         }
     }
@@ -305,6 +324,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             service.awaitTermination(30000, TimeUnit.MILLISECONDS);
             assertThat(Thread.activeCount(), is(equalTo(before)));
         } finally {
+            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
             removeRegisteredMBeans("prov1", "proc1", "w1");
         }
     }