You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/20 19:29:02 UTC
[10/11] git commit: STREAMS-191 | Changed addToOutgoingQueue to
reduce code complexity and to clarify code
STREAMS-191 | Changed addToOutgoingQueue to reduce code complexity and to clarify code
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7882f148
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7882f148
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7882f148
Branch: refs/heads/master
Commit: 7882f1480399289b1580235120fa7ee6bc7cd0ff
Parents: ae01621
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 15 11:03:23 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 15 11:03:23 2014 -0500
----------------------------------------------------------------------
.../streams/local/tasks/BaseStreamsTask.java | 33 +++++---------------
1 file changed, 7 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7882f148/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 902a2d7..6755d77 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -99,36 +99,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
outQueues.get(0).put(datum);
}
else {
- StreamsDatum newDatum = null;
- List<BlockingQueue<StreamsDatum>> failedQueues = Lists.newLinkedList();
- // TODO
- // Needs to be optimized better but workable now
- // Adds datums to queues that aren't full, then adds to full queues with blocking
- for(BlockingQueue<StreamsDatum> queue : this.outQueues) {
- try {
- newDatum = cloneStreamsDatum(datum);
- if(newDatum != null) {
- if(!queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) {
- failedQueues.add(queue);
+ List<BlockingQueue<StreamsDatum>> toOutput = Lists.newLinkedList(this.outQueues);
+ while(!toOutput.isEmpty()) {
+ for (BlockingQueue<StreamsDatum> queue : toOutput) {
+ StreamsDatum newDatum = cloneStreamsDatum(datum);
+ if (newDatum != null) {
+ if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) {
+ toOutput.remove(queue);
}
}
- } catch (RuntimeException e) {
- LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
- LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
}
}
- for(BlockingQueue<StreamsDatum> queue : failedQueues) {
- try {
- newDatum = cloneStreamsDatum(datum);
- if(newDatum != null) {
- queue.put(newDatum);
- }
- } catch (RuntimeException e) {
- LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
- LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
- }
- }
-
}
}