You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:28:11 UTC

[38/53] [abbrv] git commit: Fixed bug where metadata was not being copied to cloned datums

Fixed bug where metadata was not being copied to cloned datums


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a6bd76a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a6bd76a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a6bd76a4

Branch: refs/heads/master
Commit: a6bd76a49f5f8f7f4b94ea390d6c90a8c128c6ed
Parents: d8feb5b
Author: rebanks <re...@w2odigital.com>
Authored: Wed Apr 2 16:15:43 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Wed Apr 2 16:15:43 2014 -0500

----------------------------------------------------------------------
 .../streams/local/tasks/BaseStreamsTask.java    | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a6bd76a4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 694cb76..8006560 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -10,10 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 
 /**
  *
@@ -116,13 +113,13 @@ public abstract class BaseStreamsTask implements StreamsTask {
         try {
 
             if(datum.document instanceof ObjectNode) {
-                return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
+                return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid));
             }
             else if(datum.document instanceof Activity) {
 
-                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
+                return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
                                         datum.timestamp,
-                                        datum.sequenceid);
+                                        datum.sequenceid));
             }
 //            else if(this.mapper.canSerialize(datum.document.getClass())){
 //                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
@@ -156,4 +153,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
         }
         while( !success );
     }
+
+    private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) {
+        Map<String, Object> fromMeta = copyFrom.getMetadata();
+        Map<String, Object> toMeta = copyTo.getMetadata();
+        for(String key : fromMeta.keySet()) {
+            Object value = fromMeta.get(key);
+            if(value instanceof Serializable)
+                toMeta.put(key, SerializationUtil.cloneBySerialization(value));
+            else // not Serializable (or null): cannot be cloned, so the same reference is shared with the source datum
+                toMeta.put(key, value);
+        }
+        return copyTo;
+    }
 }