You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/11 20:54:17 UTC
[10/32] incubator-streams git commit: Implemented new StreamTasksCounter into StreamsTasks
Implemented new StreamTasksCounter into StreamsTasks
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7e65a423
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7e65a423
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7e65a423
Branch: refs/heads/STREAMS-212
Commit: 7e65a423f31c2fc4540b72ade6be904c77638784
Parents: d305371
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 14:16:40 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 14:16:40 2014 -0500
----------------------------------------------------------------------
.../streams/local/tasks/StreamsMergeTask.java | 7 ++++
.../local/tasks/StreamsPersistWriterTask.java | 18 +++++++--
.../local/tasks/StreamsProcessorTask.java | 20 ++++++++--
.../local/tasks/StreamsProviderTask.java | 15 +++++++
.../apache/streams/local/tasks/StreamsTask.java | 4 ++
.../streams/local/tasks/BasicTasksTest.java | 41 ++++++++++++++++++--
6 files changed, 93 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 7a4c806..8280f29 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -19,6 +19,8 @@
package org.apache.streams.local.tasks;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,4 +80,9 @@ public class StreamsMergeTask extends BaseStreamsTask {
}
}
}
+
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ throw new NotImplementedException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index cab46b8..003ab9e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -20,13 +20,11 @@ package org.apache.streams.local.tasks;
import org.apache.streams.core.*;
import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +43,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
private BlockingQueue<StreamsDatum> inQueue;
private AtomicBoolean isRunning;
private AtomicBoolean blocked;
+ private StreamsTaskCounter counter;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -99,6 +98,9 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
public void run() {
try {
this.writer.prepare(this.streamConfig);
+ if(this.counter == null) {
+ this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString());
+ }
while(this.keepRunning.get()) {
StreamsDatum datum = null;
try {
@@ -111,14 +113,18 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
Thread.currentThread().interrupt();
}
if(datum != null) {
+ this.counter.incrementReceivedCount();
try {
+ long startTime = System.currentTimeMillis();
this.writer.write(datum);
+ this.counter.addTime(System.currentTimeMillis() - startTime);
statusCounter.incrementStatus(DatumStatus.SUCCESS);
} catch (Exception e) {
LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
this.keepRunning.set(false); // why do we shutdown on a failed write ?
statusCounter.incrementStatus(DatumStatus.FAIL);
DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
+ this.counter.incrementErrorCount();
}
} else { //datums should never be null
LOGGER.debug("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
@@ -151,4 +157,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
return queues;
}
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index ee69127..8d66847 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -21,13 +21,11 @@ package org.apache.streams.local.tasks;
import com.google.common.collect.Maps;
import org.apache.streams.core.*;
import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,6 +45,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
private BlockingQueue<StreamsDatum> inQueue;
private AtomicBoolean isRunning;
private AtomicBoolean blocked;
+ private StreamsTaskCounter counter;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -105,6 +104,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
public void run() {
try {
this.processor.prepare(this.streamConfig);
+ if(this.counter == null) {
+ this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString());
+ }
while(this.keepRunning.get()) {
StreamsDatum datum = null;
try {
@@ -117,11 +119,15 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
Thread.currentThread().interrupt();
}
if(datum != null) {
+ this.counter.incrementReceivedCount();
try {
+ long startTime = System.currentTimeMillis();
List<StreamsDatum> output = this.processor.process(datum);
+ this.counter.addTime(System.currentTimeMillis() - startTime);
if(output != null) {
for(StreamsDatum outDatum : output) {
super.addToOutgoingQueue(datum);
+ this.counter.incrementEmittedCount();
statusCounter.incrementStatus(DatumStatus.SUCCESS);
}
}
@@ -130,6 +136,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
this.keepRunning.set(false);
Thread.currentThread().interrupt();
} catch (Throwable t) {
+ this.counter.incrementErrorCount();
LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.getMessage());
statusCounter.incrementStatus(DatumStatus.FAIL);
//Add the error to the metadata, but keep processing
@@ -151,4 +158,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
queues.add(this.inQueue);
return queues;
}
+
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index c16f64d..2475780 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.tasks;
import org.apache.streams.core.*;
import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,6 +66,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
private long sleepTime;
private int zeros = 0;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
+ private StreamsTaskCounter counter;
/**
* Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
@@ -145,13 +148,18 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
StreamsResultSet resultSet = null;
//Negative values mean we want to run forever
long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
+ if(this.counter == null) { //should never be null
+ this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString());
+ }
switch(this.type) {
case PERPETUAL: {
provider.startStream();
this.started.set(true);
while(this.isRunning()) {
try {
+ long startTime = System.currentTimeMillis();
resultSet = provider.readCurrent();
+ this.counter.addTime(System.currentTimeMillis() - startTime);
if( resultSet.size() == 0 )
zeros++;
else {
@@ -164,6 +172,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
if(zeros > 0)
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
+ this.counter.incrementErrorCount();
LOGGER.warn("Thread interrupted");
this.keepRunning.set(false);
}
@@ -219,8 +228,10 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
if(datum != null) {
try {
super.addToOutgoingQueue(datum);
+ this.counter.incrementEmittedCount();
statusCounter.incrementStatus(DatumStatus.SUCCESS);
} catch( Exception e ) {
+ this.counter.incrementErrorCount();
statusCounter.incrementStatus(DatumStatus.FAIL);
DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
}
@@ -229,4 +240,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.flushing.set(false);
}
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 7513631..8423095 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -19,6 +19,7 @@
package org.apache.streams.local.tasks;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import java.util.List;
import java.util.Map;
@@ -87,4 +88,7 @@ public interface StreamsTask extends Runnable{
*/
public List<BlockingQueue<StreamsDatum>> getOutputQueues();
+
+ public void setStreamsTaskCounter(StreamsTaskCounter counter);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index f524db0..f62250d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -19,12 +19,18 @@
package org.apache.streams.local.tasks;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.DatumStatusCounter;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.providers.NumericMessageProvider;
import org.apache.streams.local.test.writer.DatumCounterWriter;
+import org.junit.After;
import org.junit.Test;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
import java.util.Queue;
import java.util.concurrent.*;
@@ -36,6 +42,21 @@ import static org.junit.Assert.*;
public class BasicTasksTest {
+ private static final String MBEAN_ID = "test_bean";
+
+ /**
+ * Remove registered mbeans from previous tests
+ * @throws Exception
+ */
+ @After
+ public void unregisterMXBean() throws Exception {
+ try {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID)));
+ } catch (InstanceNotFoundException ife) {
+ //No-op
+ }
+ }
+
@Test
public void testProviderTask() {
@@ -77,7 +98,7 @@ public class BasicTasksTest {
assertTrue("Task should have completed running in aloted time.", service.isTerminated());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- };
+ }
}
@Test
@@ -85,6 +106,8 @@ public class BasicTasksTest {
int numMessages = 100;
PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
task.addOutputQueue(outQueue);
@@ -104,8 +127,7 @@ public class BasicTasksTest {
fail("Processor task failed to output "+numMessages+" in a timely fashion.");
}
}
- task.stopTask();
- assertEquals(numMessages, processor.getMessageCount());
+ task.stopTask();;
service.shutdown();
try {
if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -116,6 +138,11 @@ public class BasicTasksTest {
} catch (InterruptedException e) {
fail("Test Interupted.");
}
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, counter.getNumReceived());
+ assertEquals(numMessages, counter.getNumEmitted());
+ assertEquals(0, counter.getNumUnhandledErrors());
+ assertEquals(0.0, counter.getErrorRate(), 0.0);
}
@Test
@@ -123,6 +150,8 @@ public class BasicTasksTest {
int numMessages = 100;
DatumCounterWriter writer = new DatumCounterWriter("");
StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
@@ -150,7 +179,6 @@ public class BasicTasksTest {
}
}
task.stopTask();
- assertEquals(numMessages, writer.getDatumsCounted());
service.shutdown();
try {
if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -161,6 +189,11 @@ public class BasicTasksTest {
} catch (InterruptedException e) {
fail("Test Interupted.");
}
+ assertEquals(numMessages, writer.getDatumsCounted());
+ assertEquals(numMessages, counter.getNumReceived());
+ assertEquals(0, counter.getNumEmitted());
+ assertEquals(0, counter.getNumUnhandledErrors());
+ assertEquals(0.0, counter.getErrorRate(), 0.0);
}
@Test