You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/20 19:29:02 UTC

[10/11] git commit: STREAMS-191 | Changed addToOutgoingQueue to reduce code complexity and to clarify code

STREAMS-191 | Changed addToOutgoingQueue to reduce code complexity and to clarify code


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7882f148
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7882f148
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7882f148

Branch: refs/heads/master
Commit: 7882f1480399289b1580235120fa7ee6bc7cd0ff
Parents: ae01621
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 15 11:03:23 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 15 11:03:23 2014 -0500

----------------------------------------------------------------------
 .../streams/local/tasks/BaseStreamsTask.java    | 33 +++++---------------
 1 file changed, 7 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7882f148/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 902a2d7..6755d77 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -99,36 +99,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
             outQueues.get(0).put(datum);
         }
         else {
-            StreamsDatum newDatum = null;
-            List<BlockingQueue<StreamsDatum>> failedQueues = Lists.newLinkedList();
-            // TODO
-            // Needs to be optimized better but workable now
-            // Adds datums to queues that aren't full, then adds to full queues with blocking
-            for(BlockingQueue<StreamsDatum> queue : this.outQueues) {
-                try {
-                    newDatum = cloneStreamsDatum(datum);
-                    if(newDatum != null) {
-                        if(!queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) {
-                            failedQueues.add(queue);
+            List<BlockingQueue<StreamsDatum>> toOutput = Lists.newLinkedList(this.outQueues);
+            while(!toOutput.isEmpty()) {
+                for (BlockingQueue<StreamsDatum> queue : toOutput) {
+                    StreamsDatum newDatum = cloneStreamsDatum(datum);
+                    if (newDatum != null) {
+                        if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) {
+                            toOutput.remove(queue);
                         }
                     }
-                } catch (RuntimeException e) {
-                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
-                    LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
                 }
             }
-            for(BlockingQueue<StreamsDatum> queue : failedQueues) {
-                try {
-                    newDatum = cloneStreamsDatum(datum);
-                    if(newDatum != null) {
-                        queue.put(newDatum);
-                    }
-                } catch (RuntimeException e) {
-                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
-                    LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
-                }
-            }
-
         }
     }