You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/11/07 20:25:44 UTC
[2/8] incubator-streams git commit: Fixed tests by unregistering
previously registered mbeans
Fixed tests by unregistering previously registered mbeans
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d71568dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d71568dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d71568dc
Branch: refs/heads/master
Commit: d71568dc72716d8d6b10ef0de74df5325f3d1336
Parents: 7e65a42
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 15:30:28 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 15:30:28 2014 -0500
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 5 +++
.../local/builders/LocalStreamBuilderTest.java | 32 +++++++++++++++++---
2 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d71568dc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 57f3aa4..2161638 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
import org.apache.log4j.spi.LoggerFactory;
import org.apache.streams.core.*;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
@@ -271,6 +272,8 @@ public class LocalStreamBuilder implements StreamBuilder {
for(StreamComponent prov : this.providers.values()) {
StreamsTask task = prov.createConnectedTask(getTimeout());
task.setStreamConfig(this.streamConfig);
+ StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId());
+ task.setStreamsTaskCounter(counter);
this.executor.submit(task);
provTasks.put(prov.getId(), (StreamsProviderTask) task);
if( prov.isOperationCountable() ) {
@@ -284,8 +287,10 @@ public class LocalStreamBuilder implements StreamBuilder {
for(StreamComponent comp : this.components.values()) {
int tasks = comp.getNumTasks();
List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+ StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId());
for(int i=0; i < tasks; ++i) {
StreamsTask task = comp.createConnectedTask(getTimeout());
+ task.setStreamsTaskCounter(counter);
task.setStreamConfig(this.streamConfig);
this.futures.put(task, this.executor.submit(task));
compTasks.add(task);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d71568dc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index e602181..fea7e53 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -43,6 +43,7 @@ import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.processors.SlowProcessor;
@@ -81,10 +82,17 @@ public class LocalStreamBuilderTest extends RandomizedTest {
} catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
//No-op
}
+ try {
+ mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id))));
+ } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+ //No-op
+ }
}
}
+
+
@Test
public void testStreamIdValidations() {
StreamBuilder builder = new LocalStreamBuilder();
@@ -104,7 +112,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
exp = e;
}
assertNotNull(exp);
- removeRegisteredMBeans("1", "2");
+ removeRegisteredMBeans("1", "2", "id");
}
@Test
@@ -160,9 +168,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
}
} finally {
for(int i=0; i < numProcessors; ++i) {
- removeRegisteredMBeans(processorId+i);
+ removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
}
- removeRegisteredMBeans("writer");
+ removeRegisteredMBeans("writer", "numeric_provider");
}
}
@@ -199,7 +207,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
for(int i=0; i < numProcessors; ++i) {
removeRegisteredMBeans(processorId+i);
}
- removeRegisteredMBeans("writer");
+ removeRegisteredMBeans("writer", "numeric_provider");
}
}
@@ -221,7 +229,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
- removeRegisteredMBeans("proc1", "proc2", "writer1");
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
}
}
@@ -239,6 +249,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
}
@@ -257,6 +270,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
builder.start();
assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "w1");
}
}
@@ -278,6 +294,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
//We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
}
@@ -305,6 +324,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
service.awaitTermination(30000, TimeUnit.MILLISECONDS);
assertThat(Thread.activeCount(), is(equalTo(before)));
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "w1");
}
}