You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/05/02 19:06:23 UTC

metron git commit: METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007

Repository: metron
Updated Branches:
  refs/heads/master 2b4f0b840 -> 3bb926df5


METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bb926df
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bb926df
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bb926df

Branch: refs/heads/master
Commit: 3bb926df5d253a907bbf8dab4b76b78dd32993ea
Parents: 2b4f0b8
Author: nickwallen <ni...@nickallen.org>
Authored: Wed May 2 15:06:03 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed May 2 15:06:03 2018 -0400

----------------------------------------------------------------------
 .../org/apache/metron/common/Constants.java     |  10 +-
 .../configuration/SensorParserConfig.java       | 410 +++++++++++--------
 .../parsers/topology/ParserTopologyBuilder.java | 139 ++++---
 .../parsers/topology/ParserTopologyCLI.java     | 147 +++++--
 .../components/ParserTopologyComponent.java     |  80 ++--
 .../parsers/topology/ParserTopologyCLITest.java | 122 ++++--
 ...pleHbaseEnrichmentWriterIntegrationTest.java |  69 ++--
 .../integration/WriterBoltIntegrationTest.java  | 109 ++---
 .../apache/metron/writer/kafka/KafkaWriter.java |   5 +
 9 files changed, 676 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index b939a92..12b541c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -17,9 +17,7 @@
  */
 package org.apache.metron.common;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class Constants {
@@ -37,9 +35,17 @@ public class Constants {
   public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
   public static final String GUID = "guid";
 
+  /**
+   * The key in the global configuration that defines the global parser error topic.
+   *
+   * <p>This value is used only if the error topic is left undefined in a sensor's parser configuration.
+   */
+  public static final String PARSER_ERROR_TOPIC_GLOBALS_KEY = "parser.error.topic";
+
   public interface Field {
     String getName();
   }
+
   public enum Fields implements Field {
      SRC_ADDR("ip_src_addr")
     ,SRC_PORT("ip_src_port")

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index d347481..1dfb045 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -18,6 +18,9 @@
 package org.apache.metron.common.configuration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
@@ -27,35 +30,171 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * The configuration object that defines a parser for a given sensor.  Each
+ * sensor has its own parser configuration.
+ */
 public class SensorParserConfig implements Serializable {
 
+  /**
+   * The class name of the parser.
+   */
   private String parserClassName;
+
+  /**
+   * Allows logic to be defined to filter or ignore messages.  Messages that have been
+   * filtered will not be parsed.
+   *
+   * This should be a fully qualified name of a class that implements the
+   * org.apache.metron.parsers.interfaces.MessageFilter interface.
+   */
   private String filterClassName;
+
+  /**
+   * The input topic containing the sensor telemetry to parse.
+   */
   private String sensorTopic;
+
+  /**
+   * The output topic where the parsed telemetry will be written.
+   */
+  private String outputTopic;
+
+  /**
+   * The error topic where errors are written to.
+   */
+  private String errorTopic;
+
+  /**
+   * The fully qualified name of a class used to write messages
+   * to the output topic.
+   *
+   * <p>A sensible default is provided.
+   */
   private String writerClassName;
+
+  /**
+   * The fully qualified name of a class used to write messages
+   * to the error topic.
+   *
+   * <p>A sensible default is provided.
+   */
   private String errorWriterClassName;
-  private String invalidWriterClassName;
+
+  /**
+   * Determines if parser metadata is made available to the parser's field
+   * transformations. If true, the parser field transformations can access
+   * parser metadata values.
+   *
+   * <p>By default, this is false and parser metadata is not available
+   * to the field transformations.
+   */
   private Boolean readMetadata = false;
+
+  /**
+   * Determines if parser metadata is automatically merged into the message.  If
+   * true, parser metadata values will appear as fields within the message.
+   *
+   * <p>By default, this is false and metadata is not merged.
+   */
   private Boolean mergeMetadata = false;
+
+  /**
+   * The number of workers for the topology.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer numWorkers = null;
+
+  /**
+   * The number of ackers for the topology.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer numAckers= null;
+
+  /**
+   * The parallelism of the Kafka spout.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer spoutParallelism = 1;
+
+  /**
+   * The number of tasks for the Kafka spout.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer spoutNumTasks = 1;
+
+  /**
+   * The parallelism of the parser bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer parserParallelism = 1;
+
+  /**
+   * The number of tasks for the parser bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer parserNumTasks = 1;
+
+  /**
+   * The parallelism of the error writer bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer errorWriterParallelism = 1;
+
+  /**
+   * The number of tasks for the error writer bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer errorWriterNumTasks = 1;
-  private Map<String, Object> cacheConfig = new HashMap<>();
+
+  /**
+   * Configuration properties passed to the Kafka spout.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Map<String, Object> spoutConfig = new HashMap<>();
+
+  /**
+   * The Kafka security protocol.
+   *
+   * <p>This property can be overridden on the CLI.  This property can also be overridden by the spout config.
+   */
   private String securityProtocol = null;
+
+  /**
+   * Configuration properties passed to the storm topology.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Map<String, Object> stormConfig = new HashMap<>();
 
   /**
-   * Cache config for stellar field transformations.
-   * * stellar.cache.maxSize - The maximum number of elements in the cache.
-   * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
-   * @return
+   * Configuration for the parser.
+   */
+  private Map<String, Object> parserConfig = new HashMap<>();
+
+  /**
+   * The field transformations applied to the parsed messages. These allow fields
+   * of the parsed message to be transformed.
    */
+  private List<FieldTransformer> fieldTransformations = new ArrayList<>();
+
+  /**
+   * Configures the cache that backs stellar field transformations.
+   *
+   * <li>stellar.cache.maxSize - The maximum number of elements in the cache.
+   * <li>stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
+   */
+  private Map<String, Object> cacheConfig = new HashMap<>();
+
   public Map<String, Object> getCacheConfig() {
     return cacheConfig;
   }
@@ -64,10 +203,6 @@ public class SensorParserConfig implements Serializable {
     this.cacheConfig = cacheConfig;
   }
 
-  /**
-   * Return the number of workers for the topology.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getNumWorkers() {
     return numWorkers;
   }
@@ -76,10 +211,6 @@ public class SensorParserConfig implements Serializable {
     this.numWorkers = numWorkers;
   }
 
-  /**
-   * Return the number of ackers for the topology.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getNumAckers() {
     return numAckers;
   }
@@ -88,10 +219,6 @@ public class SensorParserConfig implements Serializable {
     this.numAckers = numAckers;
   }
 
-  /**
-   * Return the spout parallelism.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getSpoutParallelism() {
     return spoutParallelism;
   }
@@ -100,10 +227,6 @@ public class SensorParserConfig implements Serializable {
     this.spoutParallelism = spoutParallelism;
   }
 
-  /**
-   * Return the spout num tasks.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getSpoutNumTasks() {
     return spoutNumTasks;
   }
@@ -112,10 +235,6 @@ public class SensorParserConfig implements Serializable {
     this.spoutNumTasks = spoutNumTasks;
   }
 
-  /**
-   * Return the parser parallelism.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getParserParallelism() {
     return parserParallelism;
   }
@@ -124,10 +243,6 @@ public class SensorParserConfig implements Serializable {
     this.parserParallelism = parserParallelism;
   }
 
-  /**
-   * Return the parser number of tasks.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getParserNumTasks() {
     return parserNumTasks;
   }
@@ -136,10 +251,6 @@ public class SensorParserConfig implements Serializable {
     this.parserNumTasks = parserNumTasks;
   }
 
-  /**
-   * Return the error writer bolt parallelism.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getErrorWriterParallelism() {
     return errorWriterParallelism;
   }
@@ -148,10 +259,6 @@ public class SensorParserConfig implements Serializable {
     this.errorWriterParallelism = errorWriterParallelism;
   }
 
-  /**
-   * Return the error writer bolt number of tasks.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getErrorWriterNumTasks() {
     return errorWriterNumTasks;
   }
@@ -160,10 +267,6 @@ public class SensorParserConfig implements Serializable {
     this.errorWriterNumTasks = errorWriterNumTasks;
   }
 
-  /**
-   * Return the spout config.  This includes kafka properties.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Map<String, Object> getSpoutConfig() {
     return spoutConfig;
   }
@@ -172,11 +275,6 @@ public class SensorParserConfig implements Serializable {
     this.spoutConfig = spoutConfig;
   }
 
-  /**
-   * Return security protocol to use.  This property will be used for the parser unless overridden on the CLI.
-   * The order of precedence is CLI > spout config > config in the sensor parser config.
-   * @return
-   */
   public String getSecurityProtocol() {
     return securityProtocol;
   }
@@ -185,10 +283,6 @@ public class SensorParserConfig implements Serializable {
     this.securityProtocol = securityProtocol;
   }
 
-  /**
-   * Return Storm topologyconfig.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Map<String, Object> getStormConfig() {
     return stormConfig;
   }
@@ -197,10 +291,6 @@ public class SensorParserConfig implements Serializable {
     this.stormConfig = stormConfig;
   }
 
-  /**
-   * Return whether or not to merge metadata sent into the message.  If true, then metadata become proper fields.
-   * @return
-   */
   public Boolean getMergeMetadata() {
     return mergeMetadata;
   }
@@ -209,10 +299,6 @@ public class SensorParserConfig implements Serializable {
     this.mergeMetadata = mergeMetadata;
   }
 
-  /**
-   * Return whether or not to read metadata at all.
-   * @return
-   */
   public Boolean getReadMetadata() {
     return readMetadata;
   }
@@ -229,22 +315,13 @@ public class SensorParserConfig implements Serializable {
     this.errorWriterClassName = errorWriterClassName;
   }
 
-  public String getInvalidWriterClassName() {
-    return invalidWriterClassName;
-  }
-
-  public void setInvalidWriterClassName(String invalidWriterClassName) {
-    this.invalidWriterClassName = invalidWriterClassName;
-  }
-
   public String getWriterClassName() {
     return writerClassName;
   }
+
   public void setWriterClassName(String classNames) {
     this.writerClassName = classNames;
   }
-  private Map<String, Object> parserConfig = new HashMap<>();
-  private List<FieldTransformer> fieldTransformations = new ArrayList<>();
 
   public List<FieldTransformer> getFieldTransformations() {
     return fieldTransformations;
@@ -278,6 +355,22 @@ public class SensorParserConfig implements Serializable {
     this.sensorTopic = sensorTopic;
   }
 
+  public String getOutputTopic() {
+    return outputTopic;
+  }
+
+  public void setOutputTopic(String outputTopic) {
+    this.outputTopic = outputTopic;
+  }
+
+  public String getErrorTopic() {
+    return errorTopic;
+  }
+
+  public void setErrorTopic(String errorTopic) {
+    this.errorTopic = errorTopic;
+  }
+
   public Map<String, Object> getParserConfig() {
     return parserConfig;
   }
@@ -298,112 +391,103 @@ public class SensorParserConfig implements Serializable {
     }
   }
 
-
   public String toJSON() throws JsonProcessingException {
     return JSONUtils.INSTANCE.toJSON(this, true);
   }
 
   @Override
-  public String toString() {
-    return "SensorParserConfig{" +
-            "parserClassName='" + parserClassName + '\'' +
-            ", filterClassName='" + filterClassName + '\'' +
-            ", sensorTopic='" + sensorTopic + '\'' +
-            ", writerClassName='" + writerClassName + '\'' +
-            ", errorWriterClassName='" + errorWriterClassName + '\'' +
-            ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
-            ", readMetadata=" + readMetadata +
-            ", mergeMetadata=" + mergeMetadata +
-            ", numWorkers=" + numWorkers +
-            ", numAckers=" + numAckers +
-            ", spoutParallelism=" + spoutParallelism +
-            ", spoutNumTasks=" + spoutNumTasks +
-            ", parserParallelism=" + parserParallelism +
-            ", parserNumTasks=" + parserNumTasks +
-            ", errorWriterParallelism=" + errorWriterParallelism +
-            ", errorWriterNumTasks=" + errorWriterNumTasks +
-            ", spoutConfig=" + spoutConfig +
-            ", securityProtocol='" + securityProtocol + '\'' +
-            ", stormConfig=" + stormConfig +
-            ", parserConfig=" + parserConfig +
-            ", fieldTransformations=" + fieldTransformations +
-            '}';
-  }
-
-  @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    SensorParserConfig that = (SensorParserConfig) o;
+    if (this == o) {
+      return true;
+    }
 
-    if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null)
-      return false;
-    if (getFilterClassName() != null ? !getFilterClassName().equals(that.getFilterClassName()) : that.getFilterClassName() != null)
-      return false;
-    if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
-      return false;
-    if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
-      return false;
-    if (getErrorWriterClassName() != null ? !getErrorWriterClassName().equals(that.getErrorWriterClassName()) : that.getErrorWriterClassName() != null)
-      return false;
-    if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null)
-      return false;
-    if (getReadMetadata() != null ? !getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != null)
-      return false;
-    if (getMergeMetadata() != null ? !getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() != null)
+    if (o == null || getClass() != o.getClass()) {
       return false;
-    if (getNumWorkers() != null ? !getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null)
-      return false;
-    if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : that.getNumAckers() != null)
-      return false;
-    if (getSpoutParallelism() != null ? !getSpoutParallelism().equals(that.getSpoutParallelism()) : that.getSpoutParallelism() != null)
-      return false;
-    if (getSpoutNumTasks() != null ? !getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() != null)
-      return false;
-    if (getParserParallelism() != null ? !getParserParallelism().equals(that.getParserParallelism()) : that.getParserParallelism() != null)
-      return false;
-    if (getParserNumTasks() != null ? !getParserNumTasks().equals(that.getParserNumTasks()) : that.getParserNumTasks() != null)
-      return false;
-    if (getErrorWriterParallelism() != null ? !getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : that.getErrorWriterParallelism() != null)
-      return false;
-    if (getErrorWriterNumTasks() != null ? !getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : that.getErrorWriterNumTasks() != null)
-      return false;
-    if (getSpoutConfig() != null ? !getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null)
-      return false;
-    if (getSecurityProtocol() != null ? !getSecurityProtocol().equals(that.getSecurityProtocol()) : that.getSecurityProtocol() != null)
-      return false;
-    if (getStormConfig() != null ? !getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null)
-      return false;
-    if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
-      return false;
-    return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null;
+    }
 
+    SensorParserConfig that = (SensorParserConfig) o;
+    return new EqualsBuilder()
+            .append(parserClassName, that.parserClassName)
+            .append(filterClassName, that.filterClassName)
+            .append(sensorTopic, that.sensorTopic)
+            .append(outputTopic, that.outputTopic)
+            .append(errorTopic, that.errorTopic)
+            .append(writerClassName, that.writerClassName)
+            .append(errorWriterClassName, that.errorWriterClassName)
+            .append(readMetadata, that.readMetadata)
+            .append(mergeMetadata, that.mergeMetadata)
+            .append(numWorkers, that.numWorkers)
+            .append(numAckers, that.numAckers)
+            .append(spoutParallelism, that.spoutParallelism)
+            .append(spoutNumTasks, that.spoutNumTasks)
+            .append(parserParallelism, that.parserParallelism)
+            .append(parserNumTasks, that.parserNumTasks)
+            .append(errorWriterParallelism, that.errorWriterParallelism)
+            .append(errorWriterNumTasks, that.errorWriterNumTasks)
+            .append(spoutConfig, that.spoutConfig)
+            .append(securityProtocol, that.securityProtocol)
+            .append(stormConfig, that.stormConfig)
+            .append(cacheConfig, that.cacheConfig)
+            .append(parserConfig, that.parserConfig)
+            .append(fieldTransformations, that.fieldTransformations)
+            .isEquals();
   }
 
   @Override
   public int hashCode() {
-    int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
-    result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
-    result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
-    result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
-    result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0);
-    result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0);
-    result = 31 * result + (getReadMetadata() != null ? getReadMetadata().hashCode() : 0);
-    result = 31 * result + (getMergeMetadata() != null ? getMergeMetadata().hashCode() : 0);
-    result = 31 * result + (getNumWorkers() != null ? getNumWorkers().hashCode() : 0);
-    result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() : 0);
-    result = 31 * result + (getSpoutParallelism() != null ? getSpoutParallelism().hashCode() : 0);
-    result = 31 * result + (getSpoutNumTasks() != null ? getSpoutNumTasks().hashCode() : 0);
-    result = 31 * result + (getParserParallelism() != null ? getParserParallelism().hashCode() : 0);
-    result = 31 * result + (getParserNumTasks() != null ? getParserNumTasks().hashCode() : 0);
-    result = 31 * result + (getErrorWriterParallelism() != null ? getErrorWriterParallelism().hashCode() : 0);
-    result = 31 * result + (getErrorWriterNumTasks() != null ? getErrorWriterNumTasks().hashCode() : 0);
-    result = 31 * result + (getSpoutConfig() != null ? getSpoutConfig().hashCode() : 0);
-    result = 31 * result + (getSecurityProtocol() != null ? getSecurityProtocol().hashCode() : 0);
-    result = 31 * result + (getStormConfig() != null ? getStormConfig().hashCode() : 0);
-    result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
-    result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
-    return result;
+    return new HashCodeBuilder(17, 37)
+            .append(parserClassName)
+            .append(filterClassName)
+            .append(sensorTopic)
+            .append(outputTopic)
+            .append(errorTopic)
+            .append(writerClassName)
+            .append(errorWriterClassName)
+            .append(readMetadata)
+            .append(mergeMetadata)
+            .append(numWorkers)
+            .append(numAckers)
+            .append(spoutParallelism)
+            .append(spoutNumTasks)
+            .append(parserParallelism)
+            .append(parserNumTasks)
+            .append(errorWriterParallelism)
+            .append(errorWriterNumTasks)
+            .append(spoutConfig)
+            .append(securityProtocol)
+            .append(stormConfig)
+            .append(cacheConfig)
+            .append(parserConfig)
+            .append(fieldTransformations)
+            .toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+            .append("parserClassName", parserClassName)
+            .append("filterClassName", filterClassName)
+            .append("sensorTopic", sensorTopic)
+            .append("outputTopic", outputTopic)
+            .append("errorTopic", errorTopic)
+            .append("writerClassName", writerClassName)
+            .append("errorWriterClassName", errorWriterClassName)
+            .append("readMetadata", readMetadata)
+            .append("mergeMetadata", mergeMetadata)
+            .append("numWorkers", numWorkers)
+            .append("numAckers", numAckers)
+            .append("spoutParallelism", spoutParallelism)
+            .append("spoutNumTasks", spoutNumTasks)
+            .append("parserParallelism", parserParallelism)
+            .append("parserNumTasks", parserNumTasks)
+            .append("errorWriterParallelism", errorWriterParallelism)
+            .append("errorWriterNumTasks", errorWriterNumTasks)
+            .append("spoutConfig", spoutConfig)
+            .append("securityProtocol", securityProtocol)
+            .append("stormConfig", stormConfig)
+            .append("cacheConfig", cacheConfig)
+            .append("parserConfig", parserConfig)
+            .append("fieldTransformations", fieldTransformations)
+            .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 2865dd6..cd4ad50 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -84,7 +84,7 @@ public class ParserTopologyBuilder {
    * @param errorWriterNumTasksSupplier      Supplier for the number of tasks for the bolt that handles errors
    * @param kafkaSpoutConfigSupplier         Supplier for the configuration options for the kafka spout
    * @param securityProtocolSupplier         Supplier for the security protocol
-   * @param outputTopic                      The output kafka topic
+   * @param outputTopicSupplier              Supplier for the output kafka topic
    * @param stormConfigSupplier              Supplier for the storm config
    * @return A Storm topology that parses telemetry data received from an external sensor
    * @throws Exception
@@ -100,7 +100,8 @@ public class ParserTopologyBuilder {
                                       ValueSupplier<Integer> errorWriterNumTasksSupplier,
                                       ValueSupplier<Map> kafkaSpoutConfigSupplier,
                                       ValueSupplier<String> securityProtocolSupplier,
-                                      Optional<String> outputTopic,
+                                      ValueSupplier<String> outputTopicSupplier,
+                                      ValueSupplier<String> errorTopicSupplier,
                                       ValueSupplier<Config> stormConfigSupplier
   ) throws Exception {
 
@@ -113,24 +114,27 @@ public class ParserTopologyBuilder {
     int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class);
     int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class);
     int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class);
+    String outputTopic = outputTopicSupplier.get(parserConfig, String.class);
+
     Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
     Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig) , parserConfig);
+    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig), parserConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
             .setNumTasks(spoutNumTasks);
 
     // create the parser bolt
-    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic);
+    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, Optional.of(outputTopic));
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
             .setNumTasks(parserNumTasks)
             .localOrShuffleGrouping("kafkaSpout");
 
     // create the error bolt, if needed
     if (errorWriterNumTasks > 0) {
-      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
+      String errorTopic = errorTopicSupplier.get(parserConfig, String.class);
+      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, errorTopic);
       builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
               .setNumTasks(errorWriterNumTasks)
               .localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
@@ -176,24 +180,35 @@ public class ParserTopologyBuilder {
                                          );
   }
 
-  private static KafkaWriter createKafkaWriter( Optional<String> broker
-                                              , String zkQuorum
-                                              , Optional<String> securityProtocol
-                                              )
-  {
-    KafkaWriter ret = null;
+  /**
+   * Create a Kafka writer.
+   *
+   * @param broker An optional URL to the Kafka brokers.
+   * @param zkQuorum The URL to Zookeeper.
+   * @param securityProtocol An optional security protocol in use.
+   * @return
+   */
+  private static KafkaWriter createKafkaWriter(Optional<String> broker,
+                                               String zkQuorum,
+                                               Optional<String> securityProtocol) {
+    KafkaWriter writer = new KafkaWriter();
+
+    // cluster URL; either broker or zookeeper
     if(broker.isPresent()) {
-      ret = new KafkaWriter(broker.get());
-    }
-    else {
-      ret = new KafkaWriter().withZkQuorum(zkQuorum);
+      writer.withBrokerUrl(broker.get());
+
+    } else {
+      writer.withZkQuorum(zkQuorum);
     }
+
+    // security protocol
     if(securityProtocol.isPresent()) {
       HashMap<String, Object> config = new HashMap<>();
       config.put("security.protocol", securityProtocol.get());
-      ret.withProducerConfigs(config);
+      writer.withProducerConfigs(config);
     }
-    return ret;
+
+    return writer;
   }
 
   /**
@@ -206,27 +221,31 @@ public class ParserTopologyBuilder {
    * @param parserConfig
    * @return A Storm bolt that parses input from a sensor
    */
-  private static ParserBolt createParserBolt( String zookeeperUrl
-                                            , Optional<String> brokerUrl
-                                            , String sensorType
-                                            , Optional<String> securityProtocol
-                                            , ParserConfigurations configs
-                                            , SensorParserConfig parserConfig
-                                            , Optional<String> outputTopic
-                                            )
-  {
+  private static ParserBolt createParserBolt( String zookeeperUrl,
+                                              Optional<String> brokerUrl,
+                                              String sensorType,
+                                              Optional<String> securityProtocol,
+                                              ParserConfigurations configs,
+                                              SensorParserConfig parserConfig,
+                                              Optional<String> outputTopic) {
 
     // create message parser
     MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
     parser.configure(parserConfig.getParserConfig());
 
-    // create writer - if not configured uses a sensible default
-    AbstractWriter writer = parserConfig.getWriterClassName() == null ?
-            createKafkaWriter( brokerUrl
-                             , zookeeperUrl
-                             , securityProtocol
-                             ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) :
-            ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    // create a writer
+    AbstractWriter writer;
+    if(parserConfig.getWriterClassName() == null) {
+
+      // if not configured, use a sensible default
+      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+              .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
+
+    } else {
+      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    }
+
+    // configure it
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
     // create a writer handler
@@ -238,37 +257,47 @@ public class ParserTopologyBuilder {
   /**
    * Create a bolt that handles error messages.
    *
-   * @param zookeeperUrl    Kafka zookeeper URL
-   * @param brokerUrl    Kafka Broker URL
-   * @param sensorType   Type of sensor that is being consumed.
-   * @param securityProtocol   Security protocol used (if any)
+   * @param zookeeperUrl Kafka zookeeper URL
+   * @param brokerUrl Kafka Broker URL
+   * @param sensorType Type of sensor that is being consumed.
+   * @param securityProtocol Security protocol used (if any)
    * @param configs
-   * @param parserConfig
+   * @param parserConfig The sensor's parser configuration.
    * @return A Storm bolt that handles error messages.
    */
-  private static WriterBolt createErrorBolt( String zookeeperUrl
-                                           , Optional<String> brokerUrl
-                                           , String sensorType
-                                           , Optional<String> securityProtocol
-                                           , ParserConfigurations configs
-                                           , SensorParserConfig parserConfig
-                                           )
-  {
+  private static WriterBolt createErrorBolt( String zookeeperUrl,
+                                             Optional<String> brokerUrl,
+                                             String sensorType,
+                                             Optional<String> securityProtocol,
+                                             ParserConfigurations configs,
+                                             SensorParserConfig parserConfig,
+                                             String errorTopic) {
+
+    // create a writer
+    AbstractWriter writer;
+    if (parserConfig.getErrorWriterClassName() == null) {
+
+      if(errorTopic == null) {
+        errorTopic = (String) configs.getGlobalConfig().get(Constants.PARSER_ERROR_TOPIC_GLOBALS_KEY);
+      }
+
+      // if not configured, use a sensible default
+      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+              .withTopic(errorTopic)
+              .withConfigPrefix("error");
+
+    } else {
+      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    }
 
-    // create writer - if not configured uses a sensible default
-    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ?
-              createKafkaWriter( brokerUrl
-                               , zookeeperUrl
-                               , securityProtocol
-                               ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic"))
-                                .withConfigPrefix("error")
-            : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    // configure it
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
     // create a writer handler
     WriterHandler writerHandler = createWriterHandler(writer);
 
-    return new WriterBolt(writerHandler, configs, sensorType).withErrorType(Constants.ErrorType.PARSER_ERROR);
+    return new WriterBolt(writerHandler, configs, sensorType)
+            .withErrorType(Constants.ErrorType.PARSER_ERROR);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 3824212..f60ff44 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -310,34 +310,40 @@ public class ParserTopologyCLI {
     String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
 
     /*
-    It bears mentioning why we're creating this ValueSupplier indirection here.
-    As a separation of responsibilities, the CLI class defines the order of precedence
-    for the various topological and structural properties for creating a parser.  This is
-    desirable because there are now (i.e. integration tests)
-    and may be in the future (i.e. a REST service to start parsers without using the CLI)
-    other mechanisms to construct parser topologies.  It's sensible to split those concerns..
-
-    Unfortunately, determining the structural parameters for a parser requires interacting with
-    external services (e.g. zookeeper) that are set up well within the ParserTopology class.
-    Rather than pulling the infrastructure to interact with those services out and moving it into the
-    CLI class and breaking that separation of concerns, we've created a supplier
-    indirection where are providing the logic as to how to create precedence in the CLI class
-    without owning the responsibility of constructing the infrastructure where the values are
-    necessarily supplied.
-
+     * It bears mentioning why we're creating this ValueSupplier indirection here.
+     * As a separation of responsibilities, the CLI class defines the order of precedence
+     * for the various topological and structural properties for creating a parser.  This is
+     * desirable because there are now (i.e. integration tests)
+     * and may be in the future (i.e. a REST service to start parsers without using the CLI)
+     * other mechanisms to construct parser topologies.  It's sensible to split those concerns.
+     *
+     * Unfortunately, determining the structural parameters for a parser requires interacting with
+     * external services (e.g. zookeeper) that are set up well within the ParserTopology class.
+     * Rather than pulling the infrastructure to interact with those services out and moving it into the
+     * CLI class and breaking that separation of concerns, we've created a supplier
+     * indirection where we are providing the logic as to how to create precedence in the CLI class
+     * without owning the responsibility of constructing the infrastructure where the values are
+     * necessarily supplied.
+     *
      */
+
+    // kafka spout parallelism
     ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
       if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
       }
       return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
     };
+
+    // kafka spout number of tasks
     ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
       if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
       }
       return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
     };
+
+    // parser bolt parallelism
     ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
       if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
@@ -345,6 +351,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
     };
 
+    // parser bolt number of tasks
     ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
       if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
@@ -352,6 +359,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
     };
 
+    // error bolt parallelism
     ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
       if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
@@ -359,6 +367,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
     };
 
+    // error bolt number of tasks
     ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
       if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
@@ -366,6 +375,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
     };
 
+    // kafka spout config
     ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
         return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
@@ -373,6 +383,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>());
     };
 
+    // security protocol
     ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
       Optional<String> sp = Optional.empty();
       if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
@@ -384,6 +395,7 @@ public class ParserTopologyCLI {
       return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
     };
 
+    // storm configuration
     ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
       Map<String, Object> c = parserConfig.getStormConfig();
       Config finalConfig = new Config();
@@ -399,39 +411,84 @@ public class ParserTopologyCLI {
       return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
     };
 
-    Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
+    // output topic
+    ValueSupplier<String> outputTopic = (parserConfig, clazz) -> {
+      String topic;
+
+      if(ParserOptions.OUTPUT_TOPIC.has(cmd)) {
+        topic = ParserOptions.OUTPUT_TOPIC.get(cmd);
+
+      } else if(parserConfig.getOutputTopic() != null) {
+        topic = parserConfig.getOutputTopic();
+
+      } else {
+        topic = Constants.ENRICHMENT_TOPIC;
+      }
+
+      return topic;
+    };
+
+    // error topic
+    ValueSupplier<String> errorTopic = (parserConfig, clazz) -> {
+      String topic;
+
+      if(parserConfig.getErrorTopic() != null) {
+        topic = parserConfig.getErrorTopic();
+
+      } else {
+        // topic will be set to the 'parser.error.topic' setting in globals when the error bolt is created
+        topic = null;
+      }
+
+      return topic;
+    };
 
-    return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic);
+    return getParserTopology(
+            zookeeperUrl,
+            brokerUrl,
+            sensorType,
+            spoutParallelism,
+            spoutNumTasks,
+            parserParallelism,
+            parserNumTasks,
+            errorParallelism,
+            errorNumTasks,
+            spoutConfig,
+            securityProtocol,
+            stormConf,
+            outputTopic,
+            errorTopic);
   }
 
-  protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl
-                                                                  , Optional<String> brokerUrl
-                                                                  , String sensorType
-                                                                  , ValueSupplier<Integer> spoutParallelism
-                                                                  , ValueSupplier<Integer> spoutNumTasks
-                                                                  , ValueSupplier<Integer> parserParallelism
-                                                                  , ValueSupplier<Integer> parserNumTasks
-                                                                  , ValueSupplier<Integer> errorParallelism
-                                                                  , ValueSupplier<Integer> errorNumTasks
-                                                                  , ValueSupplier<Map> spoutConfig
-                                                                  , ValueSupplier<String> securityProtocol
-                                                                  , ValueSupplier<Config> stormConf
-                                                                  , Optional<String> outputTopic
-                                                                  ) throws Exception
-  {
-    return ParserTopologyBuilder.build(zookeeperUrl,
-                brokerUrl,
-                sensorType,
-                spoutParallelism,
-                spoutNumTasks,
-                parserParallelism,
-                parserNumTasks,
-                errorParallelism,
-                errorNumTasks,
-                spoutConfig,
-                securityProtocol,
-                outputTopic,
-                stormConf
+  protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl,
+                                                                    Optional<String> brokerUrl,
+                                                                    String sensorType,
+                                                                    ValueSupplier<Integer> spoutParallelism,
+                                                                    ValueSupplier<Integer> spoutNumTasks,
+                                                                    ValueSupplier<Integer> parserParallelism,
+                                                                    ValueSupplier<Integer> parserNumTasks,
+                                                                    ValueSupplier<Integer> errorParallelism,
+                                                                    ValueSupplier<Integer> errorNumTasks,
+                                                                    ValueSupplier<Map> spoutConfig,
+                                                                    ValueSupplier<String> securityProtocol,
+                                                                    ValueSupplier<Config> stormConf,
+                                                                    ValueSupplier<String> outputTopic,
+                                                                    ValueSupplier<String> errorTopic) throws Exception {
+    return ParserTopologyBuilder.build(
+            zookeeperUrl,
+            brokerUrl,
+            sensorType,
+            spoutParallelism,
+            spoutNumTasks,
+            parserParallelism,
+            parserNumTasks,
+            errorParallelism,
+            errorNumTasks,
+            spoutConfig,
+            securityProtocol,
+            outputTopic,
+            errorTopic,
+            stormConf
         );
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 63d9e52..7f40684 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,14 +17,6 @@
  */
 package org.apache.metron.parsers.integration.components;
 
-import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
-import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.ZKServerComponent;
@@ -32,24 +24,37 @@ import org.apache.metron.parsers.topology.ParserTopologyBuilder;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.KillOptions;
-import org.apache.storm.topology.TopologyBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
 public class ParserTopologyComponent implements InMemoryComponent {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private Properties topologyProperties;
   private String brokerUrl;
   private String sensorType;
   private LocalCluster stormCluster;
   private String outputTopic;
+  private String errorTopic;
 
   public static class Builder {
+
     Properties topologyProperties;
     String brokerUrl;
     String sensorType;
     String outputTopic;
+    String errorTopic;
+
     public Builder withTopologyProperties(Properties topologyProperties) {
       this.topologyProperties = topologyProperties;
       return this;
@@ -68,16 +73,31 @@ public class ParserTopologyComponent implements InMemoryComponent {
       return this;
     }
 
+    public Builder withErrorTopic(String topic) {
+      this.errorTopic = topic;
+      return this;
+    }
+
     public ParserTopologyComponent build() {
-      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic);
+
+      if(sensorType == null) {
+        throw new IllegalArgumentException("The sensor type must be defined.");
+      }
+
+      if(outputTopic == null) {
+        throw new IllegalArgumentException("The output topic must be defined.");
+      }
+
+      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic);
     }
   }
 
-  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) {
+  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) {
     this.topologyProperties = topologyProperties;
     this.brokerUrl = brokerUrl;
     this.sensorType = sensorType;
     this.outputTopic = outputTopic;
+    this.errorTopic = errorTopic;
   }
 
   public void updateSensorType(String sensorType) {
@@ -89,24 +109,26 @@ public class ParserTopologyComponent implements InMemoryComponent {
     try {
       final Map<String, Object> stormConf = new HashMap<>();
       stormConf.put(Config.TOPOLOGY_DEBUG, true);
-      ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
-                                                                   , Optional.ofNullable(brokerUrl)
-                                                                   , sensorType
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> new HashMap<>()
-                                                                   , (x,y) -> null
-                                                                   , Optional.ofNullable(outputTopic)
-                                                                   , (x,y) -> {
-                                                                      Config c = new Config();
-                                                                      c.putAll(stormConf);
-                                                                      return c;
-                                                                      }
-                                                                   );
+      ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build (
+              topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
+              Optional.ofNullable(brokerUrl),
+              sensorType,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> new HashMap<>(),
+              (x,y) -> null,
+              (x,y) -> outputTopic,
+              (x,y) -> errorTopic,
+              (x,y) -> {
+                Config c = new Config();
+                c.putAll(stormConf);
+                return c;
+              }
+      );
 
       stormCluster = new LocalCluster();
       stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 97dac5a..fcfc93b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -217,17 +217,21 @@ public class ParserTopologyCLITest {
     private Map<String, Object> spoutConfig;
     private String securityProtocol;
     private Config stormConf;
-
-    public ParserInput( ValueSupplier<Integer> spoutParallelism
-                      , ValueSupplier<Integer> spoutNumTasks
-                      , ValueSupplier<Integer> parserParallelism
-                      , ValueSupplier<Integer> parserNumTasks
-                      , ValueSupplier<Integer> errorParallelism
-                      , ValueSupplier<Integer> errorNumTasks
-                      , ValueSupplier<Map> spoutConfig
-                      , ValueSupplier<String> securityProtocol
-                      , ValueSupplier<Config> stormConf
-                      , SensorParserConfig config
+    private String outputTopic;
+    private String errorTopic;
+
+    public ParserInput(ValueSupplier<Integer> spoutParallelism,
+                       ValueSupplier<Integer> spoutNumTasks,
+                       ValueSupplier<Integer> parserParallelism,
+                       ValueSupplier<Integer> parserNumTasks,
+                       ValueSupplier<Integer> errorParallelism,
+                       ValueSupplier<Integer> errorNumTasks,
+                       ValueSupplier<Map> spoutConfig,
+                       ValueSupplier<String> securityProtocol,
+                       ValueSupplier<Config> stormConf,
+                       ValueSupplier<String> outputTopic,
+                       ValueSupplier<String> errorTopic,
+                       SensorParserConfig config
                       )
     {
       this.spoutParallelism = spoutParallelism.get(config, Integer.class);
@@ -239,6 +243,8 @@ public class ParserTopologyCLITest {
       this.spoutConfig = spoutConfig.get(config, Map.class);
       this.securityProtocol = securityProtocol.get(config, String.class);
       this.stormConf = stormConf.get(config, Config.class);
+      this.outputTopic = outputTopic.get(config, String.class);
+      this.errorTopic = outputTopic.get(config, String.class);
     }
 
     public Integer getSpoutParallelism() {
@@ -276,30 +282,43 @@ public class ParserTopologyCLITest {
     public Config getStormConf() {
       return stormConf;
     }
+
+    public String getOutputTopic() {
+      return outputTopic;
+    }
+
+    public String getErrorTopic() {
+      return errorTopic;
+    }
   }
+
   /**
-{
-  "parserClassName": "org.apache.metron.parsers.GrokParser",
-  "sensorTopic": "squid",
-  "parserConfig": {
-    "grokPath": "/patterns/squid",
-    "patternLabel": "SQUID_DELIMITED",
-    "timestampField": "timestamp"
-  },
-  "fieldTransformations" : [
-    {
-      "transformation" : "STELLAR"
-    ,"output" : [ "full_hostname", "domain_without_subdomains" ]
-    ,"config" : {
-      "full_hostname" : "URL_TO_HOST(url)"
-      ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
-                }
-    }
-                           ]
-}
+   * {
+   *   "parserClassName": "org.apache.metron.parsers.GrokParser",
+   *   "sensorTopic": "squid",
+   *   "parserConfig": {
+   *      "grokPath": "/patterns/squid",
+   *      "patternLabel": "SQUID_DELIMITED",
+   *      "timestampField": "timestamp"
+   *   },
+   *   "fieldTransformations" : [
+   *      {
+   *        "transformation" : "STELLAR",
+   *        "output" : [
+   *            "full_hostname",
+   *            "domain_without_subdomains"
+   *        ],
+   *        "config" : {
+   *            "full_hostname" : "URL_TO_HOST(url)",
+   *            "domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
+   *        }
+   *      }
+   *   ]
+   * }
    */
   @Multiline
   public static String baseConfig;
+
   private static SensorParserConfig getBaseConfig() {
     try {
       return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class);
@@ -600,18 +619,37 @@ public class ParserTopologyCLITest {
     final ParserInput[] parserInput = new ParserInput[]{null};
     new ParserTopologyCLI() {
       @Override
-      protected ParserTopologyBuilder.ParserTopology getParserTopology(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, ValueSupplier<Integer> spoutParallelism, ValueSupplier<Integer> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, ValueSupplier<Map> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, Optional<String> outputTopic) throws Exception {
-       parserInput[0] = new ParserInput( spoutParallelism
-                                        , spoutNumTasks
-                                        , parserParallelism
-                                        , parserNumTasks
-                                        , errorParallelism
-                                        , errorNumTasks
-                                        , spoutConfig
-                                        , securityProtocol
-                                        , stormConf
-                                        , config
-                                        );
+      protected ParserTopologyBuilder.ParserTopology getParserTopology(
+              String zookeeperUrl,
+              Optional<String> brokerUrl,
+              String sensorType,
+              ValueSupplier<Integer> spoutParallelism,
+              ValueSupplier<Integer> spoutNumTasks,
+              ValueSupplier<Integer> parserParallelism,
+              ValueSupplier<Integer> parserNumTasks,
+              ValueSupplier<Integer> errorParallelism,
+              ValueSupplier<Integer> errorNumTasks,
+              ValueSupplier<Map> spoutConfig,
+              ValueSupplier<String> securityProtocol,
+              ValueSupplier<Config> stormConf,
+              ValueSupplier<String> outputTopic,
+              ValueSupplier<String> errorTopic) throws Exception {
+
+       parserInput[0] = new ParserInput(
+               spoutParallelism,
+               spoutNumTasks,
+               parserParallelism,
+               parserNumTasks,
+               errorParallelism,
+               errorNumTasks,
+               spoutConfig,
+               securityProtocol,
+               stormConf,
+               outputTopic,
+               errorTopic,
+               config
+       );
+
         return null;
       }
     }.createParserTopology(cmd);

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 4f513be..49d7521 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -30,9 +30,14 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.integration.*;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
@@ -40,41 +45,52 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 
 public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationTest {
 
   /**
-   {
-    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
-   ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter"
-   ,"sensorTopic":"dummy"
-   ,"parserConfig":
-   {
-     "shew.table" : "dummy"
-    ,"shew.cf" : "cf"
-    ,"shew.keyColumns" : "col2"
-    ,"shew.enrichmentType" : "et"
-    ,"shew.hbaseProvider" : "org.apache.metron.hbase.mock.MockHBaseTableProvider"
-    ,"columns" : {
-                "col1" : 0
-               ,"col2" : 1
-               ,"col3" : 2
-                 }
-   }
-   }
+   * {
+   *     "parserClassName": "org.apache.metron.parsers.csv.CSVParser",
+   *     "writerClassName": "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
+   *     "sensorTopic": "dummy",
+   *     "outputTopic": "output",
+   *     "errorTopic": "error",
+   *     "parserConfig": {
+   *        "shew.table": "dummy",
+   *        "shew.cf": "cf",
+   *        "shew.keyColumns": "col2",
+   *        "shew.enrichmentType": "et",
+   *        "shew.hbaseProvider": "org.apache.metron.hbase.mock.MockHBaseTableProvider",
+   *        "columns" : {
+   *             "col1": 0,
+   *             "col2": 1,
+   *             "col3": 2
+   *        }
+   *     }
+   * }
    */
   @Multiline
-  public static String parserConfig;
+  public static String parserConfigJSON;
 
   @Test
   public void test() throws UnableToStartException, IOException {
     final String sensorType = "dummy";
+
+    // the input messages to parse
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("col11,col12,col13"));
       add(Bytes.toBytes("col21,col22,col23"));
       add(Bytes.toBytes("col31,col32,col33"));
     }};
+
+    // set up external components: Kafka, ZooKeeper
     MockHBaseTableProvider.addToCache(sensorType, "cf");
     final Properties topologyProperties = new Properties();
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
@@ -83,17 +99,20 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
 
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
+
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
             .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+            .withParserSensorConfig(sensorType, parserConfig);
 
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
             .withSensorType(sensorType)
             .withTopologyProperties(topologyProperties)
-            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+            .withBrokerUrl(kafkaComponent.getBrokerList())
+            .withOutputTopic(parserConfig.getOutputTopic())
+            .build();
 
-    //UnitTestHelper.verboseLogging();
     ComponentRunner runner = new ComponentRunner.Builder()
             .withComponent("zk", zkServerComponent)
             .withComponent("kafka", kafkaComponent)

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index 0cfaae3..cde08bc 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -45,7 +45,6 @@ import java.io.IOException;
 import java.util.*;
 
 public class WriterBoltIntegrationTest extends BaseIntegrationTest {
-  private static final String ERROR_TOPIC = "parser_error";
 
   public static class MockValidator implements FieldValidation{
 
@@ -62,48 +61,53 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }
   }
   /**
-   {
-    "fieldValidations" : [
-        {
-          "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"
-        }
-                         ],
-   "parser.error.topic":"parser_error"
-   }
-    */
+   * {
+   *   "fieldValidations" : [
+   *      { "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator" }
+   *   ]
+   * }
+   */
   @Multiline
   public static String globalConfig;
 
   /**
-   {
-    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
-   ,"sensorTopic":"dummy"
-   ,"parserConfig":
-   {
-    "columns" : {
-                "action" : 0
-               ,"dummy" : 1
-                 }
-   }
-   }
+   * {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
+   *    "sensorTopic": "dummy",
+   *    "outputTopic": "output",
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "columns" : {
+   *            "action" : 0,
+   *            "dummy" : 1
+   *        }
+   *    }
+   * }
    */
   @Multiline
-  public static String parserConfig;
+  public static String parserConfigJSON;
 
   @Test
   public void test() throws UnableToStartException, IOException, ParseException {
+
     UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL);
     final String sensorType = "dummy";
+
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
+
+    // the input messages to parse
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("valid,foo"));
       add(Bytes.toBytes("invalid,foo"));
       add(Bytes.toBytes("error"));
     }};
+
+    // set up external components: ZooKeeper, Kafka
     final Properties topologyProperties = new Properties();
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
+      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
@@ -111,14 +115,16 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
             .withGlobalConfig(globalConfig)
-            .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+            .withParserSensorConfig(sensorType, parserConfig);
 
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
             .withSensorType(sensorType)
             .withTopologyProperties(topologyProperties)
-            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+            .withBrokerUrl(kafkaComponent.getBrokerList())
+            .withErrorTopic(parserConfig.getErrorTopic())
+            .withOutputTopic(parserConfig.getOutputTopic())
+            .build();
 
-    //UnitTestHelper.verboseLogging();
     ComponentRunner runner = new ComponentRunner.Builder()
             .withComponent("zk", zkServerComponent)
             .withComponent("kafka", kafkaComponent)
@@ -131,48 +137,42 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor());
+      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(
+              getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic()));
+
+      // validate the output messages
       Map<String,List<JSONObject>> outputMessages = result.getResult();
       Assert.assertEquals(2, outputMessages.size());
       Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
       Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action"));
-      Assert.assertEquals(2, outputMessages.get(ERROR_TOPIC).size());
-      JSONObject invalidMessage = outputMessages.get(ERROR_TOPIC).get(0);
+      Assert.assertEquals(2, outputMessages.get(parserConfig.getErrorTopic()).size());
+
+      // validate an error message
+      JSONObject invalidMessage = outputMessages.get(parserConfig.getErrorTopic()).get(0);
       Assert.assertEquals(Constants.ErrorType.PARSER_INVALID.getType(), invalidMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
       JSONObject rawMessage = JSONUtils.INSTANCE.load((String) invalidMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()), JSONObject.class);
       Assert.assertEquals("foo", rawMessage.get("dummy"));
       Assert.assertEquals("invalid", rawMessage.get("action"));
-      JSONObject errorMessage = outputMessages.get(ERROR_TOPIC).get(1);
+
+      // validate the next error message
+      JSONObject errorMessage = outputMessages.get(parserConfig.getErrorTopic()).get(1);
       Assert.assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
       Assert.assertEquals("error", errorMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
-      // It's unclear if we need a rawMessageBytes field so commenting out for now
-      //Assert.assertTrue(Arrays.equals(listToBytes(errorMessage.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName())), "error".getBytes()));
-    }
-    finally {
+
+    } finally {
       if(runner != null) {
         runner.stop();
       }
     }
   }
-  private static byte[] listToBytes(Object o ){
-    List<Byte> l = (List<Byte>)o;
-    byte[] ret = new byte[l.size()];
-    int i = 0;
-    for(Number b : l) {
-      ret[i++] = b.byteValue();
-    }
-    return ret;
-  }
+
   private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
     List<JSONObject> tmp = new ArrayList<>();
-    Iterables.addAll(tmp
-            , Iterables.transform(outputMessages
-                    , message -> {
+    Iterables.addAll(tmp,
+            Iterables.transform(outputMessages,
+                    message -> {
                       try {
-                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message)
-                                             ,JSONUtils.MAP_SUPPLIER 
-                                             )
-                        );
+                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
                       } catch (Exception ex) {
                         throw new IllegalStateException(ex);
                       }
@@ -181,13 +181,14 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     );
     return tmp;
   }
+
   @SuppressWarnings("unchecked")
-  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(){
+  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){
 
     return new KafkaProcessor<>()
             .withKafkaComponentName("kafka")
-            .withReadTopic(Constants.ENRICHMENT_TOPIC)
-            .withErrorTopic(ERROR_TOPIC)
+            .withReadTopic(outputTopic)
+            .withErrorTopic(errorTopic)
             .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
               @Nullable
               @Override
@@ -201,7 +202,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
               public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
                 return new HashMap<String, List<JSONObject>>() {{
                   put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
-                  put(ERROR_TOPIC, loadMessages(messageSet.getErrors()));
+                  put(errorTopic, loadMessages(messageSet.getErrors()));
                 }};
               }
             });

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index f73e0f4..c4e3998 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -76,6 +76,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     this.brokerUrl = brokerUrl;
   }
 
+  public KafkaWriter withBrokerUrl(String brokerUrl) {
+    this.brokerUrl = brokerUrl;
+    return this;
+  }
+
   public KafkaWriter withZkQuorum(String zkQuorum) {
     this.zkQuorum = zkQuorum;
     return this;