You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:22 UTC

[41/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
index d39c087..cdba150 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
@@ -19,61 +19,72 @@ under the License.
 
 package org.apache.streams.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.data.DocumentClassifier;
 import org.apache.streams.data.util.ActivityUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
+ * Ensures generic String and ObjectNode documents can be converted to Activity
+ *
+ * <p/>
  * BaseDocumentClassifier is included by default in all
  * @see org.apache.streams.converter.ActivityConverterProcessor
  *
- * Ensures generic String and ObjectNode documents can be converted to Activity
- *
  */
 public class BaseDocumentClassifier implements DocumentClassifier {
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public List<Class> detectClasses(Object document) {
-
-        Activity activity;
-        ObjectNode node = null;
-
-        List<Class> classes = new ArrayList<>();
-        // Soon javax.validation will available in jackson
-        //   That will make this simpler and more powerful
-        if( document instanceof String ) {
-            classes.add(String.class);
-            try {
-                activity = this.mapper.readValue((String)document, Activity.class);
-                if(activity != null && ActivityUtil.isValid(activity))
-                    classes.add(Activity.class);
-            } catch (IOException e1) {
-                try {
-                    node = this.mapper.readValue((String)document, ObjectNode.class);
-                    classes.add(ObjectNode.class);
-                } catch (IOException ignored) { }
-            }
-        } else if( document instanceof ObjectNode ){
-            classes.add(ObjectNode.class);
-            activity = this.mapper.convertValue(document, Activity.class);
-            if(ActivityUtil.isValid(activity))
-                classes.add(Activity.class);
-        } else {
-            classes.add(document.getClass());
-        }
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseDocumentClassifier.class);
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public List<Class> detectClasses(Object document) {
 
-        return classes;
+    Activity activity;
+    ObjectNode node = null;
 
+    List<Class> classes = new ArrayList<>();
+    // Soon javax.validation will available in jackson
+    //   That will make this simpler and more powerful
+    if ( document instanceof String ) {
+      classes.add(String.class);
+      try {
+        activity = this.mapper.readValue((String)document, Activity.class);
+        if (activity != null && ActivityUtil.isValid(activity)) {
+          classes.add(Activity.class);
+        }
+      } catch (IOException e1) {
+        try {
+          node = this.mapper.readValue((String)document, ObjectNode.class);
+          classes.add(ObjectNode.class);
+        } catch (IOException ignored) {
+          LOGGER.trace("ignoring ", ignored);
+        }
+      }
+    } else if ( document instanceof ObjectNode ) {
+      classes.add(ObjectNode.class);
+      activity = this.mapper.convertValue(document, Activity.class);
+      if (ActivityUtil.isValid(activity)) {
+        classes.add(Activity.class);
+      }
+    } else {
+      classes.add(document.getClass());
     }
 
+    return classes;
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
index b6cde29..cb61f0e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
@@ -19,75 +19,88 @@ under the License.
 
 package org.apache.streams.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 /**
+ * Ensures generic ObjectNode representation of an Activity can be converted to Activity
+ *
+ * <p/>
  * BaseObjectNodeActivityConverter is included by default in all
  * @see {@link org.apache.streams.converter.ActivityConverterProcessor}
  *
- * Ensures generic ObjectNode representation of an Activity can be converted to Activity
  *
  */
 public class BaseObjectNodeActivityConverter implements ActivityConverter<ObjectNode> {
 
-    public static Class requiredClass = ObjectNode.class;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseObjectNodeActivityConverter.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  public static Class requiredClass = ObjectNode.class;
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    @Override
-    public ObjectNode fromActivity(Activity deserialized) throws ActivityConversionException {
-        try {
-           return mapper.convertValue(deserialized, ObjectNode.class);
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        }
-    }
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public List<Activity> toActivityList(ObjectNode serialized) throws ActivityConversionException {
-        List<Activity> activityList = Lists.newArrayList();
-        try {
-            activityList.add(mapper.convertValue(serialized, Activity.class));
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        } finally {
-            return activityList;
-        }
+  @Override
+  public ObjectNode fromActivity(Activity deserialized) throws ActivityConversionException {
+    try {
+      return mapper.convertValue(deserialized, ObjectNode.class);
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
     }
+  }
+
+  @Override
+  public List<ObjectNode> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
 
-    @Override
-    public List<ObjectNode> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
+  @Override
+  public List<Activity> toActivityList(ObjectNode serialized) throws ActivityConversionException {
+    List<Activity> activityList = Lists.newArrayList();
+    try {
+      activityList.add(mapper.convertValue(serialized, Activity.class));
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
+    } finally {
+      return activityList;
     }
+  }
 
-    @Override
-    public List<Activity> toActivityList(List<ObjectNode> list) {
-        List<Activity> result = Lists.newArrayList();
-        for( ObjectNode item : list ) {
-            try {
-                result.addAll(toActivityList(item));
-            } catch (ActivityConversionException e) {}
-        }
-        return result;
+  @Override
+  public List<Activity> toActivityList(List<ObjectNode> list) {
+    List<Activity> result = Lists.newArrayList();
+    for ( ObjectNode item : list ) {
+      try {
+        result.addAll(toActivityList(item));
+      } catch (ActivityConversionException ex) {
+        LOGGER.trace("ActivityConversionException", ex);
+      }
     }
+    return result;
+  }
+
+
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
index cb66414..a38585e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java
@@ -19,58 +19,54 @@ under the License.
 
 package org.apache.streams.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.ActivityObjectConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 
-import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
+ * Ensures generic ObjectNode representation of an Activity can be converted to Activity.
+ *
+ * <p/>
  * BaseObjectNodeActivityConverter is included by default in all
  * @see {@link ActivityConverterProcessor}
  *
- * Ensures generic ObjectNode representation of an Activity can be converted to Activity
- *
  */
 public class BaseObjectNodeActivityObjectConverter implements ActivityObjectConverter<ObjectNode> {
 
-    public static Class requiredClass = ObjectNode.class;
+  public static Class requiredClass = ObjectNode.class;
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public ObjectNode fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
-        try {
-            return mapper.convertValue(deserialized, ObjectNode.class);
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        }
+  @Override
+  public ObjectNode fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+    try {
+      return mapper.convertValue(deserialized, ObjectNode.class);
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
     }
+  }
 
-    @Override
-    public ActivityObject toActivityObject(ObjectNode serialized) throws ActivityConversionException {
-        try {
-            return mapper.convertValue(serialized, ActivityObject.class);
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        }
+  @Override
+  public ActivityObject toActivityObject(ObjectNode serialized) throws ActivityConversionException {
+    try {
+      return mapper.convertValue(serialized, ActivityObject.class);
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
index 7438abb..da15dee 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
@@ -19,75 +19,86 @@ under the License.
 
 package org.apache.streams.converter;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 /**
+ * Ensures generic String Json representation of an Activity can be converted to Activity
+ *
+ * <p/>
  * BaseObjectNodeActivityConverter is included by default in all
  * @see {@link org.apache.streams.converter.ActivityConverterProcessor}
  *
- * Ensures generic String Json representation of an Activity can be converted to Activity
- *
  */
 public class BaseStringActivityConverter implements ActivityConverter<String> {
 
-    public static Class requiredClass = String.class;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseObjectNodeActivityConverter.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  public static final Class requiredClass = String.class;
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    @Override
-    public String fromActivity(Activity deserialized) throws ActivityConversionException {
-        try {
-            return mapper.writeValueAsString(deserialized);
-        } catch (JsonProcessingException e) {
-            throw new ActivityConversionException();
-        }
-    }
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public List<Activity> toActivityList(String serialized) throws ActivityConversionException {
-        List<Activity> activityList = Lists.newArrayList();
-        try {
-            activityList.add(mapper.readValue(serialized, Activity.class));
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        } finally {
-            return activityList;
-        }
+  @Override
+  public String fromActivity(Activity deserialized) throws ActivityConversionException {
+    try {
+      return mapper.writeValueAsString(deserialized);
+    } catch (JsonProcessingException ex) {
+      throw new ActivityConversionException();
     }
+  }
+
+  @Override
+  public List<String> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
 
-    @Override
-    public List<String> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
+  @Override
+  public List<Activity> toActivityList(String serialized) throws ActivityConversionException {
+    List<Activity> activityList = Lists.newArrayList();
+    try {
+      activityList.add(mapper.readValue(serialized, Activity.class));
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
+    } finally {
+      return activityList;
     }
+  }
 
-    @Override
-    public List<Activity> toActivityList(List<String> list) {
-        List<Activity> result = Lists.newArrayList();
-        for( String item : list ) {
-            try {
-                result.addAll(toActivityList(item));
-            } catch (ActivityConversionException e) {}
-        }
-        return result;
+  @Override
+  public List<Activity> toActivityList(List<String> list) {
+    List<Activity> result = Lists.newArrayList();
+    for ( String item : list ) {
+      try {
+        result.addAll(toActivityList(item));
+      } catch (ActivityConversionException ex) {
+        LOGGER.trace("ActivityConversionException", ex);
+      }
     }
+    return result;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
index 3bbbdac..7322fc1 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java
@@ -19,52 +19,53 @@ under the License.
 
 package org.apache.streams.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.data.ActivityObjectConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.ActivityObject;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 /**
+ * Ensures generic ObjectNode representation of an Activity can be converted to Activity.
+ *
+ * <p/>
  * BaseObjectNodeActivityConverter is included by default in all
  * @see {@link ActivityConverterProcessor}
  *
- * Ensures generic ObjectNode representation of an Activity can be converted to Activity
- *
  */
 public class BaseStringActivityObjectConverter implements ActivityObjectConverter<String> {
 
-    public static Class requiredClass = String.class;
+  public static Class requiredClass = String.class;
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public String fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
-        try {
-            return mapper.writeValueAsString(deserialized);
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        }
+  @Override
+  public String fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+    try {
+      return mapper.writeValueAsString(deserialized);
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
     }
+  }
 
-    @Override
-    public ActivityObject toActivityObject(String serialized) throws ActivityConversionException {
-        try {
-            return mapper.readValue(serialized, ActivityObject.class);
-        } catch (Exception e) {
-            throw new ActivityConversionException();
-        }
+  @Override
+  public ActivityObject toActivityObject(String serialized) throws ActivityConversionException {
+    try {
+      return mapper.readValue(serialized, ActivityObject.class);
+    } catch (Exception ex) {
+      throw new ActivityConversionException();
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
index 26dfcb3..3443bd9 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
@@ -19,14 +19,14 @@
 package org.apache.streams.converter;
 
 /**
- * Predefined field symbols
+ * Predefined field symbols.
  */
 public class FieldConstants {
 
-    protected static final String ID = "ID";
-    protected static final String SEQ = "SEQ";
-    protected static final String TS = "TS";
-    protected static final String META = "META";
-    protected static final String DOC = "DOC";
+  protected static final String ID = "ID";
+  protected static final String SEQ = "SEQ";
+  protected static final String TS = "TS";
+  protected static final String META = "META";
+  protected static final String DOC = "DOC";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
index b3ee72f..44aa56b 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java
@@ -19,11 +19,12 @@ under the License.
 
 package org.apache.streams.converter;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.core.util.DatumUtils;
-import org.apache.streams.pojo.json.Activity;
+
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,53 +34,62 @@ import java.util.List;
  * HoconConverterProcessor is a utility processor for converting any datum document
  * with translation rules expressed as HOCON in the classpath or at a URL.
  *
+ * <p/>
  * To use this capability without a dedicated stream processor, just use HoconConverterUtil.
  */
 public class HoconConverterProcessor implements StreamsProcessor {
 
-    public static final String STREAMS_ID = "HoconConverterProcessor";
+  public static final String STREAMS_ID = "HoconConverterProcessor";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HoconConverterProcessor.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(HoconConverterProcessor.class);
 
-    protected Class outClass;
-    protected String hocon;
-    protected String inPath;
-    protected String outPath;
+  protected Class outClass;
+  protected String hocon;
+  protected String inPath;
+  protected String outPath;
 
-    public HoconConverterProcessor(Class outClass, String hocon, String inPath, String outPath) {
-        this.outClass = outClass;
-        this.hocon = hocon;
-        this.inPath = inPath;
-        this.outPath = outPath;
-    }
+  /**
+   * HoconConverterProcessor.
+   *
+   * @param outClass outClass
+   * @param hocon hocon
+   * @param inPath inPath
+   * @param outPath outPath
+   */
+  public HoconConverterProcessor(Class outClass, String hocon, String inPath, String outPath) {
+    this.outClass = outClass;
+    this.hocon = hocon;
+    this.inPath = inPath;
+    this.outPath = outPath;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Object document = entry.getDocument();
+    List<StreamsDatum> result = Lists.newLinkedList();
+    Object document = entry.getDocument();
 
-        Object outDoc = HoconConverterUtil.getInstance().convert(document, outClass, hocon, inPath, outPath);
+    Object outDoc = HoconConverterUtil.getInstance().convert(document, outClass, hocon, inPath, outPath);
 
-        StreamsDatum datum = DatumUtils.cloneDatum(entry);
-        datum.setDocument(outDoc);
-        result.add(datum);
+    StreamsDatum datum = DatumUtils.cloneDatum(entry);
+    datum.setDocument(outDoc);
+    result.add(datum);
 
-        return result;
-    }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
+  @Override
+  public void cleanUp() {
 
-    }
-};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
index f8db30c..bac081c 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.converter;
 
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -26,7 +28,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigObject;
 import com.typesafe.config.ConfigRenderOptions;
 import com.typesafe.config.ConfigValue;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,95 +36,104 @@ import java.io.IOException;
 
 /**
  * HoconConverterUtil supports HoconConverterProcessor in converting types via application
- * of hocon (https://github.com/typesafehub/config/blob/master/HOCON.md) scripts
+ * of hocon (https://github.com/typesafehub/config/blob/master/HOCON.md) scripts.
  */
 public class HoconConverterUtil {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HoconConverterUtil.class);
-
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    private static final HoconConverterUtil INSTANCE = new HoconConverterUtil();
-
-    public static HoconConverterUtil getInstance(){
-        return INSTANCE;
-    }
-
-    public Object convert(Object object, Class outClass, String hoconResource) {
-        Config hocon = ConfigFactory.parseResources(hoconResource);
-        return convert(object, outClass, hocon, null);
-    }
-
-    public Object convert(Object object, Class outClass, String hoconResource, String outPath) {
-        Config hocon = ConfigFactory.parseResources(hoconResource);
-        return convert(object, outClass, hocon, outPath);
+  private static final Logger LOGGER = LoggerFactory.getLogger(HoconConverterUtil.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private static final HoconConverterUtil INSTANCE = new HoconConverterUtil();
+
+  public static HoconConverterUtil getInstance() {
+    return INSTANCE;
+  }
+
+  public Object convert(Object object, Class outClass, String hoconResource) {
+    Config hocon = ConfigFactory.parseResources(hoconResource);
+    return convert(object, outClass, hocon, null);
+  }
+
+  public Object convert(Object object, Class outClass, String hoconResource, String outPath) {
+    Config hocon = ConfigFactory.parseResources(hoconResource);
+    return convert(object, outClass, hocon, outPath);
+  }
+
+  public Object convert(Object object, Class outClass, String hoconResource, String inPath, String outPath) {
+    Config hocon = ConfigFactory.parseResources(hoconResource);
+    return convert(object, outClass, hocon, inPath, outPath);
+  }
+
+  public Object convert(Object object, Class outClass, Config hocon, String outPath) {
+    return convert(object, outClass, hocon, null, outPath);
+  }
+
+  /**
+   * convert.
+   * @param object object
+   * @param outClass outClass
+   * @param hocon hocon
+   * @param inPath inPath
+   * @param outPath outPath
+   * @return result
+   */
+  public Object convert(Object object, Class outClass, Config hocon, String inPath, String outPath) {
+    String json = null;
+    Object outDoc = null;
+    if ( object instanceof String ) {
+      json = (String) object;
+    } else {
+      try {
+        json = mapper.writeValueAsString(object);
+      } catch (JsonProcessingException ex) {
+        LOGGER.warn("Failed to process input:", object);
+        return outDoc;
+      }
     }
 
-    public Object convert(Object object, Class outClass, String hoconResource, String inPath, String outPath) {
-        Config hocon = ConfigFactory.parseResources(hoconResource);
-        return convert(object, outClass, hocon, inPath, outPath);
+    Config base;
+    if( inPath == null) {
+      base = ConfigFactory.parseString(json);
+    } else {
+      ObjectNode node;
+      try {
+        node = mapper.readValue(json, ObjectNode.class);
+        ObjectNode root = mapper.createObjectNode();
+        root.set(inPath, node);
+        json = mapper.writeValueAsString(root);
+        base = ConfigFactory.parseString(json);
+      } catch (Exception ex) {
+        LOGGER.warn("Failed to process input:", object);
+        return outDoc;
+      }
     }
 
-    public Object convert(Object object, Class outClass, Config hocon, String outPath) {
-        return convert(object, outClass, hocon, null, outPath);
+    Config converted = hocon.withFallback(base);
+
+    String outJson = null;
+    try {
+      if( outPath == null ) {
+        outJson = converted.resolve().root().render(ConfigRenderOptions.concise());
+      } else {
+        Config resolved = converted.resolve();
+        ConfigObject outObject = resolved.withOnlyPath(outPath).root();
+        ConfigValue outValue = outObject.get(outPath);
+        outJson = outValue.render(ConfigRenderOptions.concise());
+      }
+    } catch (Exception ex) {
+      LOGGER.warn("Failed to convert:", json);
+      LOGGER.warn(ex.getMessage());
     }
-
-    public Object convert(Object object, Class outClass, Config hocon, String inPath, String outPath) {
-        String json = null;
-        Object outDoc = null;
-        if( object instanceof String ) {
-            json = (String) object;
-        } else {
-            try {
-                json = mapper.writeValueAsString(object);
-            } catch (JsonProcessingException e) {
-                LOGGER.warn("Failed to process input:", object);
-                return outDoc;
-            }
-        }
-
-        Config base;
-        if( inPath == null)
-            base = ConfigFactory.parseString(json);
-        else {
-            ObjectNode node;
-            try {
-                node = mapper.readValue(json, ObjectNode.class);
-                ObjectNode root = mapper.createObjectNode();
-                root.set(inPath, node);
-                json = mapper.writeValueAsString(root);
-                base = ConfigFactory.parseString(json);
-            } catch (Exception e) {
-                LOGGER.warn("Failed to process input:", object);
-                return outDoc;
-            }
-        }
-
-        Config converted = hocon.withFallback(base);
-
-        String outJson = null;
-        try {
-            if( outPath == null )
-                outJson = converted.resolve().root().render(ConfigRenderOptions.concise());
-            else {
-                Config resolved = converted.resolve();
-                ConfigObject outObject = resolved.withOnlyPath(outPath).root();
-                ConfigValue outValue = outObject.get(outPath);
-                outJson = outValue.render(ConfigRenderOptions.concise());
-            }
-        } catch (Exception e) {
-            LOGGER.warn("Failed to convert:", json);
-            LOGGER.warn(e.getMessage());
-        }
-        if( outClass == String.class )
-            return outJson;
-        else {
-            try {
-                outDoc = mapper.readValue( outJson, outClass );
-            } catch (IOException e) {
-                LOGGER.warn("Failed to convert:", object);
-            }
-        }
-        return outDoc;
+    if ( outClass == String.class )
+      return outJson;
+    else {
+      try {
+        outDoc = mapper.readValue( outJson, outClass );
+      } catch (IOException ex) {
+        LOGGER.warn("Failed to convert:", object);
+      }
     }
+    return outDoc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
index a38568b..d245c3e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -42,185 +42,219 @@ import java.util.Map;
  */
 public class LineReadWriteUtil {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
-
-    private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap();
-
-    private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+  private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+
+  private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap();
+
+  private static final List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+
+  private List<String> fields;
+  private String fieldDelimiter = "\t";
+  private String lineDelimiter = "\n";
+  private String encoding = "UTF-8";
+
+  private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private LineReadWriteUtil() {
+  }
+
+  private LineReadWriteUtil(LineReadWriteConfiguration configuration) {
+    this.fields = configuration.getFields();
+    this.fieldDelimiter = configuration.getFieldDelimiter();
+    this.lineDelimiter = configuration.getLineDelimiter();
+    this.encoding = configuration.getEncoding();
+  }
+
+  public static LineReadWriteUtil getInstance() {
+    return getInstance(new LineReadWriteConfiguration());
+  }
+
+  /**
+   * getInstance.
+   * @param configuration
+   * @return result
+   */
+  public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) {
+    if ( INSTANCE_MAP.containsKey(configuration)
+        &&
+        INSTANCE_MAP.get(configuration) != null) {
+      return INSTANCE_MAP.get(configuration);
+    } else {
+      INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration));
+      return INSTANCE_MAP.get(configuration);
+    }
+  }
+
+  /**
+   * processLine
+   * @param line
+   * @return result
+   */
+  public StreamsDatum processLine(String line) {
+
+    List<String> expectedFields = fields;
+    if ( line.endsWith(lineDelimiter)) {
+      line = trimLineDelimiter(line);
+    }
+    String[] parsedFields = line.split(fieldDelimiter);
 
-    private List<String> fields;
-    private String fieldDelimiter = "\t";
-    private String lineDelimiter = "\n";
-    private String encoding = "UTF-8";
+    if ( parsedFields.length == 0) {
+      return null;
+    }
 
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    String id = null;
+    DateTime ts = null;
+    BigInteger seq = null;
+    Map<String, Object> metadata = null;
+    String json = null;
 
-    private LineReadWriteUtil() {
+    if ( expectedFields.contains( FieldConstants.DOC )
+        && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
+      json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
     }
 
-    private LineReadWriteUtil(LineReadWriteConfiguration configuration) {
-        this.fields = configuration.getFields();
-        this.fieldDelimiter = configuration.getFieldDelimiter();
-        this.lineDelimiter = configuration.getLineDelimiter();
-        this.encoding = configuration.getEncoding();
+    if ( expectedFields.contains( FieldConstants.ID )
+        && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
+      id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
     }
-
-    public static LineReadWriteUtil getInstance() {
-        return getInstance(new LineReadWriteConfiguration());
+    if ( expectedFields.contains( FieldConstants.SEQ )
+        && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
+      try {
+        seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
+      } catch ( NumberFormatException nfe ) {
+        LOGGER.warn("invalid sequence number {}", nfe);
+      }
     }
-
-    public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) {
-        if( INSTANCE_MAP.containsKey(configuration) &&
-            INSTANCE_MAP.get(configuration) != null)
-            return INSTANCE_MAP.get(configuration);
-        else {
-            INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration));
-            return INSTANCE_MAP.get(configuration);
-        }
+    if ( expectedFields.contains( FieldConstants.TS )
+        && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
+      ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
     }
-
-    public StreamsDatum processLine(String line) {
-
-        List<String> expectedFields = fields;
-        if( line.endsWith(lineDelimiter)) line = trimLineDelimiter(line);
-        String[] parsedFields = line.split(fieldDelimiter);
-
-        if( parsedFields.length == 0)
-            return null;
-
-        String id = null;
-        DateTime ts = null;
-        BigInteger seq = null;
-        Map<String, Object> metadata = null;
-        String json = null;
-
-        if( expectedFields.contains( FieldConstants.DOC )
-                && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
-            json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
-        }
-
-        if( expectedFields.contains( FieldConstants.ID )
-                && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
-            id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
-        }
-        if( expectedFields.contains( FieldConstants.SEQ )
-                && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
-            try {
-                seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
-            } catch( NumberFormatException nfe )
-            { LOGGER.warn("invalid sequence number {}", nfe); }
-        }
-        if( expectedFields.contains( FieldConstants.TS )
-                && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
-            ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
-        }
-        if( expectedFields.contains( FieldConstants.META )
-                && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
-            metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
-        }
-
-        StreamsDatum datum = new StreamsDatum(json);
-        datum.setId(id);
-        datum.setTimestamp(ts);
-        datum.setMetadata(metadata);
-        datum.setSequenceid(seq);
-        return datum;
-
+    if ( expectedFields.contains( FieldConstants.META )
+        && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
+      metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
     }
 
-    public String convertResultToString(StreamsDatum entry) {
-        String metadataJson = null;
-        try {
-            metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("Error converting metadata to a string", e);
-        }
+    StreamsDatum datum = new StreamsDatum(json);
+    datum.setId(id);
+    datum.setTimestamp(ts);
+    datum.setMetadata(metadata);
+    datum.setSequenceid(seq);
+    return datum;
+
+  }
+
+  /**
+   * convertResultToString
+   * @param entry
+   * @return result
+   */
+  public String convertResultToString(StreamsDatum entry) {
+    String metadataJson = null;
+    try {
+      metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn("Error converting metadata to a string", ex);
+    }
 
-        String documentJson = null;
-        try {
-            if( entry.getDocument() instanceof String )
-                documentJson = (String)entry.getDocument();
-            else
-                documentJson = MAPPER.writeValueAsString(entry.getDocument());
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("Error converting document to string", e);
-        }
+    String documentJson = null;
+    try {
+      if ( entry.getDocument() instanceof String ) {
+        documentJson = (String) entry.getDocument();
+      } else {
+        documentJson = MAPPER.writeValueAsString(entry.getDocument());
+      }
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn("Error converting document to string", ex);
+    }
 
-        if (Strings.isNullOrEmpty(documentJson))
-            return null;
-        else {
-            StringBuilder stringBuilder = new StringBuilder();
-            Iterator<String> fields = this.fields.iterator();
-            List<String> fielddata = Lists.newArrayList();
-            Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
-            while( fields.hasNext() ) {
-                String field = fields.next();
-                if( field.equals(FieldConstants.DOC) )
-                    fielddata.add(documentJson);
-                else if( field.equals(FieldConstants.ID) )
-                    fielddata.add(entry.getId());
-                else if( field.equals(FieldConstants.SEQ) )
-                    if( entry.getSequenceid() != null)
-                        fielddata.add(entry.getSequenceid().toString());
-                    else
-                        fielddata.add("null");
-                else if( field.equals(FieldConstants.TS) )
-                    if( entry.getTimestamp() != null )
-                        fielddata.add(entry.getTimestamp().toString());
-                    else
-                        fielddata.add(DateTime.now().toString());
-                else if( field.equals(FieldConstants.META) )
-                    fielddata.add(metadataJson);
-                else if( entry.getMetadata().containsKey(field)) {
-                    fielddata.add(entry.getMetadata().get(field).toString());
-                } else {
-                    fielddata.add(null);
-                }
-
-            }
-            joiner.appendTo(stringBuilder, fielddata);
-            return stringBuilder.toString();
+    if (Strings.isNullOrEmpty(documentJson)) {
+      return null;
+    } else {
+      StringBuilder stringBuilder = new StringBuilder();
+      Iterator<String> fields = this.fields.iterator();
+      List<String> fielddata = Lists.newArrayList();
+      Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
+      while( fields.hasNext() ) {
+        String field = fields.next();
+        if ( field.equals(FieldConstants.DOC) ) {
+          fielddata.add(documentJson);
+        } else if ( field.equals(FieldConstants.ID) ) {
+          fielddata.add(entry.getId());
+        } else if ( field.equals(FieldConstants.SEQ) ) {
+          if (entry.getSequenceid() != null) {
+            fielddata.add(entry.getSequenceid().toString());
+          } else {
+            fielddata.add("null");
+          }
+        } else if ( field.equals(FieldConstants.TS) ) {
+          if (entry.getTimestamp() != null) {
+            fielddata.add(entry.getTimestamp().toString());
+          } else {
+            fielddata.add(DateTime.now().toString());
+          }
+        } else if ( field.equals(FieldConstants.META) ) {
+          fielddata.add(metadataJson);
+        } else if ( entry.getMetadata().containsKey(field)) {
+          fielddata.add(entry.getMetadata().get(field).toString());
+        } else {
+          fielddata.add(null);
         }
+      }
+      joiner.appendTo(stringBuilder, fielddata);
+      return stringBuilder.toString();
     }
-
-    public DateTime parseTs(String field) {
-
-        DateTime timestamp = null;
+  }
+
+  /**
+   * parseTs
+   * @param field
+   * @return
+   */
+  public DateTime parseTs(String field) {
+
+    DateTime timestamp = null;
+    try {
+      long longts = Long.parseLong(field);
+      timestamp = new DateTime(longts);
+    } catch ( Exception e1 ) {
+      try {
+        timestamp = DateTime.parse(field);
+      } catch ( Exception e2 ) {
         try {
-            long longts = Long.parseLong(field);
-            timestamp = new DateTime(longts);
-        } catch ( Exception e ) {
-            try {
-                timestamp = DateTime.parse(field);
-            } catch ( Exception e2 ) {
-                try {
-                    timestamp = MAPPER.readValue(field, DateTime.class);
-                } catch ( Exception e3 ) {
-                    LOGGER.warn("Could not parse timestamp:{} ", field);
-                }
-            }
+          timestamp = MAPPER.readValue(field, DateTime.class);
+        } catch ( Exception e3 ) {
+          LOGGER.warn("Could not parse timestamp:{} ", field);
         }
-
-        return timestamp;
+      }
     }
 
-    public Map<String, Object> parseMap(String field) {
+    return timestamp;
+  }
 
-        Map<String, Object> metadata = null;
+  /**
+   * parseMap
+   * @param field
+   * @return result
+   */
+  public Map<String, Object> parseMap(String field) {
 
-        try {
-            JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
-            metadata = MAPPER.convertValue(jsonNode, Map.class);
-        } catch (Exception e) {
-            LOGGER.warn("failed in parseMap: " + e.getMessage());
-        }
-        return metadata;
-    }
+    Map<String, Object> metadata = null;
 
-    private String trimLineDelimiter(String str) {
-        if( !Strings.isNullOrEmpty(str))
-            if( str.endsWith(lineDelimiter))
-                return str.substring(0,str.length()-1);
-        return str;
+    try {
+      JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
+      metadata = MAPPER.convertValue(jsonNode, Map.class);
+    } catch (Exception ex) {
+      LOGGER.warn("failed in parseMap: " + ex.getMessage());
+    }
+    return metadata;
+  }
+
+  private String trimLineDelimiter(String str) {
+    if ( !Strings.isNullOrEmpty(str)) {
+      if (str.endsWith(lineDelimiter)) {
+        return str.substring(0, str.length() - 1);
+      }
     }
+    return str;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
index edd70f4..a269f4d 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
@@ -16,13 +16,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
+
 package org.apache.streams.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,65 +35,68 @@ import java.util.List;
 /**
  * TypeConverterProcessor converts between String json and jackson-compatible POJO objects.
  *
+ * <p/>
  * Activity is one supported jackson-compatible POJO, so JSON String and objects with structual similarities
  *   to Activity can be converted to Activity objects.
  *
+ * <p/>
  * However, conversion to Activity should probably use {@link org.apache.streams.converter.ActivityConverterProcessor}
  *
  */
 public class TypeConverterProcessor implements StreamsProcessor, Serializable {
 
-    public static final String STREAMS_ID = "TypeConverterProcessor";
+  public static final String STREAMS_ID = "TypeConverterProcessor";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
 
-    private List<String> formats = Lists.newArrayList();
+  private List<String> formats = Lists.newArrayList();
 
-    protected ObjectMapper mapper;
+  protected ObjectMapper mapper;
 
-    protected Class outClass;
+  protected Class outClass;
 
-    public TypeConverterProcessor(Class outClass) {
-        this.outClass = outClass;
-    }
-
-    public TypeConverterProcessor(Class outClass, List<String> formats) {
-        this(outClass);
-        this.formats = formats;
-    }
+  public TypeConverterProcessor(Class outClass) {
+    this.outClass = outClass;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public TypeConverterProcessor(Class outClass, List<String> formats) {
+    this(outClass);
+    this.formats = formats;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Object inDoc = entry.getDocument();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        Object outDoc = TypeConverterUtil.getInstance().convert(inDoc, outClass, mapper);
+    List<StreamsDatum> result = Lists.newLinkedList();
+    Object inDoc = entry.getDocument();
 
-        if( outDoc != null ) {
-            entry.setDocument(outDoc);
-            result.add(entry);
-        }
+    Object outDoc = TypeConverterUtil.getInstance().convert(inDoc, outClass, mapper);
 
-        return result;
+    if ( outDoc != null ) {
+      entry.setDocument(outDoc);
+      result.add(entry);
     }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        if( formats.size() > 0 )
-            this.mapper = StreamsJacksonMapper.getInstance(formats);
-        else
-            this.mapper = StreamsJacksonMapper.getInstance();
-    }
+    return result;
+  }
 
-    @Override
-    public void cleanUp() {
-        this.mapper = null;
+  @Override
+  public void prepare(Object configurationObject) {
+    if ( formats.size() > 0 ) {
+      this.mapper = StreamsJacksonMapper.getInstance(formats);
+    } else {
+      this.mapper = StreamsJacksonMapper.getInstance();
     }
+  }
+
+  @Override
+  public void cleanUp() {
+    this.mapper = null;
+  }
 
-};
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
index 4ace9c4..8843d0e 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
@@ -18,9 +18,11 @@
 
 package org.apache.streams.converter;
 
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,49 +30,56 @@ import java.io.IOException;
 
 /**
  * TypeConverterUtil supports TypeConverterProcessor in converting between String json and
- * jackson-compatible POJO objects
+ * jackson-compatible POJO objects.
  */
 public class TypeConverterUtil {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
 
-    private static final TypeConverterUtil INSTANCE = new TypeConverterUtil();
+  private static final TypeConverterUtil INSTANCE = new TypeConverterUtil();
 
-    public static TypeConverterUtil getInstance(){
-        return INSTANCE;
-    }
+  public static TypeConverterUtil getInstance() {
+    return INSTANCE;
+  }
+
+  public Object convert(Object object, Class outClass) {
+    return TypeConverterUtil.getInstance().convert(object, outClass, StreamsJacksonMapper.getInstance());
+  }
 
-    public Object convert(Object object, Class outClass) {
-        return TypeConverterUtil.getInstance().convert(object, outClass, StreamsJacksonMapper.getInstance());
+  /**
+   * convert
+   * @param object
+   * @param outClass
+   * @param mapper
+   * @return
+   */
+  public Object convert(Object object, Class outClass, ObjectMapper mapper) {
+    ObjectNode node = null;
+    Object outDoc = null;
+    if ( object instanceof String ) {
+      try {
+        node = mapper.readValue((String)object, ObjectNode.class);
+      } catch (IOException ex) {
+        LOGGER.warn(ex.getMessage());
+        LOGGER.warn(object.toString());
+      }
+    } else {
+      node = mapper.convertValue(object, ObjectNode.class);
     }
 
-    public Object convert(Object object, Class outClass, ObjectMapper mapper) {
-        ObjectNode node = null;
-        Object outDoc = null;
-        if( object instanceof String ) {
-            try {
-                node = mapper.readValue((String)object, ObjectNode.class);
-            } catch (IOException e) {
-               LOGGER.warn(e.getMessage());
-                LOGGER.warn(object.toString());
-            }
+    if(node != null) {
+      try {
+        if ( outClass == String.class ) {
+          outDoc = mapper.writeValueAsString(node);
         } else {
-            node = mapper.convertValue(object, ObjectNode.class);
+          outDoc = mapper.convertValue(node, outClass);
         }
-
-        if(node != null) {
-            try {
-                if( outClass == String.class )
-                    outDoc = mapper.writeValueAsString(node);
-                else
-                    outDoc = mapper.convertValue(node, outClass);
-
-            } catch (Throwable e) {
-                LOGGER.warn(e.getMessage());
-                LOGGER.warn(node.toString());
-            }
-        }
-
-        return outDoc;
+      } catch (Throwable ex) {
+        LOGGER.warn(ex.getMessage());
+        LOGGER.warn(node.toString());
+      }
     }
+
+    return outDoc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
index e0c7a68..ed40a17 100644
--- a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
+++ b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java
@@ -18,15 +18,16 @@
 
 package org.apache.streams.filters;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.verbs.ObjectCombination;
 import org.apache.streams.verbs.VerbDefinition;
 import org.apache.streams.verbs.VerbDefinitionMatchUtil;
 import org.apache.streams.verbs.VerbDefinitionResolver;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,57 +40,60 @@ import java.util.Set;
  */
 public class VerbDefinitionDropFilter implements StreamsProcessor {
 
-    public static final String STREAMS_ID = "VerbDefinitionDropFilter";
+  public static final String STREAMS_ID = "VerbDefinitionDropFilter";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionDropFilter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionDropFilter.class);
 
-    protected Set<VerbDefinition> verbDefinitionSet;
-    protected VerbDefinitionResolver resolver;
+  protected Set<VerbDefinition> verbDefinitionSet;
+  protected VerbDefinitionResolver resolver;
 
-    public VerbDefinitionDropFilter() {
-        // get with reflection
-    }
-
-    public VerbDefinitionDropFilter(Set<VerbDefinition> verbDefinitionSet) {
-        this();
-        this.verbDefinitionSet = verbDefinitionSet;
-    }
+  public VerbDefinitionDropFilter() {
+    // get with reflection
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public VerbDefinitionDropFilter(Set<VerbDefinition> verbDefinitionSet) {
+    this();
+    this.verbDefinitionSet = verbDefinitionSet;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        List<StreamsDatum> result = Lists.newArrayList();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
+    List<StreamsDatum> result = Lists.newArrayList();
 
-        Activity activity;
+    LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
 
-        Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+    Activity activity;
 
-        activity = (Activity) entry.getDocument();
+    Preconditions.checkArgument(entry.getDocument() instanceof Activity);
 
-        if( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == false )
-            result.add(entry);
+    activity = (Activity) entry.getDocument();
 
-        return result;
+    if ( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == false ) {
+      result.add(entry);
     }
 
-    @Override
-    public void prepare(Object o) {
-        if( verbDefinitionSet != null)
-            resolver = new VerbDefinitionResolver(verbDefinitionSet);
-        else resolver = new VerbDefinitionResolver();
-        Preconditions.checkNotNull(resolver);
-    }
+    return result;
+  }
 
-    @Override
-    public void cleanUp() {
-        // noOp
+  @Override
+  public void prepare(Object configuration) {
+    if ( verbDefinitionSet != null) {
+      resolver = new VerbDefinitionResolver(verbDefinitionSet);
+    } else {
+      resolver = new VerbDefinitionResolver();
     }
+    Preconditions.checkNotNull(resolver);
+  }
+
+  @Override
+  public void cleanUp() {
+    // noOp
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
index 82e8c99..7562905 100644
--- a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
+++ b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java
@@ -18,19 +18,21 @@
 
 package org.apache.streams.filters;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.verbs.ObjectCombination;
 import org.apache.streams.verbs.VerbDefinition;
 import org.apache.streams.verbs.VerbDefinitionMatchUtil;
 import org.apache.streams.verbs.VerbDefinitionResolver;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Checks one or more verb definitions against a stream of Activity documents, and drops any activities
@@ -38,57 +40,60 @@ import java.util.*;
  */
 public class VerbDefinitionKeepFilter implements StreamsProcessor {
 
-    public static final String STREAMS_ID = "VerbDefinitionKeepFilter";
+  public static final String STREAMS_ID = "VerbDefinitionKeepFilter";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionKeepFilter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionKeepFilter.class);
 
-    protected Set<VerbDefinition> verbDefinitionSet;
-    protected VerbDefinitionResolver resolver;
+  protected Set<VerbDefinition> verbDefinitionSet;
+  protected VerbDefinitionResolver resolver;
 
-    public VerbDefinitionKeepFilter() {
-        // get with reflection
-    }
-
-    public VerbDefinitionKeepFilter(Set<VerbDefinition> verbDefinitionSet) {
-        this();
-        this.verbDefinitionSet = verbDefinitionSet;
-    }
+  public VerbDefinitionKeepFilter() {
+    // get with reflection
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public VerbDefinitionKeepFilter(Set<VerbDefinition> verbDefinitionSet) {
+    this();
+    this.verbDefinitionSet = verbDefinitionSet;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        List<StreamsDatum> result = Lists.newArrayList();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
+    List<StreamsDatum> result = Lists.newArrayList();
 
-        Activity activity;
+    LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass());
 
-        Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+    Activity activity;
 
-        activity = (Activity) entry.getDocument();
+    Preconditions.checkArgument(entry.getDocument() instanceof Activity);
 
-        if( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == true )
-            result.add(entry);
+    activity = (Activity) entry.getDocument();
 
-        return result;
+    if ( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == true ) {
+      result.add(entry);
     }
 
-    @Override
-    public void prepare(Object o) {
-        if( verbDefinitionSet != null)
-            resolver = new VerbDefinitionResolver(verbDefinitionSet);
-        else resolver = new VerbDefinitionResolver();
-        Preconditions.checkNotNull(resolver);
-    }
+    return result;
+  }
 
-    @Override
-    public void cleanUp() {
-        // noOp
+  @Override
+  public void prepare(Object configuration) {
+    if ( verbDefinitionSet != null ) {
+      resolver = new VerbDefinitionResolver(verbDefinitionSet);
+    } else {
+      resolver = new VerbDefinitionResolver();
     }
+    Preconditions.checkNotNull(resolver);
+  }
+
+  @Override
+  public void cleanUp() {
+    // noOp
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
index d8309d9..8cacf1f 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
@@ -18,11 +18,19 @@
 
 package org.apache.streams.components.http.persist;
 
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
@@ -33,12 +41,6 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,183 +53,189 @@ import java.util.Map;
 
 public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
 
-    private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter";
+  private static final String STREAMS_ID = "SimpleHTTPPostPersistWriter";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class);
 
-    protected ObjectMapper mapper;
+  protected ObjectMapper mapper;
 
-    protected URIBuilder uriBuilder;
+  protected URIBuilder uriBuilder;
 
-    protected CloseableHttpClient httpclient;
+  protected CloseableHttpClient httpclient;
 
-    protected HttpPersistWriterConfiguration configuration;
+  protected HttpPersistWriterConfiguration configuration;
 
-    protected String authHeader;
+  protected String authHeader;
 
-    public SimpleHTTPPostPersistWriter() {
-        this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
-    }
+  public SimpleHTTPPostPersistWriter() {
+    this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
+  }
 
-    public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) {
-        this.configuration = configuration;
-    }
+  public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) {
+    this.configuration = configuration;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
+  @Override
+  public void write(StreamsDatum entry) {
 
-        ObjectNode payload;
-        try {
-            payload = preparePayload(entry);
-        } catch( Exception e ) {
-            LOGGER.warn("Exception preparing payload, using empty payload");
-            payload = mapper.createObjectNode();
-        }
+    ObjectNode payload;
+    try {
+      payload = preparePayload(entry);
+    } catch ( Exception ex ) {
+      LOGGER.warn("Exception preparing payload, using empty payload");
+      payload = mapper.createObjectNode();
+    }
 
 
-        Map<String, String> params = prepareParams(entry);
+    Map<String, String> params = prepareParams(entry);
 
-        URI uri = prepareURI(params);
+    URI uri = prepareURI(params);
 
-        HttpPost httppost = prepareHttpPost(uri, payload);
+    HttpPost httppost = prepareHttpPost(uri, payload);
 
-        ObjectNode result = executePost(httppost);
+    ObjectNode result = executePost(httppost);
 
-        try {
-            LOGGER.debug(mapper.writeValueAsString(result));
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("Non-json response", e.getMessage());
-        }
+    try {
+      LOGGER.debug(mapper.writeValueAsString(result));
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn("Non-json response", ex.getMessage());
     }
-
-    /**
-     Override this to alter request URI
-     */
-    protected URI prepareURI(Map<String, String> params) {
-        URI uri = null;
-        for( Map.Entry<String,String> param : params.entrySet()) {
-            uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
-        }
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            LOGGER.error("URI error {}", uriBuilder.toString());
-        }
-        return uri;
+  }
+
+  /**
+   Override this to alter request URI.
+   */
+  protected URI prepareURI(Map<String, String> params) {
+    URI uri = null;
+    for ( Map.Entry<String,String> param : params.entrySet()) {
+      uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
     }
-
-    /**
-     Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-        return new HashMap<>();
+    try {
+      uri = uriBuilder.build();
+    } catch (URISyntaxException ex) {
+      LOGGER.error("URI error {}", uriBuilder.toString());
     }
-
-    /**
-     Override this to alter json payload on to the request
-     */
-    protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
-
-        if( entry.getDocument() != null ) {
-            if( entry.getDocument() instanceof ObjectNode )
-                return (ObjectNode) entry.getDocument();
-            else return mapper.convertValue(entry.getDocument(), ObjectNode.class);
-        }
-        else return null;
+    return uri;
+  }
+
+  /**
+   Override this to add parameters to the request.
+   */
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
+    return new HashMap<>();
+  }
+
+  /**
+   Override this to alter json payload on to the request.
+   */
+  protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
+
+    if ( entry.getDocument() != null ) {
+      if ( entry.getDocument() instanceof ObjectNode ) {
+        return (ObjectNode) entry.getDocument();
+      } else {
+        return mapper.convertValue(entry.getDocument(), ObjectNode.class);
+      }
+    } else {
+      return null;
     }
-
-    /**
-     Override this to add headers to the request
-     */
-    public HttpPost prepareHttpPost(URI uri, ObjectNode payload) {
-        HttpPost httppost = new HttpPost(uri);
-        httppost.addHeader("content-type", this.configuration.getContentType());
-        httppost.addHeader("accept-charset", "UTF-8");
-        if( !Strings.isNullOrEmpty(authHeader))
-            httppost.addHeader("Authorization", "Basic " + authHeader);
-        try {
-            String entity = mapper.writeValueAsString(payload);
-            httppost.setEntity(new StringEntity(entity));
-        } catch (JsonProcessingException | UnsupportedEncodingException e) {
-            LOGGER.warn(e.getMessage());
-        }
-        return httppost;
+  }
+
+  /**
+   Override this to add headers to the request.
+   */
+  public HttpPost prepareHttpPost(URI uri, ObjectNode payload) {
+    HttpPost httppost = new HttpPost(uri);
+    httppost.addHeader("content-type", this.configuration.getContentType());
+    httppost.addHeader("accept-charset", "UTF-8");
+    if ( !Strings.isNullOrEmpty(authHeader)) {
+      httppost.addHeader("Authorization", "Basic " + authHeader);
     }
-
-    protected ObjectNode executePost(HttpPost httpPost) {
-
-        Preconditions.checkNotNull(httpPost);
-
-        ObjectNode result = null;
-
-        CloseableHttpResponse response = null;
-
-        String entityString;
-        try {
-            response = httpclient.execute(httpPost);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK && entity != null) {
-                entityString = EntityUtils.toString(entity);
-                result = mapper.readValue(entityString, ObjectNode.class);
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
-        } finally {
-            try {
-                if (response != null) {
-                    response.close();
-                }
-            } catch (IOException ignored) {}
+    try {
+      String entity = mapper.writeValueAsString(payload);
+      httppost.setEntity(new StringEntity(entity));
+    } catch (JsonProcessingException | UnsupportedEncodingException ex) {
+      LOGGER.warn(ex.getMessage());
+    }
+    return httppost;
+  }
+
+  protected ObjectNode executePost(HttpPost httpPost) {
+
+    Preconditions.checkNotNull(httpPost);
+
+    ObjectNode result = null;
+
+    CloseableHttpResponse response = null;
+
+    String entityString;
+    try {
+      response = httpclient.execute(httpPost);
+      HttpEntity entity = response.getEntity();
+      // TODO: handle retry
+      if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK && entity != null) {
+        entityString = EntityUtils.toString(entity);
+        result = mapper.readValue(entityString, ObjectNode.class);
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
         }
-        return result;
+      } catch (IOException ignored) {
+        LOGGER.trace("IOException", ignored);
+      }
     }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-        mapper = StreamsJacksonMapper.getInstance();
+    mapper = StreamsJacksonMapper.getInstance();
 
-        uriBuilder = new URIBuilder()
-                .setScheme(this.configuration.getProtocol())
-                .setHost(this.configuration.getHostname())
-                .setPort(this.configuration.getPort().intValue())
-                .setPath(this.configuration.getResourcePath());
-
-        if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
-            uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
-        if( !Strings.isNullOrEmpty(configuration.getUsername())
-                && !Strings.isNullOrEmpty(configuration.getPassword())) {
-            String string = configuration.getUsername() + ":" + configuration.getPassword();
-            authHeader = Base64.encodeBase64String(string.getBytes());
-        }
-
-        httpclient = HttpClients.createDefault();
+    uriBuilder = new URIBuilder()
+        .setScheme(this.configuration.getProtocol())
+        .setHost(this.configuration.getHostname())
+        .setPort(this.configuration.getPort().intValue())
+        .setPath(this.configuration.getResourcePath());
 
+    if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) {
+      uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
+    }
+    if ( !Strings.isNullOrEmpty(configuration.getUsername())        && !Strings.isNullOrEmpty(configuration.getPassword())) {
+      String string = configuration.getUsername() + ":" + configuration.getPassword();
+      authHeader = Base64.encodeBase64String(string.getBytes());
     }
 
-    @Override
-    public void cleanUp() {
-
-        LOGGER.info("shutting down SimpleHTTPPostPersistWriter");
-        try {
-            httpclient.close();
-        } catch (IOException e) {
-            LOGGER.error(e.getMessage());
-        } finally {
-            try {
-                httpclient.close();
-            } catch (IOException e) {
-                LOGGER.error(e.getMessage());
-            } finally {
-                httpclient = null;
-            }
-        }
+    httpclient = HttpClients.createDefault();
+
+  }
+
+  @Override
+  public void cleanUp() {
+
+    LOGGER.info("shutting down SimpleHTTPPostPersistWriter");
+    try {
+      httpclient.close();
+    } catch (IOException ex) {
+      LOGGER.error(ex.getMessage());
+    } finally {
+      try {
+        httpclient.close();
+      } catch (IOException e2) {
+        LOGGER.error(e2.getMessage());
+      } finally {
+        httpclient = null;
+      }
     }
+  }
 }