You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/08 21:13:03 UTC

[12/38] incubator-streams git commit: Implemented new StreamTasksCounter into StreamsTasks

Implemented new StreamTasksCounter into StreamsTasks


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7e65a423
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7e65a423
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7e65a423

Branch: refs/heads/STREAMS-49
Commit: 7e65a423f31c2fc4540b72ade6be904c77638784
Parents: d305371
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 14:16:40 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 14:16:40 2014 -0500

----------------------------------------------------------------------
 .../streams/local/tasks/StreamsMergeTask.java   |  7 ++++
 .../local/tasks/StreamsPersistWriterTask.java   | 18 +++++++--
 .../local/tasks/StreamsProcessorTask.java       | 20 ++++++++--
 .../local/tasks/StreamsProviderTask.java        | 15 +++++++
 .../apache/streams/local/tasks/StreamsTask.java |  4 ++
 .../streams/local/tasks/BasicTasksTest.java     | 41 ++++++++++++++++++--
 6 files changed, 93 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 7a4c806..8280f29 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -19,6 +19,8 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,4 +80,9 @@ public class StreamsMergeTask extends BaseStreamsTask {
             }
         }
     }
+
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        throw new NotImplementedException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index cab46b8..003ab9e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -20,13 +20,11 @@ package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.*;
 import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +43,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     private BlockingQueue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
     private AtomicBoolean blocked;
+    private StreamsTaskCounter counter;
 
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
@@ -99,6 +98,9 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     public void run() {
         try {
             this.writer.prepare(this.streamConfig);
+            if(this.counter == null) {
+                this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString());
+            }
             while(this.keepRunning.get()) {
                 StreamsDatum datum = null;
                 try {
@@ -111,14 +113,18 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
                     Thread.currentThread().interrupt();
                 }
                 if(datum != null) {
+                    this.counter.incrementReceivedCount();
                     try {
+                        long startTime = System.currentTimeMillis();
                         this.writer.write(datum);
+                        this.counter.addTime(System.currentTimeMillis() - startTime);
                         statusCounter.incrementStatus(DatumStatus.SUCCESS);
                     } catch (Exception e) {
                         LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
                         this.keepRunning.set(false); // why do we shutdown on a failed write ?
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                         DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
+                        this.counter.incrementErrorCount();
                     }
                 } else { //datums should never be null
                     LOGGER.debug("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
@@ -151,4 +157,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
         return queues;
     }
 
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        this.counter = counter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index ee69127..8d66847 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -21,13 +21,11 @@ package org.apache.streams.local.tasks;
 import com.google.common.collect.Maps;
 import org.apache.streams.core.*;
 import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,6 +45,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
     private BlockingQueue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
     private AtomicBoolean blocked;
+    private StreamsTaskCounter counter;
 
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
@@ -105,6 +104,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
     public void run() {
         try {
             this.processor.prepare(this.streamConfig);
+            if(this.counter == null) {
+                this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString());
+            }
             while(this.keepRunning.get()) {
                 StreamsDatum datum = null;
                 try {
@@ -117,11 +119,15 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
                     Thread.currentThread().interrupt();
                 }
                 if(datum != null) {
+                    this.counter.incrementReceivedCount();
                     try {
+                        long startTime = System.currentTimeMillis();
                         List<StreamsDatum> output = this.processor.process(datum);
+                        this.counter.addTime(System.currentTimeMillis() - startTime);
                         if(output != null) {
                             for(StreamsDatum outDatum : output) {
                                 super.addToOutgoingQueue(datum);
+                                this.counter.incrementEmittedCount();
                                 statusCounter.incrementStatus(DatumStatus.SUCCESS);
                             }
                         }
@@ -130,6 +136,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
                         this.keepRunning.set(false);
                         Thread.currentThread().interrupt();
                     } catch (Throwable t) {
+                        this.counter.incrementErrorCount();
                         LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.getMessage());
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                         //Add the error to the metadata, but keep processing
@@ -151,4 +158,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
         queues.add(this.inQueue);
         return queues;
     }
+
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        this.counter = counter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index c16f64d..2475780 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.*;
 import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.math.BigInteger;
 import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -64,6 +66,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private long sleepTime;
     private int zeros = 0;
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
+    private StreamsTaskCounter counter;
 
     /**
      * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
@@ -145,13 +148,18 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             StreamsResultSet resultSet = null;
             //Negative values mean we want to run forever
             long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
+            if(this.counter == null) { //should never be null
+                this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString());
+            }
             switch(this.type) {
                 case PERPETUAL: {
                     provider.startStream();
                     this.started.set(true);
                     while(this.isRunning()) {
                         try {
+                            long startTime = System.currentTimeMillis();
                             resultSet = provider.readCurrent();
+                            this.counter.addTime(System.currentTimeMillis() - startTime);
                             if( resultSet.size() == 0 )
                                 zeros++;
                             else {
@@ -164,6 +172,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                             if(zeros > 0)
                                 Thread.sleep(sleepTime);
                         } catch (InterruptedException e) {
+                            this.counter.incrementErrorCount();
                             LOGGER.warn("Thread interrupted");
                             this.keepRunning.set(false);
                         }
@@ -219,8 +228,10 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             if(datum != null) {
                 try {
                     super.addToOutgoingQueue(datum);
+                    this.counter.incrementEmittedCount();
                     statusCounter.incrementStatus(DatumStatus.SUCCESS);
                 } catch( Exception e ) {
+                    this.counter.incrementErrorCount();
                     statusCounter.incrementStatus(DatumStatus.FAIL);
                     DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
                 }
@@ -229,4 +240,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.flushing.set(false);
     }
 
+    @Override
+    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+        this.counter = counter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 7513631..8423095 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -19,6 +19,7 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 
 import java.util.List;
 import java.util.Map;
@@ -87,4 +88,7 @@ public interface StreamsTask extends Runnable{
      */
     public List<BlockingQueue<StreamsDatum>> getOutputQueues();
 
+
+    public void setStreamsTaskCounter(StreamsTaskCounter counter);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e65a423/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index f524db0..f62250d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -19,12 +19,18 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.DatumStatusCounter;
+import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DatumCounterWriter;
+import org.junit.After;
 import org.junit.Test;
 
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
 import java.util.Queue;
 import java.util.concurrent.*;
 
@@ -36,6 +42,21 @@ import static org.junit.Assert.*;
 public class BasicTasksTest {
 
 
+    private static final String MBEAN_ID = "test_bean";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
 
     @Test
     public void testProviderTask() {
@@ -77,7 +98,7 @@ public class BasicTasksTest {
             assertTrue("Task should have completed running in aloted time.", service.isTerminated());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-        };
+        }
     }
 
     @Test
@@ -85,6 +106,8 @@ public class BasicTasksTest {
         int numMessages = 100;
         PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
         StreamsProcessorTask task = new StreamsProcessorTask(processor);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        task.setStreamsTaskCounter(counter);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
         task.addOutputQueue(outQueue);
@@ -104,8 +127,7 @@ public class BasicTasksTest {
                 fail("Processor task failed to output "+numMessages+" in a timely fashion.");
             }
         }
-        task.stopTask();
-        assertEquals(numMessages, processor.getMessageCount());
+        task.stopTask();;
         service.shutdown();
         try {
             if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -116,6 +138,11 @@ public class BasicTasksTest {
         } catch (InterruptedException e) {
             fail("Test Interupted.");
         }
+        assertEquals(numMessages, processor.getMessageCount());
+        assertEquals(numMessages, counter.getNumReceived());
+        assertEquals(numMessages, counter.getNumEmitted());
+        assertEquals(0, counter.getNumUnhandledErrors());
+        assertEquals(0.0, counter.getErrorRate(), 0.0);
     }
 
     @Test
@@ -123,6 +150,8 @@ public class BasicTasksTest {
         int numMessages = 100;
         DatumCounterWriter writer = new DatumCounterWriter("");
         StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        task.setStreamsTaskCounter(counter);
         BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
 
@@ -150,7 +179,6 @@ public class BasicTasksTest {
             }
         }
         task.stopTask();
-        assertEquals(numMessages, writer.getDatumsCounted());
         service.shutdown();
         try {
             if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -161,6 +189,11 @@ public class BasicTasksTest {
         } catch (InterruptedException e) {
             fail("Test Interupted.");
         }
+        assertEquals(numMessages, writer.getDatumsCounted());
+        assertEquals(numMessages, counter.getNumReceived());
+        assertEquals(0, counter.getNumEmitted());
+        assertEquals(0, counter.getNumUnhandledErrors());
+        assertEquals(0.0, counter.getErrorRate(), 0.0);
     }
 
     @Test