You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:22 UTC
[41/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
index d39c087..cdba150 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
@@ -19,61 +19,72 @@ under the License.
package org.apache.streams.converter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.data.DocumentClassifier;
import org.apache.streams.data.util.ActivityUtil;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
+ * Ensures generic String and ObjectNode documents can be converted to Activity
+ *
+ * <p/>
* BaseDocumentClassifier is included by default in all
* @see org.apache.streams.converter.ActivityConverterProcessor
*
- * Ensures generic String and ObjectNode documents can be converted to Activity
- *
*/
public class BaseDocumentClassifier implements DocumentClassifier {
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- @Override
- @SuppressWarnings("unchecked")
- public List<Class> detectClasses(Object document) {
-
- Activity activity;
- ObjectNode node = null;
-
- List<Class> classes = new ArrayList<>();
- // Soon javax.validation will available in jackson
- // That will make this simpler and more powerful
- if( document instanceof String ) {
- classes.add(String.class);
- try {
- activity = this.mapper.readValue((String)document, Activity.class);
- if(activity != null && ActivityUtil.isValid(activity))
- classes.add(Activity.class);
- } catch (IOException e1) {
- try {
- node = this.mapper.readValue((String)document, ObjectNode.class);
- classes.add(ObjectNode.class);
- } catch (IOException ignored) { }
- }
- } else if( document instanceof ObjectNode ){
- classes.add(ObjectNode.class);
- activity = this.mapper.convertValue(document, Activity.class);
- if(ActivityUtil.isValid(activity))
- classes.add(Activity.class);
- } else {
- classes.add(document.getClass());
- }
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseDocumentClassifier.class);
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<Class> detectClasses(Object document) {
- return classes;
+ Activity activity;
+ ObjectNode node = null;
+ List<Class> classes = new ArrayList<>();
+ // Soon javax.validation will available in jackson
+ // That will make this simpler and more powerful
+ if ( document instanceof String ) {
+ classes.add(String.class);
+ try {
+ activity = this.mapper.readValue((String)document, Activity.class);
+ if (activity != null && ActivityUtil.isValid(activity)) {
+ classes.add(Activity.class);
+ }
+ } catch (IOException e1) {
+ try {
+ node = this.mapper.readValue((String)document, ObjectNode.class);
+ classes.add(ObjectNode.class);
+ } catch (IOException ignored) {
+ LOGGER.trace("ignoring ", ignored);
+ }
+ }
+ } else if ( document instanceof ObjectNode ) {
+ classes.add(ObjectNode.class);
+ activity = this.mapper.convertValue(document, Activity.class);
+ if (ActivityUtil.isValid(activity)) {
+ classes.add(Activity.class);
+ }
+ } else {
+ classes.add(document.getClass());
}
+ return classes;
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
index b6cde29..cb61f0e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
@@ -19,75 +19,88 @@ under the License.
package org.apache.streams.converter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivityConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
/**
+ * Ensures generic ObjectNode representation of an Activity can be converted to Activity
+ *
+ * <p/>
* BaseObjectNodeActivityConverter is included by default in all
* @see {@link org.apache.streams.converter.ActivityConverterProcessor}
*
- * Ensures generic ObjectNode representation of an Activity can be converted to Activity
*
*/
public class BaseObjectNodeActivityConverter implements ActivityConverter<ObjectNode> {
- public static Class requiredClass = ObjectNode.class;
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseObjectNodeActivityConverter.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ public static Class requiredClass = ObjectNode.class;
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- @Override
- public String serializationFormat() {
- return null;
- }
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
- @Override
- public ObjectNode fromActivity(Activity deserialized) throws ActivityConversionException {
- try {
- return mapper.convertValue(deserialized, ObjectNode.class);
- } catch (Exception e) {
- throw new ActivityConversionException();
- }
- }
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
- @Override
- public List<Activity> toActivityList(ObjectNode serialized) throws ActivityConversionException {
- List<Activity> activityList = Lists.newArrayList();
- try {
- activityList.add(mapper.convertValue(serialized, Activity.class));
- } catch (Exception e) {
- throw new ActivityConversionException();
- } finally {
- return activityList;
- }
+ @Override
+ public ObjectNode fromActivity(Activity deserialized) throws ActivityConversionException {
+ try {
+ return mapper.convertValue(deserialized, ObjectNode.class);
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
}
+ }
+
+ @Override
+ public List<ObjectNode> fromActivityList(List<Activity> list) {
+ throw new NotImplementedException();
+ }
- @Override
- public List<ObjectNode> fromActivityList(List<Activity> list) {
- throw new NotImplementedException();
+ @Override
+ public List<Activity> toActivityList(ObjectNode serialized) throws ActivityConversionException {
+ List<Activity> activityList = Lists.newArrayList();
+ try {
+ activityList.add(mapper.convertValue(serialized, Activity.class));
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
+ } finally {
+ return activityList;
}
+ }
- @Override
- public List<Activity> toActivityList(List<ObjectNode> list) {
- List<Activity> result = Lists.newArrayList();
- for( ObjectNode item : list ) {
- try {
- result.addAll(toActivityList(item));
- } catch (ActivityConversionException e) {}
- }
- return result;
+ @Override
+ public List<Activity> toActivityList(List<ObjectNode> list) {
+ List<Activity> result = Lists.newArrayList();
+ for ( ObjectNode item : list ) {
+ try {
+ result.addAll(toActivityList(item));
+ } catch (ActivityConversionException ex) {
+ LOGGER.trace("ActivityConversionException", ex);
+ }
}
+ return result;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
index cb66414..a38585e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
@@ -19,58 +19,54 @@ under the License.
package org.apache.streams.converter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
import org.apache.streams.data.ActivityObjectConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
-import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
/**
+ * Ensures generic ObjectNode representation of an Activity can be converted to Activity.
+ *
+ * <p/>
* BaseObjectNodeActivityConverter is included by default in all
* @see {@link ActivityConverterProcessor}
*
- * Ensures generic ObjectNode representation of an Activity can be converted to Activity
- *
*/
public class BaseObjectNodeActivityObjectConverter implements ActivityObjectConverter<ObjectNode> {
- public static Class requiredClass = ObjectNode.class;
+ public static Class requiredClass = ObjectNode.class;
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
- @Override
- public String serializationFormat() {
- return null;
- }
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
- @Override
- public ObjectNode fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
- try {
- return mapper.convertValue(deserialized, ObjectNode.class);
- } catch (Exception e) {
- throw new ActivityConversionException();
- }
+ @Override
+ public ObjectNode fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+ try {
+ return mapper.convertValue(deserialized, ObjectNode.class);
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
}
+ }
- @Override
- public ActivityObject toActivityObject(ObjectNode serialized) throws ActivityConversionException {
- try {
- return mapper.convertValue(serialized, ActivityObject.class);
- } catch (Exception e) {
- throw new ActivityConversionException();
- }
+ @Override
+ public ActivityObject toActivityObject(ObjectNode serialized) throws ActivityConversionException {
+ try {
+ return mapper.convertValue(serialized, ActivityObject.class);
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
index 7438abb..da15dee 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
@@ -19,75 +19,86 @@ under the License.
package org.apache.streams.converter;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivityConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
/**
+ * Ensures generic String Json representation of an Activity can be converted to Activity
+ *
+ * <p/>
* BaseObjectNodeActivityConverter is included by default in all
* @see {@link org.apache.streams.converter.ActivityConverterProcessor}
*
- * Ensures generic String Json representation of an Activity can be converted to Activity
- *
*/
public class BaseStringActivityConverter implements ActivityConverter<String> {
- public static Class requiredClass = String.class;
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseObjectNodeActivityConverter.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ public static final Class requiredClass = String.class;
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- @Override
- public String serializationFormat() {
- return null;
- }
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
- @Override
- public String fromActivity(Activity deserialized) throws ActivityConversionException {
- try {
- return mapper.writeValueAsString(deserialized);
- } catch (JsonProcessingException e) {
- throw new ActivityConversionException();
- }
- }
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
- @Override
- public List<Activity> toActivityList(String serialized) throws ActivityConversionException {
- List<Activity> activityList = Lists.newArrayList();
- try {
- activityList.add(mapper.readValue(serialized, Activity.class));
- } catch (Exception e) {
- throw new ActivityConversionException();
- } finally {
- return activityList;
- }
+ @Override
+ public String fromActivity(Activity deserialized) throws ActivityConversionException {
+ try {
+ return mapper.writeValueAsString(deserialized);
+ } catch (JsonProcessingException ex) {
+ throw new ActivityConversionException();
}
+ }
+
+ @Override
+ public List<String> fromActivityList(List<Activity> list) {
+ throw new NotImplementedException();
+ }
- @Override
- public List<String> fromActivityList(List<Activity> list) {
- throw new NotImplementedException();
+ @Override
+ public List<Activity> toActivityList(String serialized) throws ActivityConversionException {
+ List<Activity> activityList = Lists.newArrayList();
+ try {
+ activityList.add(mapper.readValue(serialized, Activity.class));
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
+ } finally {
+ return activityList;
}
+ }
- @Override
- public List<Activity> toActivityList(List<String> list) {
- List<Activity> result = Lists.newArrayList();
- for( String item : list ) {
- try {
- result.addAll(toActivityList(item));
- } catch (ActivityConversionException e) {}
- }
- return result;
+ @Override
+ public List<Activity> toActivityList(List<String> list) {
+ List<Activity> result = Lists.newArrayList();
+ for ( String item : list ) {
+ try {
+ result.addAll(toActivityList(item));
+ } catch (ActivityConversionException ex) {
+ LOGGER.trace("ActivityConversionException", ex);
+ }
}
+ return result;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
index 3bbbdac..7322fc1 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
@@ -19,52 +19,53 @@ under the License.
package org.apache.streams.converter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.data.ActivityObjectConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.ActivityObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
+ * Ensures generic ObjectNode representation of an Activity can be converted to Activity.
+ *
+ * <p/>
* BaseObjectNodeActivityConverter is included by default in all
* @see {@link ActivityConverterProcessor}
*
- * Ensures generic ObjectNode representation of an Activity can be converted to Activity
- *
*/
public class BaseStringActivityObjectConverter implements ActivityObjectConverter<String> {
- public static Class requiredClass = String.class;
+ public static Class requiredClass = String.class;
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
- @Override
- public String serializationFormat() {
- return null;
- }
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
- @Override
- public String fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
- try {
- return mapper.writeValueAsString(deserialized);
- } catch (Exception e) {
- throw new ActivityConversionException();
- }
+ @Override
+ public String fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+ try {
+ return mapper.writeValueAsString(deserialized);
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
}
+ }
- @Override
- public ActivityObject toActivityObject(String serialized) throws ActivityConversionException {
- try {
- return mapper.readValue(serialized, ActivityObject.class);
- } catch (Exception e) {
- throw new ActivityConversionException();
- }
+ @Override
+ public ActivityObject toActivityObject(String serialized) throws ActivityConversionException {
+ try {
+ return mapper.readValue(serialized, ActivityObject.class);
+ } catch (Exception ex) {
+ throw new ActivityConversionException();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
index 26dfcb3..3443bd9 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
@@ -19,14 +19,14 @@
package org.apache.streams.converter;
/**
- * Predefined field symbols
+ * Predefined field symbols.
*/
public class FieldConstants {
- protected static final String ID = "ID";
- protected static final String SEQ = "SEQ";
- protected static final String TS = "TS";
- protected static final String META = "META";
- protected static final String DOC = "DOC";
+ protected static final String ID = "ID";
+ protected static final String SEQ = "SEQ";
+ protected static final String TS = "TS";
+ protected static final String META = "META";
+ protected static final String DOC = "DOC";
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
index b3ee72f..44aa56b 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
@@ -19,11 +19,12 @@ under the License.
package org.apache.streams.converter;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.core.util.DatumUtils;
-import org.apache.streams.pojo.json.Activity;
+
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,53 +34,62 @@ import java.util.List;
* HoconConverterProcessor is a utility processor for converting any datum document
* with translation rules expressed as HOCON in the classpath or at a URL.
*
+ * <p/>
* To use this capability without a dedicated stream processor, just use HoconConverterUtil.
*/
public class HoconConverterProcessor implements StreamsProcessor {
- public static final String STREAMS_ID = "HoconConverterProcessor";
+ public static final String STREAMS_ID = "HoconConverterProcessor";
- private final static Logger LOGGER = LoggerFactory.getLogger(HoconConverterProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HoconConverterProcessor.class);
- protected Class outClass;
- protected String hocon;
- protected String inPath;
- protected String outPath;
+ protected Class outClass;
+ protected String hocon;
+ protected String inPath;
+ protected String outPath;
- public HoconConverterProcessor(Class outClass, String hocon, String inPath, String outPath) {
- this.outClass = outClass;
- this.hocon = hocon;
- this.inPath = inPath;
- this.outPath = outPath;
- }
+ /**
+ * HoconConverterProcessor.
+ *
+ * @param outClass outClass
+ * @param hocon hocon
+ * @param inPath inPath
+ * @param outPath outPath
+ */
+ public HoconConverterProcessor(Class outClass, String hocon, String inPath, String outPath) {
+ this.outClass = outClass;
+ this.hocon = hocon;
+ this.inPath = inPath;
+ this.outPath = outPath;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
- List<StreamsDatum> result = Lists.newLinkedList();
- Object document = entry.getDocument();
+ List<StreamsDatum> result = Lists.newLinkedList();
+ Object document = entry.getDocument();
- Object outDoc = HoconConverterUtil.getInstance().convert(document, outClass, hocon, inPath, outPath);
+ Object outDoc = HoconConverterUtil.getInstance().convert(document, outClass, hocon, inPath, outPath);
- StreamsDatum datum = DatumUtils.cloneDatum(entry);
- datum.setDocument(outDoc);
- result.add(datum);
+ StreamsDatum datum = DatumUtils.cloneDatum(entry);
+ datum.setDocument(outDoc);
+ result.add(datum);
- return result;
- }
+ return result;
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ }
- @Override
- public void cleanUp() {
+ @Override
+ public void cleanUp() {
- }
-};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
index f8db30c..bac081c 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
@@ -18,6 +18,8 @@
package org.apache.streams.converter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -26,7 +28,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigObject;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValue;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,95 +36,104 @@ import java.io.IOException;
/**
* HoconConverterUtil supports HoconConverterProcessor in converting types via application
- * of hocon (https://github.com/typesafehub/config/blob/master/HOCON.md) scripts
+ * of hocon (https://github.com/typesafehub/config/blob/master/HOCON.md) scripts.
*/
public class HoconConverterUtil {
- private final static Logger LOGGER = LoggerFactory.getLogger(HoconConverterUtil.class);
-
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- private static final HoconConverterUtil INSTANCE = new HoconConverterUtil();
-
- public static HoconConverterUtil getInstance(){
- return INSTANCE;
- }
-
- public Object convert(Object object, Class outClass, String hoconResource) {
- Config hocon = ConfigFactory.parseResources(hoconResource);
- return convert(object, outClass, hocon, null);
- }
-
- public Object convert(Object object, Class outClass, String hoconResource, String outPath) {
- Config hocon = ConfigFactory.parseResources(hoconResource);
- return convert(object, outClass, hocon, outPath);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HoconConverterUtil.class);
+
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private static final HoconConverterUtil INSTANCE = new HoconConverterUtil();
+
+ public static HoconConverterUtil getInstance() {
+ return INSTANCE;
+ }
+
+ public Object convert(Object object, Class outClass, String hoconResource) {
+ Config hocon = ConfigFactory.parseResources(hoconResource);
+ return convert(object, outClass, hocon, null);
+ }
+
+ public Object convert(Object object, Class outClass, String hoconResource, String outPath) {
+ Config hocon = ConfigFactory.parseResources(hoconResource);
+ return convert(object, outClass, hocon, outPath);
+ }
+
+ public Object convert(Object object, Class outClass, String hoconResource, String inPath, String outPath) {
+ Config hocon = ConfigFactory.parseResources(hoconResource);
+ return convert(object, outClass, hocon, inPath, outPath);
+ }
+
+ public Object convert(Object object, Class outClass, Config hocon, String outPath) {
+ return convert(object, outClass, hocon, null, outPath);
+ }
+
+ /**
+ * convert.
+ * @param object object
+ * @param outClass outClass
+ * @param hocon hocon
+ * @param inPath inPath
+ * @param outPath outPath
+ * @return result
+ */
+ public Object convert(Object object, Class outClass, Config hocon, String inPath, String outPath) {
+ String json = null;
+ Object outDoc = null;
+ if ( object instanceof String ) {
+ json = (String) object;
+ } else {
+ try {
+ json = mapper.writeValueAsString(object);
+ } catch (JsonProcessingException ex) {
+ LOGGER.warn("Failed to process input:", object);
+ return outDoc;
+ }
}
- public Object convert(Object object, Class outClass, String hoconResource, String inPath, String outPath) {
- Config hocon = ConfigFactory.parseResources(hoconResource);
- return convert(object, outClass, hocon, inPath, outPath);
+ Config base;
+ if( inPath == null) {
+ base = ConfigFactory.parseString(json);
+ } else {
+ ObjectNode node;
+ try {
+ node = mapper.readValue(json, ObjectNode.class);
+ ObjectNode root = mapper.createObjectNode();
+ root.set(inPath, node);
+ json = mapper.writeValueAsString(root);
+ base = ConfigFactory.parseString(json);
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to process input:", object);
+ return outDoc;
+ }
}
- public Object convert(Object object, Class outClass, Config hocon, String outPath) {
- return convert(object, outClass, hocon, null, outPath);
+ Config converted = hocon.withFallback(base);
+
+ String outJson = null;
+ try {
+ if( outPath == null ) {
+ outJson = converted.resolve().root().render(ConfigRenderOptions.concise());
+ } else {
+ Config resolved = converted.resolve();
+ ConfigObject outObject = resolved.withOnlyPath(outPath).root();
+ ConfigValue outValue = outObject.get(outPath);
+ outJson = outValue.render(ConfigRenderOptions.concise());
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to convert:", json);
+ LOGGER.warn(ex.getMessage());
}
-
- public Object convert(Object object, Class outClass, Config hocon, String inPath, String outPath) {
- String json = null;
- Object outDoc = null;
- if( object instanceof String ) {
- json = (String) object;
- } else {
- try {
- json = mapper.writeValueAsString(object);
- } catch (JsonProcessingException e) {
- LOGGER.warn("Failed to process input:", object);
- return outDoc;
- }
- }
-
- Config base;
- if( inPath == null)
- base = ConfigFactory.parseString(json);
- else {
- ObjectNode node;
- try {
- node = mapper.readValue(json, ObjectNode.class);
- ObjectNode root = mapper.createObjectNode();
- root.set(inPath, node);
- json = mapper.writeValueAsString(root);
- base = ConfigFactory.parseString(json);
- } catch (Exception e) {
- LOGGER.warn("Failed to process input:", object);
- return outDoc;
- }
- }
-
- Config converted = hocon.withFallback(base);
-
- String outJson = null;
- try {
- if( outPath == null )
- outJson = converted.resolve().root().render(ConfigRenderOptions.concise());
- else {
- Config resolved = converted.resolve();
- ConfigObject outObject = resolved.withOnlyPath(outPath).root();
- ConfigValue outValue = outObject.get(outPath);
- outJson = outValue.render(ConfigRenderOptions.concise());
- }
- } catch (Exception e) {
- LOGGER.warn("Failed to convert:", json);
- LOGGER.warn(e.getMessage());
- }
- if( outClass == String.class )
- return outJson;
- else {
- try {
- outDoc = mapper.readValue( outJson, outClass );
- } catch (IOException e) {
- LOGGER.warn("Failed to convert:", object);
- }
- }
- return outDoc;
+ if ( outClass == String.class )
+ return outJson;
+ else {
+ try {
+ outDoc = mapper.readValue( outJson, outClass );
+ } catch (IOException ex) {
+ LOGGER.warn("Failed to convert:", object);
+ }
}
+ return outDoc;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
index a38568b..d245c3e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -42,185 +42,219 @@ import java.util.Map;
*/
public class LineReadWriteUtil {
- private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
-
- private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap();
-
- private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+ private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+
+ private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap();
+
+ private static final List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+
+ private List<String> fields;
+ private String fieldDelimiter = "\t";
+ private String lineDelimiter = "\n";
+ private String encoding = "UTF-8";
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private LineReadWriteUtil() {
+ }
+
+ private LineReadWriteUtil(LineReadWriteConfiguration configuration) {
+ this.fields = configuration.getFields();
+ this.fieldDelimiter = configuration.getFieldDelimiter();
+ this.lineDelimiter = configuration.getLineDelimiter();
+ this.encoding = configuration.getEncoding();
+ }
+
+ public static LineReadWriteUtil getInstance() {
+ return getInstance(new LineReadWriteConfiguration());
+ }
+
+ /**
+ * getInstance.
+ * @param configuration
+ * @return result
+ */
+ public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) {
+ if ( INSTANCE_MAP.containsKey(configuration)
+ &&
+ INSTANCE_MAP.get(configuration) != null) {
+ return INSTANCE_MAP.get(configuration);
+ } else {
+ INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration));
+ return INSTANCE_MAP.get(configuration);
+ }
+ }
+
+ /**
+ * processLine
+ * @param line
+ * @return result
+ */
+ public StreamsDatum processLine(String line) {
+
+ List<String> expectedFields = fields;
+ if ( line.endsWith(lineDelimiter)) {
+ line = trimLineDelimiter(line);
+ }
+ String[] parsedFields = line.split(fieldDelimiter);
- private List<String> fields;
- private String fieldDelimiter = "\t";
- private String lineDelimiter = "\n";
- private String encoding = "UTF-8";
+ if ( parsedFields.length == 0) {
+ return null;
+ }
- private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ String id = null;
+ DateTime ts = null;
+ BigInteger seq = null;
+ Map<String, Object> metadata = null;
+ String json = null;
- private LineReadWriteUtil() {
+ if ( expectedFields.contains( FieldConstants.DOC )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
+ json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
}
- private LineReadWriteUtil(LineReadWriteConfiguration configuration) {
- this.fields = configuration.getFields();
- this.fieldDelimiter = configuration.getFieldDelimiter();
- this.lineDelimiter = configuration.getLineDelimiter();
- this.encoding = configuration.getEncoding();
+ if ( expectedFields.contains( FieldConstants.ID )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
+ id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
}
-
- public static LineReadWriteUtil getInstance() {
- return getInstance(new LineReadWriteConfiguration());
+ if ( expectedFields.contains( FieldConstants.SEQ )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
+ try {
+ seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
+ } catch ( NumberFormatException nfe ) {
+ LOGGER.warn("invalid sequence number {}", nfe);
+ }
}
-
- public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) {
- if( INSTANCE_MAP.containsKey(configuration) &&
- INSTANCE_MAP.get(configuration) != null)
- return INSTANCE_MAP.get(configuration);
- else {
- INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration));
- return INSTANCE_MAP.get(configuration);
- }
+ if ( expectedFields.contains( FieldConstants.TS )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
+ ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
}
-
- public StreamsDatum processLine(String line) {
-
- List<String> expectedFields = fields;
- if( line.endsWith(lineDelimiter)) line = trimLineDelimiter(line);
- String[] parsedFields = line.split(fieldDelimiter);
-
- if( parsedFields.length == 0)
- return null;
-
- String id = null;
- DateTime ts = null;
- BigInteger seq = null;
- Map<String, Object> metadata = null;
- String json = null;
-
- if( expectedFields.contains( FieldConstants.DOC )
- && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
- json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
- }
-
- if( expectedFields.contains( FieldConstants.ID )
- && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
- id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
- }
- if( expectedFields.contains( FieldConstants.SEQ )
- && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
- try {
- seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
- } catch( NumberFormatException nfe )
- { LOGGER.warn("invalid sequence number {}", nfe); }
- }
- if( expectedFields.contains( FieldConstants.TS )
- && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
- ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
- }
- if( expectedFields.contains( FieldConstants.META )
- && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
- metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
- }
-
- StreamsDatum datum = new StreamsDatum(json);
- datum.setId(id);
- datum.setTimestamp(ts);
- datum.setMetadata(metadata);
- datum.setSequenceid(seq);
- return datum;
-
+ if ( expectedFields.contains( FieldConstants.META )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
+ metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
}
- public String convertResultToString(StreamsDatum entry) {
- String metadataJson = null;
- try {
- metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
- } catch (JsonProcessingException e) {
- LOGGER.warn("Error converting metadata to a string", e);
- }
+ StreamsDatum datum = new StreamsDatum(json);
+ datum.setId(id);
+ datum.setTimestamp(ts);
+ datum.setMetadata(metadata);
+ datum.setSequenceid(seq);
+ return datum;
+
+ }
+
+ /**
+ * convertResultToString
+ * @param entry
+ * @return result
+ */
+ public String convertResultToString(StreamsDatum entry) {
+ String metadataJson = null;
+ try {
+ metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
+ } catch (JsonProcessingException ex) {
+ LOGGER.warn("Error converting metadata to a string", ex);
+ }
- String documentJson = null;
- try {
- if( entry.getDocument() instanceof String )
- documentJson = (String)entry.getDocument();
- else
- documentJson = MAPPER.writeValueAsString(entry.getDocument());
- } catch (JsonProcessingException e) {
- LOGGER.warn("Error converting document to string", e);
- }
+ String documentJson = null;
+ try {
+ if ( entry.getDocument() instanceof String ) {
+ documentJson = (String) entry.getDocument();
+ } else {
+ documentJson = MAPPER.writeValueAsString(entry.getDocument());
+ }
+ } catch (JsonProcessingException ex) {
+ LOGGER.warn("Error converting document to string", ex);
+ }
- if (Strings.isNullOrEmpty(documentJson))
- return null;
- else {
- StringBuilder stringBuilder = new StringBuilder();
- Iterator<String> fields = this.fields.iterator();
- List<String> fielddata = Lists.newArrayList();
- Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
- while( fields.hasNext() ) {
- String field = fields.next();
- if( field.equals(FieldConstants.DOC) )
- fielddata.add(documentJson);
- else if( field.equals(FieldConstants.ID) )
- fielddata.add(entry.getId());
- else if( field.equals(FieldConstants.SEQ) )
- if( entry.getSequenceid() != null)
- fielddata.add(entry.getSequenceid().toString());
- else
- fielddata.add("null");
- else if( field.equals(FieldConstants.TS) )
- if( entry.getTimestamp() != null )
- fielddata.add(entry.getTimestamp().toString());
- else
- fielddata.add(DateTime.now().toString());
- else if( field.equals(FieldConstants.META) )
- fielddata.add(metadataJson);
- else if( entry.getMetadata().containsKey(field)) {
- fielddata.add(entry.getMetadata().get(field).toString());
- } else {
- fielddata.add(null);
- }
-
- }
- joiner.appendTo(stringBuilder, fielddata);
- return stringBuilder.toString();
+ if (Strings.isNullOrEmpty(documentJson)) {
+ return null;
+ } else {
+ StringBuilder stringBuilder = new StringBuilder();
+ Iterator<String> fields = this.fields.iterator();
+ List<String> fielddata = Lists.newArrayList();
+ Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
+ while( fields.hasNext() ) {
+ String field = fields.next();
+ if ( field.equals(FieldConstants.DOC) ) {
+ fielddata.add(documentJson);
+ } else if ( field.equals(FieldConstants.ID) ) {
+ fielddata.add(entry.getId());
+ } else if ( field.equals(FieldConstants.SEQ) ) {
+ if (entry.getSequenceid() != null) {
+ fielddata.add(entry.getSequenceid().toString());
+ } else {
+ fielddata.add("null");
+ }
+ } else if ( field.equals(FieldConstants.TS) ) {
+ if (entry.getTimestamp() != null) {
+ fielddata.add(entry.getTimestamp().toString());
+ } else {
+ fielddata.add(DateTime.now().toString());
+ }
+ } else if ( field.equals(FieldConstants.META) ) {
+ fielddata.add(metadataJson);
+ } else if ( entry.getMetadata().containsKey(field)) {
+ fielddata.add(entry.getMetadata().get(field).toString());
+ } else {
+ fielddata.add(null);
}
+ }
+ joiner.appendTo(stringBuilder, fielddata);
+ return stringBuilder.toString();
}
-
- public DateTime parseTs(String field) {
-
- DateTime timestamp = null;
+ }
+
+ /**
+ * parseTs
+ * @param field
+ * @return
+ */
+ public DateTime parseTs(String field) {
+
+ DateTime timestamp = null;
+ try {
+ long longts = Long.parseLong(field);
+ timestamp = new DateTime(longts);
+ } catch ( Exception e1 ) {
+ try {
+ timestamp = DateTime.parse(field);
+ } catch ( Exception e2 ) {
try {
- long longts = Long.parseLong(field);
- timestamp = new DateTime(longts);
- } catch ( Exception e ) {
- try {
- timestamp = DateTime.parse(field);
- } catch ( Exception e2 ) {
- try {
- timestamp = MAPPER.readValue(field, DateTime.class);
- } catch ( Exception e3 ) {
- LOGGER.warn("Could not parse timestamp:{} ", field);
- }
- }
+ timestamp = MAPPER.readValue(field, DateTime.class);
+ } catch ( Exception e3 ) {
+ LOGGER.warn("Could not parse timestamp:{} ", field);
}
-
- return timestamp;
+ }
}
- public Map<String, Object> parseMap(String field) {
+ return timestamp;
+ }
- Map<String, Object> metadata = null;
+ /**
+ * parseMap
+ * @param field
+ * @return result
+ */
+ public Map<String, Object> parseMap(String field) {
- try {
- JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
- metadata = MAPPER.convertValue(jsonNode, Map.class);
- } catch (Exception e) {
- LOGGER.warn("failed in parseMap: " + e.getMessage());
- }
- return metadata;
- }
+ Map<String, Object> metadata = null;
- private String trimLineDelimiter(String str) {
- if( !Strings.isNullOrEmpty(str))
- if( str.endsWith(lineDelimiter))
- return str.substring(0,str.length()-1);
- return str;
+ try {
+ JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
+ metadata = MAPPER.convertValue(jsonNode, Map.class);
+ } catch (Exception ex) {
+ LOGGER.warn("failed in parseMap: " + ex.getMessage());
+ }
+ return metadata;
+ }
+
+ private String trimLineDelimiter(String str) {
+ if ( !Strings.isNullOrEmpty(str)) {
+ if (str.endsWith(lineDelimiter)) {
+ return str.substring(0, str.length() - 1);
+ }
}
+ return str;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
index edd70f4..a269f4d 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
@@ -16,13 +16,16 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
+
package org.apache.streams.converter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,65 +35,68 @@ import java.util.List;
/**
* TypeConverterProcessor converts between String json and jackson-compatible POJO objects.
*
+ * <p/>
* Activity is one supported jackson-compatible POJO, so JSON String and objects with structual similarities
* to Activity can be converted to Activity objects.
*
+ * <p/>
* However, conversion to Activity should probably use {@link org.apache.streams.converter.ActivityConverterProcessor}
*
*/
public class TypeConverterProcessor implements StreamsProcessor, Serializable {
- public static final String STREAMS_ID = "TypeConverterProcessor";
+ public static final String STREAMS_ID = "TypeConverterProcessor";
- private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
- private List<String> formats = Lists.newArrayList();
+ private List<String> formats = Lists.newArrayList();
- protected ObjectMapper mapper;
+ protected ObjectMapper mapper;
- protected Class outClass;
+ protected Class outClass;
- public TypeConverterProcessor(Class outClass) {
- this.outClass = outClass;
- }
-
- public TypeConverterProcessor(Class outClass, List<String> formats) {
- this(outClass);
- this.formats = formats;
- }
+ public TypeConverterProcessor(Class outClass) {
+ this.outClass = outClass;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public TypeConverterProcessor(Class outClass, List<String> formats) {
+ this(outClass);
+ this.formats = formats;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- List<StreamsDatum> result = Lists.newLinkedList();
- Object inDoc = entry.getDocument();
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
- Object outDoc = TypeConverterUtil.getInstance().convert(inDoc, outClass, mapper);
+ List<StreamsDatum> result = Lists.newLinkedList();
+ Object inDoc = entry.getDocument();
- if( outDoc != null ) {
- entry.setDocument(outDoc);
- result.add(entry);
- }
+ Object outDoc = TypeConverterUtil.getInstance().convert(inDoc, outClass, mapper);
- return result;
+ if ( outDoc != null ) {
+ entry.setDocument(outDoc);
+ result.add(entry);
}
- @Override
- public void prepare(Object configurationObject) {
- if( formats.size() > 0 )
- this.mapper = StreamsJacksonMapper.getInstance(formats);
- else
- this.mapper = StreamsJacksonMapper.getInstance();
- }
+ return result;
+ }
- @Override
- public void cleanUp() {
- this.mapper = null;
+ @Override
+ public void prepare(Object configurationObject) {
+ if ( formats.size() > 0 ) {
+ this.mapper = StreamsJacksonMapper.getInstance(formats);
+ } else {
+ this.mapper = StreamsJacksonMapper.getInstance();
}
+ }
+
+ @Override
+ public void cleanUp() {
+ this.mapper = null;
+ }
-};
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
index 4ace9c4..8843d0e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
@@ -18,9 +18,11 @@
package org.apache.streams.converter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,49 +30,56 @@ import java.io.IOException;
/**
* TypeConverterUtil supports TypeConverterProcessor in converting between String json and
- * jackson-compatible POJO objects
+ * jackson-compatible POJO objects.
*/
public class TypeConverterUtil {
- private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
- private static final TypeConverterUtil INSTANCE = new TypeConverterUtil();
+ private static final TypeConverterUtil INSTANCE = new TypeConverterUtil();
- public static TypeConverterUtil getInstance(){
- return INSTANCE;
- }
+ public static TypeConverterUtil getInstance() {
+ return INSTANCE;
+ }
+
+ public Object convert(Object object, Class outClass) {
+ return TypeConverterUtil.getInstance().convert(object, outClass, StreamsJacksonMapper.getInstance());
+ }
- public Object convert(Object object, Class outClass) {
- return TypeConverterUtil.getInstance().convert(object, outClass, StreamsJacksonMapper.getInstance());
+ /**
+ * convert
+ * @param object
+ * @param outClass
+ * @param mapper
+ * @return
+ */
+ public Object convert(Object object, Class outClass, ObjectMapper mapper) {
+ ObjectNode node = null;
+ Object outDoc = null;
+ if ( object instanceof String ) {
+ try {
+ node = mapper.readValue((String)object, ObjectNode.class);
+ } catch (IOException ex) {
+ LOGGER.warn(ex.getMessage());
+ LOGGER.warn(object.toString());
+ }
+ } else {
+ node = mapper.convertValue(object, ObjectNode.class);
}
- public Object convert(Object object, Class outClass, ObjectMapper mapper) {
- ObjectNode node = null;
- Object outDoc = null;
- if( object instanceof String ) {
- try {
- node = mapper.readValue((String)object, ObjectNode.class);
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- LOGGER.warn(object.toString());
- }
+ if(node != null) {
+ try {
+ if ( outClass == String.class ) {
+ outDoc = mapper.writeValueAsString(node);
} else {
- node = mapper.convertValue(object, ObjectNode.class);
+ outDoc = mapper.convertValue(node, outClass);
}
-
- if(node != null) {
- try {
- if( outClass == String.class )
- outDoc = mapper.writeValueAsString(node);
- else
- outDoc = mapper.convertValue(node, outClass);
-
- } catch (Throwable e) {
- LOGGER.warn(e.getMessage());
- LOGGER.warn(node.toString());
- }
- }
-
- return outDoc;
+ } catch (Throwable ex) {
+ LOGGER.warn(ex.getMessage());
+ LOGGER.warn(node.toString());
+ }
}
+
+ return outDoc;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
index e0c7a68..ed40a17 100644
--- a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
+++ b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
@@ -18,15 +18,16 @@
package org.apache.streams.filters;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.verbs.ObjectCombination;
import org.apache.streams.verbs.VerbDefinition;
import org.apache.streams.verbs.VerbDefinitionMatchUtil;
import org.apache.streams.verbs.VerbDefinitionResolver;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,57 +40,60 @@ import java.util.Set;
*/
public class VerbDefinitionDropFilter implements StreamsProcessor {
- public static final String STREAMS_ID = "VerbDefinitionDropFilter";
+ public static final String STREAMS_ID = "VerbDefinitionDropFilter";
- private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionDropFilter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionDropFilter.class);
- protected Set<VerbDefinition> verbDefinitionSet;
- protected VerbDefinitionResolver resolver;
+ protected Set<VerbDefinition> verbDefinitionSet;
+ protected VerbDefinitionResolver resolver;
- public VerbDefinitionDropFilter() {
- // get with reflection
- }
-
- public VerbDefinitionDropFilter(Set<VerbDefinition> verbDefinitionSet) {
- this();
- this.verbDefinitionSet = verbDefinitionSet;
- }
+ public VerbDefinitionDropFilter() {
+ // get with reflection
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public VerbDefinitionDropFilter(Set<VerbDefinition> verbDefinitionSet) {
+ this();
+ this.verbDefinitionSet = verbDefinitionSet;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- List<StreamsDatum> result = Lists.newArrayList();
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
- LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
+ List<StreamsDatum> result = Lists.newArrayList();
- Activity activity;
+ LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity;
- activity = (Activity) entry.getDocument();
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- if( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == false )
- result.add(entry);
+ activity = (Activity) entry.getDocument();
- return result;
+ if ( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == false ) {
+ result.add(entry);
}
- @Override
- public void prepare(Object o) {
- if( verbDefinitionSet != null)
- resolver = new VerbDefinitionResolver(verbDefinitionSet);
- else resolver = new VerbDefinitionResolver();
- Preconditions.checkNotNull(resolver);
- }
+ return result;
+ }
- @Override
- public void cleanUp() {
- // noOp
+ @Override
+ public void prepare(Object configuration) {
+ if ( verbDefinitionSet != null) {
+ resolver = new VerbDefinitionResolver(verbDefinitionSet);
+ } else {
+ resolver = new VerbDefinitionResolver();
}
+ Preconditions.checkNotNull(resolver);
+ }
+
+ @Override
+ public void cleanUp() {
+ // noOp
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
index 82e8c99..7562905 100644
--- a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
+++ b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
@@ -18,19 +18,21 @@
package org.apache.streams.filters;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.verbs.ObjectCombination;
import org.apache.streams.verbs.VerbDefinition;
import org.apache.streams.verbs.VerbDefinitionMatchUtil;
import org.apache.streams.verbs.VerbDefinitionResolver;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.List;
+import java.util.Set;
/**
* Checks one or more verb definitions against a stream of Activity documents, and drops any activities
@@ -38,57 +40,60 @@ import java.util.*;
*/
public class VerbDefinitionKeepFilter implements StreamsProcessor {
- public static final String STREAMS_ID = "VerbDefinitionKeepFilter";
+ public static final String STREAMS_ID = "VerbDefinitionKeepFilter";
- private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionKeepFilter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionKeepFilter.class);
- protected Set<VerbDefinition> verbDefinitionSet;
- protected VerbDefinitionResolver resolver;
+ protected Set<VerbDefinition> verbDefinitionSet;
+ protected VerbDefinitionResolver resolver;
- public VerbDefinitionKeepFilter() {
- // get with reflection
- }
-
- public VerbDefinitionKeepFilter(Set<VerbDefinition> verbDefinitionSet) {
- this();
- this.verbDefinitionSet = verbDefinitionSet;
- }
+ public VerbDefinitionKeepFilter() {
+ // get with reflection
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public VerbDefinitionKeepFilter(Set<VerbDefinition> verbDefinitionSet) {
+ this();
+ this.verbDefinitionSet = verbDefinitionSet;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- List<StreamsDatum> result = Lists.newArrayList();
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
- LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
+ List<StreamsDatum> result = Lists.newArrayList();
- Activity activity;
+ LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity;
- activity = (Activity) entry.getDocument();
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- if( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == true )
- result.add(entry);
+ activity = (Activity) entry.getDocument();
- return result;
+ if ( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == true ) {
+ result.add(entry);
}
- @Override
- public void prepare(Object o) {
- if( verbDefinitionSet != null)
- resolver = new VerbDefinitionResolver(verbDefinitionSet);
- else resolver = new VerbDefinitionResolver();
- Preconditions.checkNotNull(resolver);
- }
+ return result;
+ }
- @Override
- public void cleanUp() {
- // noOp
+ @Override
+ public void prepare(Object configuration) {
+ if ( verbDefinitionSet != null ) {
+ resolver = new VerbDefinitionResolver(verbDefinitionSet);
+ } else {
+ resolver = new VerbDefinitionResolver();
}
+ Preconditions.checkNotNull(resolver);
+ }
+
+ @Override
+ public void cleanUp() {
+ // noOp
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
index d8309d9..8cacf1f 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
@@ -18,11 +18,19 @@
package org.apache.streams.components.http.persist;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
@@ -33,12 +41,6 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,183 +53,189 @@ import java.util.Map;
public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
- private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter";
+ private static final String STREAMS_ID = "SimpleHTTPPostPersistWriter";
- private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class);
- protected ObjectMapper mapper;
+ protected ObjectMapper mapper;
- protected URIBuilder uriBuilder;
+ protected URIBuilder uriBuilder;
- protected CloseableHttpClient httpclient;
+ protected CloseableHttpClient httpclient;
- protected HttpPersistWriterConfiguration configuration;
+ protected HttpPersistWriterConfiguration configuration;
- protected String authHeader;
+ protected String authHeader;
- public SimpleHTTPPostPersistWriter() {
- this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
- }
+ public SimpleHTTPPostPersistWriter() {
+ this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
+ }
- public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) {
- this.configuration = configuration;
- }
+ public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) {
+ this.configuration = configuration;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public void write(StreamsDatum entry) {
+ @Override
+ public void write(StreamsDatum entry) {
- ObjectNode payload;
- try {
- payload = preparePayload(entry);
- } catch( Exception e ) {
- LOGGER.warn("Exception preparing payload, using empty payload");
- payload = mapper.createObjectNode();
- }
+ ObjectNode payload;
+ try {
+ payload = preparePayload(entry);
+ } catch ( Exception ex ) {
+ LOGGER.warn("Exception preparing payload, using empty payload");
+ payload = mapper.createObjectNode();
+ }
- Map<String, String> params = prepareParams(entry);
+ Map<String, String> params = prepareParams(entry);
- URI uri = prepareURI(params);
+ URI uri = prepareURI(params);
- HttpPost httppost = prepareHttpPost(uri, payload);
+ HttpPost httppost = prepareHttpPost(uri, payload);
- ObjectNode result = executePost(httppost);
+ ObjectNode result = executePost(httppost);
- try {
- LOGGER.debug(mapper.writeValueAsString(result));
- } catch (JsonProcessingException e) {
- LOGGER.warn("Non-json response", e.getMessage());
- }
+ try {
+ LOGGER.debug(mapper.writeValueAsString(result));
+ } catch (JsonProcessingException ex) {
+ LOGGER.warn("Non-json response", ex.getMessage());
}
-
- /**
- Override this to alter request URI
- */
- protected URI prepareURI(Map<String, String> params) {
- URI uri = null;
- for( Map.Entry<String,String> param : params.entrySet()) {
- uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
- }
- try {
- uri = uriBuilder.build();
- } catch (URISyntaxException e) {
- LOGGER.error("URI error {}", uriBuilder.toString());
- }
- return uri;
+ }
+
+ /**
+ Override this to alter request URI.
+ */
+ protected URI prepareURI(Map<String, String> params) {
+ URI uri = null;
+ for ( Map.Entry<String,String> param : params.entrySet()) {
+ uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
}
-
- /**
- Override this to add parameters to the request
- */
- protected Map<String, String> prepareParams(StreamsDatum entry) {
- return new HashMap<>();
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException ex) {
+ LOGGER.error("URI error {}", uriBuilder.toString());
}
-
- /**
- Override this to alter json payload on to the request
- */
- protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
-
- if( entry.getDocument() != null ) {
- if( entry.getDocument() instanceof ObjectNode )
- return (ObjectNode) entry.getDocument();
- else return mapper.convertValue(entry.getDocument(), ObjectNode.class);
- }
- else return null;
+ return uri;
+ }
+
+ /**
+ Override this to add parameters to the request.
+ */
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+ return new HashMap<>();
+ }
+
+ /**
+ Override this to alter json payload on to the request.
+ */
+ protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
+
+ if ( entry.getDocument() != null ) {
+ if ( entry.getDocument() instanceof ObjectNode ) {
+ return (ObjectNode) entry.getDocument();
+ } else {
+ return mapper.convertValue(entry.getDocument(), ObjectNode.class);
+ }
+ } else {
+ return null;
}
-
- /**
- Override this to add headers to the request
- */
- public HttpPost prepareHttpPost(URI uri, ObjectNode payload) {
- HttpPost httppost = new HttpPost(uri);
- httppost.addHeader("content-type", this.configuration.getContentType());
- httppost.addHeader("accept-charset", "UTF-8");
- if( !Strings.isNullOrEmpty(authHeader))
- httppost.addHeader("Authorization", "Basic " + authHeader);
- try {
- String entity = mapper.writeValueAsString(payload);
- httppost.setEntity(new StringEntity(entity));
- } catch (JsonProcessingException | UnsupportedEncodingException e) {
- LOGGER.warn(e.getMessage());
- }
- return httppost;
+ }
+
+ /**
+ Override this to add headers to the request.
+ */
+ public HttpPost prepareHttpPost(URI uri, ObjectNode payload) {
+ HttpPost httppost = new HttpPost(uri);
+ httppost.addHeader("content-type", this.configuration.getContentType());
+ httppost.addHeader("accept-charset", "UTF-8");
+ if ( !Strings.isNullOrEmpty(authHeader)) {
+ httppost.addHeader("Authorization", "Basic " + authHeader);
}
-
- protected ObjectNode executePost(HttpPost httpPost) {
-
- Preconditions.checkNotNull(httpPost);
-
- ObjectNode result = null;
-
- CloseableHttpResponse response = null;
-
- String entityString;
- try {
- response = httpclient.execute(httpPost);
- HttpEntity entity = response.getEntity();
- // TODO: handle retry
- if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK && entity != null) {
- entityString = EntityUtils.toString(entity);
- result = mapper.readValue(entityString, ObjectNode.class);
- }
- } catch (IOException e) {
- LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
- } finally {
- try {
- if (response != null) {
- response.close();
- }
- } catch (IOException ignored) {}
+ try {
+ String entity = mapper.writeValueAsString(payload);
+ httppost.setEntity(new StringEntity(entity));
+ } catch (JsonProcessingException | UnsupportedEncodingException ex) {
+ LOGGER.warn(ex.getMessage());
+ }
+ return httppost;
+ }
+
+ protected ObjectNode executePost(HttpPost httpPost) {
+
+ Preconditions.checkNotNull(httpPost);
+
+ ObjectNode result = null;
+
+ CloseableHttpResponse response = null;
+
+ String entityString;
+ try {
+ response = httpclient.execute(httpPost);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle retry
+ if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK && entity != null) {
+ entityString = EntityUtils.toString(entity);
+ result = mapper.readValue(entityString, ObjectNode.class);
+ }
+ } catch (IOException ex) {
+ LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
}
- return result;
+ } catch (IOException ignored) {
+ LOGGER.trace("IOException", ignored);
+ }
}
+ return result;
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- mapper = StreamsJacksonMapper.getInstance();
+ mapper = StreamsJacksonMapper.getInstance();
- uriBuilder = new URIBuilder()
- .setScheme(this.configuration.getProtocol())
- .setHost(this.configuration.getHostname())
- .setPort(this.configuration.getPort().intValue())
- .setPath(this.configuration.getResourcePath());
-
- if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
- uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
- if( !Strings.isNullOrEmpty(configuration.getUsername())
- && !Strings.isNullOrEmpty(configuration.getPassword())) {
- String string = configuration.getUsername() + ":" + configuration.getPassword();
- authHeader = Base64.encodeBase64String(string.getBytes());
- }
-
- httpclient = HttpClients.createDefault();
+ uriBuilder = new URIBuilder()
+ .setScheme(this.configuration.getProtocol())
+ .setHost(this.configuration.getHostname())
+ .setPort(this.configuration.getPort().intValue())
+ .setPath(this.configuration.getResourcePath());
+ if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) {
+ uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
+ }
+ if ( !Strings.isNullOrEmpty(configuration.getUsername()) && !Strings.isNullOrEmpty(configuration.getPassword())) {
+ String string = configuration.getUsername() + ":" + configuration.getPassword();
+ authHeader = Base64.encodeBase64String(string.getBytes());
}
- @Override
- public void cleanUp() {
-
- LOGGER.info("shutting down SimpleHTTPPostPersistWriter");
- try {
- httpclient.close();
- } catch (IOException e) {
- LOGGER.error(e.getMessage());
- } finally {
- try {
- httpclient.close();
- } catch (IOException e) {
- LOGGER.error(e.getMessage());
- } finally {
- httpclient = null;
- }
- }
+ httpclient = HttpClients.createDefault();
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ LOGGER.info("shutting down SimpleHTTPPostPersistWriter");
+ try {
+ httpclient.close();
+ } catch (IOException ex) {
+ LOGGER.error(ex.getMessage());
+ } finally {
+ try {
+ httpclient.close();
+ } catch (IOException e2) {
+ LOGGER.error(e2.getMessage());
+ } finally {
+ httpclient = null;
+ }
}
+ }
}