You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:28:11 UTC
[38/53] [abbrv] git commit: Fixed bug where meta data was not be
copied to cloned datums
Fixed bug where meta data was not be copied to cloned datums
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a6bd76a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a6bd76a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a6bd76a4
Branch: refs/heads/master
Commit: a6bd76a49f5f8f7f4b94ea390d6c90a8c128c6ed
Parents: d8feb5b
Author: rebanks <re...@w2odigital.com>
Authored: Wed Apr 2 16:15:43 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Wed Apr 2 16:15:43 2014 -0500
----------------------------------------------------------------------
.../streams/local/tasks/BaseStreamsTask.java | 24 ++++++++++++++------
1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a6bd76a4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 694cb76..8006560 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -10,10 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
/**
*
@@ -116,13 +113,13 @@ public abstract class BaseStreamsTask implements StreamsTask {
try {
if(datum.document instanceof ObjectNode) {
- return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
+ return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid));
}
else if(datum.document instanceof Activity) {
- return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
+ return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
datum.timestamp,
- datum.sequenceid);
+ datum.sequenceid));
}
// else if(this.mapper.canSerialize(datum.document.getClass())){
// return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
@@ -156,4 +153,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
}
while( !success );
}
+
+ private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) {
+ Map<String, Object> fromMeta = copyFrom.getMetadata();
+ Map<String, Object> toMeta = copyTo.getMetadata();
+ for(String key : fromMeta.keySet()) {
+ Object value = fromMeta.get(key);
+ if(value instanceof Serializable)
+ toMeta.put(key, SerializationUtil.cloneBySerialization(value));
+ else //hope for the best - should be serializable
+ toMeta.put(key, value);
+ }
+ return copyTo;
+ }
}