You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/13 01:04:52 UTC

[08/13] git commit: additionalProperties in serialized objects plague this module, so I'm adding a processor any implementation can use to fix it

additionalProperties in serialized objects plague this module,
so I'm adding a processor any implementation can use to fix it


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9acc2f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9acc2f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9acc2f05

Branch: refs/heads/STREAMS-142
Commit: 9acc2f054f35b489bbb136f5ee91acc8cc3e5bb5
Parents: d387b1d
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Aug 7 17:41:32 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Aug 12 18:04:23 2014 -0500

----------------------------------------------------------------------
 .../CleanAdditionalPropertiesProcessor.java     |  62 +++++++
 .../DatasiftTypeConverterProcessor.java         | 168 +++++++++++++++++++
 .../DatasiftTypeConverterProcessor.java         | 167 ------------------
 .../DatasiftTypeConverterProcessorTest.java     |   1 +
 4 files changed, 231 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java
new file mode 100644
index 0000000..cc770db
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java
@@ -0,0 +1,62 @@
+package org.apache.streams.datasift.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HACK PROCESSOR that copies fields nested under additionalProperties up into their parent node.  Changes need to be made in apache streams to fix this issue long term.
+ */
+public class CleanAdditionalPropertiesProcessor implements StreamsProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class);
+    // NOTE(review): neither constant below is referenced in this class; cleanAdditionalProperties() spells the key as a literal.
+    private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
+    private static final String EXTENSIONS = "extensions";
+    // Assigned in prepare(); process() dereferences it, so prepare() has to be called first.
+    private ObjectMapper mapper;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum datum) {
+        List<StreamsDatum> result = Lists.newLinkedList();
+        ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class); // document as a JSON tree
+        cleanAdditionalProperties(activity);
+        datum.setDocument(activity); // the incoming datum is reused, now carrying the ObjectNode
+        result.add(datum);
+        return result;
+    }
+    // Fetches the StreamsJacksonMapper instance and registers JsonOrgModule on it; the argument o is not used.
+    @Override
+    public void prepare(Object o) {
+        this.mapper = StreamsJacksonMapper.getInstance();
+        this.mapper.registerModule(new JsonOrgModule());
+    }
+
+    @Override
+    public void cleanUp() {
+        // No-op.
+    }
+    // Copies every field of the additionalProperties child of node onto node itself. The additionalProperties field is NOT removed afterwards.
+    public void cleanAdditionalProperties(ObjectNode node) {
+        if( node.get("additionalProperties") != null ) {
+            ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties"); // unchecked cast: assumes the value is a JSON object
+            cleanAdditionalProperties(additionalProperties); // flatten any additionalProperties nested inside it first
+            Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = additionalProperties.fields();
+            while( jsonNodeIterator.hasNext() ) {
+                Map.Entry<String, JsonNode> entry = jsonNodeIterator.next();
+                node.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
new file mode 100644
index 0000000..680c7ea
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
@@ -0,0 +1,168 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.provider.DatasiftConverter;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * StreamsProcessor that converts the document of each datum to the class passed to the constructor (Activity, String, or any other class via the mapper).
+ */
+public class DatasiftTypeConverterProcessor implements StreamsProcessor {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
+
+    private ObjectMapper mapper;
+    private Class outClass;
+    private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
+    private DatasiftConverter converter;
+    // NOTE(review): not referenced anywhere in this class; presumably a sentinel value for callers - confirm before relying on it.
+    public final static String TERMINATE = new String("TERMINATE");
+    // outClass is the target type; prepare() picks the converter from it.
+    public DatasiftTypeConverterProcessor(Class outClass) {
+        this.outClass = outClass;
+    }
+    // Returns a one-element list with a new datum (converted document, id of entry), or an empty list when the converter returns null or throws.
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newLinkedList();
+        Object doc;
+        try {
+            doc = this.converter.convert(entry.getDocument(), this.mapper);
+            if(doc != null) {
+                result.add(new StreamsDatum(doc, entry.getId()));
+            }
+        } catch (Exception e) {
+            LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
+        }
+        return result;
+    }
+    // Sets up mapper and serializer, then selects ActivityConverter for Activity, StringConverter for String, DefaultConverter otherwise. The argument is not used.
+    @Override
+    public void prepare(Object configurationObject) {
+        this.mapper = StreamsDatasiftMapper.getInstance();
+        this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+        if(this.outClass.equals(Activity.class)) {
+            this.converter = new ActivityConverter();
+        } else if (this.outClass.equals(String.class)) {
+            this.converter = new StringConverter();
+        } else {
+            LOGGER.warn("Using defaulting datasift converter");
+            this.converter = new DefaultConverter(this.outClass);
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+        // No-op.
+    }
+    // Returns Activity input unchanged, deserializes String input with the Datasift serializer, maps anything else with mapper.convertValue; logs and returns null on failure.
+    private class ActivityConverter implements DatasiftConverter {
+
+        @Override
+        public Object convert(Object toConvert, ObjectMapper mapper) {
+            if(toConvert instanceof Activity)
+                return toConvert;
+            try {
+                if(toConvert instanceof String)
+                    return datasiftInteractionActivitySerializer.deserialize((String) toConvert);
+                return mapper.convertValue(toConvert, Activity.class);
+            } catch (Exception e) {
+                LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass());
+                LOGGER.error("Exception : {}", e);
+                e.printStackTrace();
+                return null;
+            }
+        }
+
+
+    }
+    // Produces a JSON String: String input is parsed as Datasift and re-serialized; other input is serialized directly, with special handling when its class is exactly Activity. Logs and returns null on failure.
+    private class StringConverter implements DatasiftConverter {
+        @Override
+        public Object convert(Object toConvert, ObjectMapper mapper) {
+            try {
+                if(toConvert instanceof String){
+                    return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
+                } else {
+                    if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
+                        ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
+                        if(node.has("additionalProperties")) {
+                            ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
+//                            node.put("user_mentions", additionalProperties.get("user_mentions"));
+                            node.putAll(additionalProperties); // merge the nested fields into the root
+                            node.remove("additionalProperties");
+                        }
+                        if(node.has("actor")) {
+                            ObjectNode actor = (ObjectNode) node.get("actor");
+                            if(actor.has("additionalProperties")) {
+                                ObjectNode additionalProperties = (ObjectNode) actor.get("additionalProperties");
+                                actor.putAll(additionalProperties); // same merge, one level down on the actor
+                                actor.remove("additionalProperties");
+                            }
+                        }
+                        return mapper.writeValueAsString(node);
+                    } else
+                        return mapper.writeValueAsString(toConvert);
+                }
+            } catch (Exception e) {
+                LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass());
+                LOGGER.error("Exception : {}", e);
+                return null;
+            }
+        }
+
+
+    }
+    // Reads String input with mapper.readValue and converts other input with mapper.convertValue into clazz; failures are rethrown as RuntimeException.
+    private class DefaultConverter implements DatasiftConverter {
+
+        private Class clazz;
+
+        public DefaultConverter(Class clazz) {
+            this.clazz = clazz;
+        }
+
+        @Override
+        public Object convert(Object toConvert, ObjectMapper mapper) {
+            try {
+                if(toConvert instanceof String) {
+                    return mapper.readValue((String) toConvert, this.clazz);
+                } else {
+                    return mapper.convertValue(toConvert, this.clazz);
+                }
+
+            } catch (Exception e) {
+                throw new RuntimeException("Failed converting +"+ toConvert.getClass().getName()+" to "+ this.clazz.getName()); // NOTE(review): the caught exception e is not passed along as the cause
+            }
+        }
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
deleted file mode 100644
index 0b847a4..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessor implements StreamsProcessor {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
-
-    private ObjectMapper mapper;
-    private Class outClass;
-    private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
-    private DatasiftConverter converter;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public DatasiftTypeConverterProcessor(Class outClass) {
-        this.outClass = outClass;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Object doc;
-        try {
-            doc = this.converter.convert(entry.getDocument(), this.mapper);
-            if(doc != null) {
-                result.add(new StreamsDatum(doc, entry.getId()));
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
-        }
-        return result;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.mapper = StreamsDatasiftMapper.getInstance();
-        this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
-        if(this.outClass.equals(Activity.class)) {
-            this.converter = new ActivityConverter();
-        } else if (this.outClass.equals(String.class)) {
-            this.converter = new StringConverter();
-        } else {
-            LOGGER.warn("Using defaulting datasift converter");
-            this.converter = new DefaultConverter(this.outClass);
-        }
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    private class ActivityConverter implements DatasiftConverter {
-
-        @Override
-        public Object convert(Object toConvert, ObjectMapper mapper) {
-            if(toConvert instanceof Activity)
-                return toConvert;
-            try {
-                if(toConvert instanceof String)
-                    return datasiftInteractionActivitySerializer.deserialize((String) toConvert);
-                return mapper.convertValue(toConvert, Activity.class);
-            } catch (Exception e) {
-                LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass());
-                LOGGER.error("Exception : {}", e);
-                e.printStackTrace();
-                return null;
-            }
-        }
-
-
-    }
-
-    private class StringConverter implements DatasiftConverter {
-        @Override
-        public Object convert(Object toConvert, ObjectMapper mapper) {
-            try {
-                if(toConvert instanceof String){
-                    return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
-                } else {
-                    if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
-                        ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
-                        if(node.has("additionalProperties")) {
-                            ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
-//                            node.put("user_mentions", additionalProperties.get("user_mentions"));
-                            node.putAll(additionalProperties);
-                            node.remove("additionalProperties");
-                        }
-                        if(node.has("actor")) {
-                            ObjectNode actor = (ObjectNode) node.get("actor");
-                            if(actor.has("additionalProperties")) {
-                                ObjectNode additionalProperties = (ObjectNode) actor.get("additionalProperties");
-                                actor.putAll(additionalProperties);
-                                actor.remove("additionalProperties");
-                            }
-                        }
-                        return mapper.writeValueAsString(node);
-                    } else
-                        return mapper.writeValueAsString(toConvert);
-                }
-            } catch (Exception e) {
-                LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass());
-                LOGGER.error("Exception : {}", e);
-                return null;
-            }
-        }
-
-
-    }
-
-    private class DefaultConverter implements DatasiftConverter {
-
-        private Class clazz;
-
-        public DefaultConverter(Class clazz) {
-            this.clazz = clazz;
-        }
-
-        @Override
-        public Object convert(Object toConvert, ObjectMapper mapper) {
-            try {
-                if(toConvert instanceof String) {
-                    return mapper.readValue((String) toConvert, this.clazz);
-                } else {
-                    return mapper.convertValue(toConvert, this.clazz);
-                }
-
-            } catch (Exception e) {
-                throw new RuntimeException("Failed converting +"+ toConvert.getClass().getName()+" to "+ this.clazz.getName());
-            }
-        }
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
index 7d44e7f..fac0f02 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
@@ -20,6 +20,7 @@ package org.apache.streams.datasift.provider;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.datasift.processor.DatasiftTypeConverterProcessor;
 import org.apache.streams.pojo.json.Activity;
 import org.junit.Test;