You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 22:19:35 UTC

git commit: a few missing files

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning ec28cc5e0 -> 6adb12a33


a few missing files


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6adb12a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6adb12a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6adb12a3

Branch: refs/heads/springcleaning
Commit: 6adb12a33d62a1ef99929f92c75f6072993d7bf6
Parents: ec28cc5
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 16:19:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 16:19:33 2014 -0500

----------------------------------------------------------------------
 .../streams/core/DatumStatusCountable.java      | 10 ++++
 .../local/builders/LocalStreamBuilder.java      |  6 ++-
 .../local/tasks/StatusCounterMonitorThread.java | 56 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
new file mode 100644
index 0000000..4fec919
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
@@ -0,0 +1,10 @@
+package org.apache.streams.core;
+
+/**
+ * Implemented by components that expose a {@link DatumStatusCounter}.
+ * Created by steveblackmon on 3/24/14.
+ */
+public interface DatumStatusCountable {
+    /** Returns the {@link DatumStatusCounter} held by this component. */
+    DatumStatusCounter getDatumStatusCounter();
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index d570573..bf1abe6 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -154,8 +154,10 @@ public class LocalStreamBuilder implements StreamBuilder {
                     task.setStreamConfig(this.streamConfig);
                     this.executor.submit(task);
                     compTasks.add(task);
-                    if( comp.isOperationCountable() )
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable)comp.getOperation(), 10));
+                    if( comp.isOperationCountable() ) {
+                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+                    }
                 }
                 streamsTasks.put(comp.getId(), compTasks);
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6adb12a3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
new file mode 100644
index 0000000..c6febbe
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
@@ -0,0 +1,56 @@
+package org.apache.streams.local.tasks;
+
+import org.apache.streams.core.DatumStatusCountable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically logs, at debug level, the counts reported by a {@link DatumStatusCountable}.
+ * Runs until {@link #shutdown()} is called or the executing thread is interrupted.
+ */
+public class StatusCounterMonitorThread implements Runnable
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class);
+
+    /** Component whose counter is read on every pass of the loop. */
+    private final DatumStatusCountable task;
+    /** Pause between two log statements, in seconds. */
+    private final int seconds;
+    /** Loop flag; volatile so a shutdown() issued from another thread is seen by the loop. */
+    private volatile boolean run = true;
+
+    /**
+     * @param task           component whose counter values are logged
+     * @param delayInSeconds pause between two log statements, in seconds
+     */
+    public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) {
+        this.task = task;
+        this.seconds = delayInSeconds;
+    }
+
+    /** Asks the loop in {@link #run()} to stop; the loop exits once its current sleep ends. */
+    public void shutdown(){
+        this.run = false;
+    }
+
+    /** Logs the counter values, sleeps for the configured delay, and repeats until stopped. */
+    @Override
+    public void run()
+    {
+        while(run){
+            LOGGER.debug("{}: {} attempted, {} success, {} partial, {} failed, {} total",
+                    task.getClass(),
+                    task.getDatumStatusCounter().getAttempted(),
+                    task.getDatumStatusCounter().getSuccess(),
+                    task.getDatumStatusCounter().getPartial(),
+                    task.getDatumStatusCounter().getFail(),
+                    task.getDatumStatusCounter().getEmitted());
+            try {
+                Thread.sleep(seconds * 1000L); // long arithmetic: an int product overflows for large delays
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the executing thread's owner
+                run = false;                        // treat an interrupt as a request to stop monitoring
+            }
+        }
+    }
+}