You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/13 01:04:52 UTC
[08/13] git commit: additionalProperties in serialized objects plague
this module so i'm adding a processor any implemention can use to fix it
additionalProperties in serialized objects plague this module
so i'm adding a processor any implemention can use to fix it
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9acc2f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9acc2f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9acc2f05
Branch: refs/heads/STREAMS-142
Commit: 9acc2f054f35b489bbb136f5ee91acc8cc3e5bb5
Parents: d387b1d
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Aug 7 17:41:32 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Aug 12 18:04:23 2014 -0500
----------------------------------------------------------------------
.../CleanAdditionalPropertiesProcessor.java | 62 +++++++
.../DatasiftTypeConverterProcessor.java | 168 +++++++++++++++++++
.../DatasiftTypeConverterProcessor.java | 167 ------------------
.../DatasiftTypeConverterProcessorTest.java | 1 +
4 files changed, 231 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java
new file mode 100644
index 0000000..cc770db
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/CleanAdditionalPropertiesProcessor.java
@@ -0,0 +1,62 @@
+package org.apache.streams.datasift.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HACK PROCESSOR. Changes need to be made in apache streams to fix this issue long term.
+ */
+public class CleanAdditionalPropertiesProcessor implements StreamsProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class);
+
+ private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
+ private static final String EXTENSIONS = "extensions";
+
+ private ObjectMapper mapper;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum datum) {
+ List<StreamsDatum> result = Lists.newLinkedList();
+ ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class);
+ cleanAdditionalProperties(activity);
+ datum.setDocument(activity);
+ result.add(datum);
+ return result;
+ }
+
+ @Override
+ public void prepare(Object o) {
+ this.mapper = StreamsJacksonMapper.getInstance();
+ this.mapper.registerModule(new JsonOrgModule());
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ public void cleanAdditionalProperties(ObjectNode node) {
+ if( node.get("additionalProperties") != null ) {
+ ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
+ cleanAdditionalProperties(additionalProperties);
+ Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = additionalProperties.fields();
+ while( jsonNodeIterator.hasNext() ) {
+ Map.Entry<String, JsonNode> entry = jsonNodeIterator.next();
+ node.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
new file mode 100644
index 0000000..680c7ea
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
@@ -0,0 +1,168 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.provider.DatasiftConverter;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class DatasiftTypeConverterProcessor implements StreamsProcessor {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
+
+ private ObjectMapper mapper;
+ private Class outClass;
+ private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
+ private DatasiftConverter converter;
+
+ public final static String TERMINATE = new String("TERMINATE");
+
+ public DatasiftTypeConverterProcessor(Class outClass) {
+ this.outClass = outClass;
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ List<StreamsDatum> result = Lists.newLinkedList();
+ Object doc;
+ try {
+ doc = this.converter.convert(entry.getDocument(), this.mapper);
+ if(doc != null) {
+ result.add(new StreamsDatum(doc, entry.getId()));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
+ }
+ return result;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.mapper = StreamsDatasiftMapper.getInstance();
+ this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+ if(this.outClass.equals(Activity.class)) {
+ this.converter = new ActivityConverter();
+ } else if (this.outClass.equals(String.class)) {
+ this.converter = new StringConverter();
+ } else {
+ LOGGER.warn("Using defaulting datasift converter");
+ this.converter = new DefaultConverter(this.outClass);
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ private class ActivityConverter implements DatasiftConverter {
+
+ @Override
+ public Object convert(Object toConvert, ObjectMapper mapper) {
+ if(toConvert instanceof Activity)
+ return toConvert;
+ try {
+ if(toConvert instanceof String)
+ return datasiftInteractionActivitySerializer.deserialize((String) toConvert);
+ return mapper.convertValue(toConvert, Activity.class);
+ } catch (Exception e) {
+ LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass());
+ LOGGER.error("Exception : {}", e);
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+
+ }
+
+ private class StringConverter implements DatasiftConverter {
+ @Override
+ public Object convert(Object toConvert, ObjectMapper mapper) {
+ try {
+ if(toConvert instanceof String){
+ return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
+ } else {
+ if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
+ ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
+ if(node.has("additionalProperties")) {
+ ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
+// node.put("user_mentions", additionalProperties.get("user_mentions"));
+ node.putAll(additionalProperties);
+ node.remove("additionalProperties");
+ }
+ if(node.has("actor")) {
+ ObjectNode actor = (ObjectNode) node.get("actor");
+ if(actor.has("additionalProperties")) {
+ ObjectNode additionalProperties = (ObjectNode) actor.get("additionalProperties");
+ actor.putAll(additionalProperties);
+ actor.remove("additionalProperties");
+ }
+ }
+ return mapper.writeValueAsString(node);
+ } else
+ return mapper.writeValueAsString(toConvert);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass());
+ LOGGER.error("Exception : {}", e);
+ return null;
+ }
+ }
+
+
+ }
+
+ private class DefaultConverter implements DatasiftConverter {
+
+ private Class clazz;
+
+ public DefaultConverter(Class clazz) {
+ this.clazz = clazz;
+ }
+
+ @Override
+ public Object convert(Object toConvert, ObjectMapper mapper) {
+ try {
+ if(toConvert instanceof String) {
+ return mapper.readValue((String) toConvert, this.clazz);
+ } else {
+ return mapper.convertValue(toConvert, this.clazz);
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed converting +"+ toConvert.getClass().getName()+" to "+ this.clazz.getName());
+ }
+ }
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
deleted file mode 100644
index 0b847a4..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessor implements StreamsProcessor {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
-
- private ObjectMapper mapper;
- private Class outClass;
- private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
- private DatasiftConverter converter;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public DatasiftTypeConverterProcessor(Class outClass) {
- this.outClass = outClass;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- List<StreamsDatum> result = Lists.newLinkedList();
- Object doc;
- try {
- doc = this.converter.convert(entry.getDocument(), this.mapper);
- if(doc != null) {
- result.add(new StreamsDatum(doc, entry.getId()));
- }
- } catch (Exception e) {
- LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
- }
- return result;
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.mapper = StreamsDatasiftMapper.getInstance();
- this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
- if(this.outClass.equals(Activity.class)) {
- this.converter = new ActivityConverter();
- } else if (this.outClass.equals(String.class)) {
- this.converter = new StringConverter();
- } else {
- LOGGER.warn("Using defaulting datasift converter");
- this.converter = new DefaultConverter(this.outClass);
- }
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- private class ActivityConverter implements DatasiftConverter {
-
- @Override
- public Object convert(Object toConvert, ObjectMapper mapper) {
- if(toConvert instanceof Activity)
- return toConvert;
- try {
- if(toConvert instanceof String)
- return datasiftInteractionActivitySerializer.deserialize((String) toConvert);
- return mapper.convertValue(toConvert, Activity.class);
- } catch (Exception e) {
- LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass());
- LOGGER.error("Exception : {}", e);
- e.printStackTrace();
- return null;
- }
- }
-
-
- }
-
- private class StringConverter implements DatasiftConverter {
- @Override
- public Object convert(Object toConvert, ObjectMapper mapper) {
- try {
- if(toConvert instanceof String){
- return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
- } else {
- if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
- ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
- if(node.has("additionalProperties")) {
- ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
-// node.put("user_mentions", additionalProperties.get("user_mentions"));
- node.putAll(additionalProperties);
- node.remove("additionalProperties");
- }
- if(node.has("actor")) {
- ObjectNode actor = (ObjectNode) node.get("actor");
- if(actor.has("additionalProperties")) {
- ObjectNode additionalProperties = (ObjectNode) actor.get("additionalProperties");
- actor.putAll(additionalProperties);
- actor.remove("additionalProperties");
- }
- }
- return mapper.writeValueAsString(node);
- } else
- return mapper.writeValueAsString(toConvert);
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass());
- LOGGER.error("Exception : {}", e);
- return null;
- }
- }
-
-
- }
-
- private class DefaultConverter implements DatasiftConverter {
-
- private Class clazz;
-
- public DefaultConverter(Class clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public Object convert(Object toConvert, ObjectMapper mapper) {
- try {
- if(toConvert instanceof String) {
- return mapper.readValue((String) toConvert, this.clazz);
- } else {
- return mapper.convertValue(toConvert, this.clazz);
- }
-
- } catch (Exception e) {
- throw new RuntimeException("Failed converting +"+ toConvert.getClass().getName()+" to "+ this.clazz.getName());
- }
- }
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9acc2f05/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
index 7d44e7f..fac0f02 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
@@ -20,6 +20,7 @@ package org.apache.streams.datasift.provider;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.datasift.processor.DatasiftTypeConverterProcessor;
import org.apache.streams.pojo.json.Activity;
import org.junit.Test;