You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/22 02:03:24 UTC

[02/10] git commit: Added processing time

Added processing time


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6916a1b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6916a1b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6916a1b7

Branch: refs/heads/master
Commit: 6916a1b78318f7dec9d73121e25e1f6b545c9a78
Parents: c030d43
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 17:53:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 17:53:34 2014 -0500

----------------------------------------------------------------------
 .../local/counters/StreamsTaskCounter.java      | 53 ++++++++++++++++++++
 .../counters/StreamsTaskCounterMXBean.java      | 10 ++++
 2 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index ffd9f25..e864219 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streams.local.counters;
 
+import net.jcip.annotations.GuardedBy;
 import net.jcip.annotations.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,11 +38,16 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
     private AtomicLong emitted;
     private AtomicLong received;
     private AtomicLong errors;
+    private AtomicLong totalTime;
+    @GuardedBy("this")
+    private volatile long maxTime;
 
     public StreamsTaskCounter(String id) {
         this.emitted = new AtomicLong(0);
         this.received = new AtomicLong(0);
         this.errors = new AtomicLong(0);
+        this.totalTime = new AtomicLong(0);
+        this.maxTime = -1;
         try {
             ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -52,30 +58,64 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
         }
     }
 
+    /**
+     * Increment emitted count
+     */
     public void incrementEmittedCount() {
         this.incrementEmittedCount(1);
     }
 
+    /**
+     * Increment emitted count
+     * @param delta
+     */
     public void incrementEmittedCount(long delta) {
         this.emitted.addAndGet(delta);
     }
 
+    /**
+     * Increment error count
+     */
     public void incrementErrorCount() {
         this.incrementErrorCount(1);
     }
 
+    /**
+     * Increment error count
+     * @param delta
+     */
     public void incrementErrorCount(long delta) {
         this.errors.addAndGet(delta);
     }
 
+    /**
+     * Increment received count
+     */
     public void incrementReceivedCount() {
         this.incrementReceivedCount(1);
     }
 
+    /**
+     * Increment received count
+     * @param delta
+     */
     public void incrementReceivedCount(long delta) {
         this.received.addAndGet(delta);
     }
 
+    /**
+     * Add the time it takes to process a single datum in milliseconds
+     * @param processTime
+     */
+    public void addTime(long processTime) {
+        synchronized (this) {
+            if(processTime > this.maxTime) {
+                this.maxTime = processTime;
+            }
+        }
+        this.totalTime.addAndGet(processTime);
+    }
+
     @Override
     public double getErrorRate() {
         if(this.received.get() == 0) {
@@ -98,4 +138,17 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
     public long getNumUnhandledErrors() {
         return this.errors.get();
     }
+
+    @Override
+    public double getAvgTime() {
+        if(this.received.get() == 0) {
+            return 0.0;
+        }
+        return this.totalTime.get() / (double) this.received.get();
+    }
+
+    @Override
+    public long getMaxTime() {
+        return this.maxTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
index 634857d..8ac2e33 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -48,6 +48,16 @@ public interface StreamsTaskCounterMXBean {
      */
     public long getNumUnhandledErrors();
 
+    /**
+     * Returns the average time in milliseconds it takes the task to readCurrent, process, or write to return.
+     * @return
+     */
+    public double getAvgTime();
 
+    /**
+     * Returns the max time in milliseconds it takes the task to readCurrent, process, or write to return.
+     * @return
+     */
+    public long getMaxTime();
 
 }