You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 22:19:35 UTC
git commit: a few missing files
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning ec28cc5e0 -> 6adb12a33
a few missing files
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6adb12a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6adb12a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6adb12a3
Branch: refs/heads/springcleaning
Commit: 6adb12a33d62a1ef99929f92c75f6072993d7bf6
Parents: ec28cc5
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 16:19:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 16:19:33 2014 -0500
----------------------------------------------------------------------
.../streams/core/DatumStatusCountable.java | 10 ++++
.../local/builders/LocalStreamBuilder.java | 6 ++-
.../local/tasks/StatusCounterMonitorThread.java | 56 ++++++++++++++++++++
3 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
new file mode 100644
index 0000000..4fec919
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
@@ -0,0 +1,10 @@
+package org.apache.streams.core;
+
+/**
+ * Created by steveblackmon on 3/24/14.
+ */
+public interface DatumStatusCountable {
+
+ public DatumStatusCounter getDatumStatusCounter();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index d570573..bf1abe6 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -154,8 +154,10 @@ public class LocalStreamBuilder implements StreamBuilder {
task.setStreamConfig(this.streamConfig);
this.executor.submit(task);
compTasks.add(task);
- if( comp.isOperationCountable() )
- this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable)comp.getOperation(), 10));
+ if( comp.isOperationCountable() ) {
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+ }
}
streamsTasks.put(comp.getId(), compTasks);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
new file mode 100644
index 0000000..c6febbe
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
@@ -0,0 +1,56 @@
+package org.apache.streams.local.tasks;
+
+import org.apache.streams.core.DatumStatusCountable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatusCounterMonitorThread implements Runnable
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class);
+
+ private DatumStatusCountable task;
+
+ private int seconds;
+
+ private boolean run = true;
+
+ public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) {
+ this.task = task;
+ this.seconds = delayInSeconds;
+ }
+
+ public void shutdown(){
+ this.run = false;
+ }
+
+ @Override
+ public void run()
+ {
+ while(run){
+
+ /**
+ *
+ * Note:
+ * Quick class and method to let us see what is going on with the JVM. We need to make sure
+ * that everything is running with as little memory as possible. If we are generating a heap
+ * overflow, this will be very apparent by the information shown here.
+ */
+
+ LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, {} total",
+ task.getClass(),
+ task.getDatumStatusCounter().getAttempted(),
+ task.getDatumStatusCounter().getSuccess(),
+ task.getDatumStatusCounter().getPartial(),
+ task.getDatumStatusCounter().getFail(),
+ task.getDatumStatusCounter().getEmitted());
+
+ try
+ {
+ Thread.sleep(seconds*1000);
+ }
+ catch (InterruptedException e)
+ { }
+ }
+ }
+
+}