You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/22 02:03:24 UTC
[02/10] git commit: Added processing time
Added processing time
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6916a1b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6916a1b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6916a1b7
Branch: refs/heads/master
Commit: 6916a1b78318f7dec9d73121e25e1f6b545c9a78
Parents: c030d43
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 17:53:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 17:53:34 2014 -0500
----------------------------------------------------------------------
.../local/counters/StreamsTaskCounter.java | 53 ++++++++++++++++++++
.../counters/StreamsTaskCounterMXBean.java | 10 ++++
2 files changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index ffd9f25..e864219 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -17,6 +17,7 @@
*/
package org.apache.streams.local.counters;
+import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +38,16 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
private AtomicLong emitted;
private AtomicLong received;
private AtomicLong errors;
+ private AtomicLong totalTime;
+ @GuardedBy("this")
+ private volatile long maxTime;
public StreamsTaskCounter(String id) {
this.emitted = new AtomicLong(0);
this.received = new AtomicLong(0);
this.errors = new AtomicLong(0);
+ this.totalTime = new AtomicLong(0);
+ this.maxTime = -1;
try {
ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -52,30 +58,64 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
}
}
+ /**
+ * Increment emitted count
+ */
public void incrementEmittedCount() {
this.incrementEmittedCount(1);
}
+ /**
+ * Increment emitted count
+ * @param delta
+ */
public void incrementEmittedCount(long delta) {
this.emitted.addAndGet(delta);
}
+ /**
+ * Increment error count
+ */
public void incrementErrorCount() {
this.incrementErrorCount(1);
}
+ /**
+ * Increment error count
+ * @param delta
+ */
public void incrementErrorCount(long delta) {
this.errors.addAndGet(delta);
}
+ /**
+ * Increment received count
+ */
public void incrementReceivedCount() {
this.incrementReceivedCount(1);
}
+ /**
+ * Increment received count
+ * @param delta
+ */
public void incrementReceivedCount(long delta) {
this.received.addAndGet(delta);
}
+ /**
+ * Add the time, in milliseconds, that it took to process a single datum
+ * @param processTime
+ */
+ public void addTime(long processTime) {
+ synchronized (this) {
+ if(processTime > this.maxTime) {
+ this.maxTime = processTime;
+ }
+ }
+ this.totalTime.addAndGet(processTime);
+ }
+
@Override
public double getErrorRate() {
if(this.received.get() == 0) {
@@ -98,4 +138,17 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
public long getNumUnhandledErrors() {
return this.errors.get();
}
+
+ @Override
+ public double getAvgTime() {
+ if(this.received.get() == 0) {
+ return 0.0;
+ }
+ return this.totalTime.get() / (double) this.received.get();
+ }
+
+ @Override
+ public long getMaxTime() {
+ return this.maxTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
index 634857d..8ac2e33 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -48,6 +48,16 @@ public interface StreamsTaskCounterMXBean {
*/
public long getNumUnhandledErrors();
+ /**
+ * Returns the average time, in milliseconds, that the task's readCurrent, process, or write call takes to return.
+ * @return
+ */
+ public double getAvgTime();
+ /**
+ * Returns the maximum time, in milliseconds, that the task's readCurrent, process, or write call takes to return.
+ * @return
+ */
+ public long getMaxTime();
}