You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 23:33:59 UTC
git commit: fixed datum leak
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning 6adb12a33 -> c2a858db2
fixed datum leak
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2a858db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2a858db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2a858db
Branch: refs/heads/springcleaning
Commit: c2a858db2721175453901b428440e3c45343fd9b
Parents: 6adb12a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 17:33:46 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 17:33:46 2014 -0500
----------------------------------------------------------------------
.../apache/streams/local/tasks/BaseStreamsTask.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a858db/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index b9af0fd..3799480 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -76,15 +76,16 @@ public abstract class BaseStreamsTask implements StreamsTask {
*/
protected void addToOutgoingQueue(StreamsDatum datum) {
if(this.outQueues.size() == 1) {
- this.outQueues.get(0).offer(datum);
+ enqueue(outQueues.get(0), datum);
}
else {
StreamsDatum newDatum = null;
for(Queue<StreamsDatum> queue : this.outQueues) {
try {
newDatum = cloneStreamsDatum(datum);
- if(newDatum != null)
- queue.offer(newDatum);
+ if(newDatum != null) {
+ enqueue(queue, newDatum);
+ }
} catch (RuntimeException e) {
LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
@@ -146,4 +147,12 @@ public abstract class BaseStreamsTask implements StreamsTask {
return this.inIndex;
}
+ private void enqueue( Queue<StreamsDatum> queue, StreamsDatum entry ) {
+ boolean success;
+ do {
+ success = queue.offer(entry);
+ Thread.yield();
+ }
+ while( !success );
+ }
}