You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 23:33:59 UTC

git commit: fixed datum leak

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning 6adb12a33 -> c2a858db2


fixed datum leak


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2a858db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2a858db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2a858db

Branch: refs/heads/springcleaning
Commit: c2a858db2721175453901b428440e3c45343fd9b
Parents: 6adb12a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 17:33:46 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 17:33:46 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/local/tasks/BaseStreamsTask.java  | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a858db/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index b9af0fd..3799480 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -76,15 +76,16 @@ public abstract class BaseStreamsTask implements StreamsTask {
      */
     protected void addToOutgoingQueue(StreamsDatum datum) {
         if(this.outQueues.size() == 1) {
-            this.outQueues.get(0).offer(datum);
+            enqueue(outQueues.get(0), datum);
         }
         else {
             StreamsDatum newDatum = null;
             for(Queue<StreamsDatum> queue : this.outQueues) {
                 try {
                     newDatum = cloneStreamsDatum(datum);
-                    if(newDatum != null)
-                        queue.offer(newDatum);
+                    if(newDatum != null) {
+                        enqueue(queue, newDatum);
+                    }
                 } catch (RuntimeException e) {
                     LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
                     LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
@@ -146,4 +147,12 @@ public abstract class BaseStreamsTask implements StreamsTask {
         return this.inIndex;
     }
 
+    private void enqueue( Queue<StreamsDatum> queue, StreamsDatum entry ) {
+        boolean success;
+        do {
+            success = queue.offer(entry);
+            Thread.yield();
+        }
+        while( !success );
+    }
 }