You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/16 17:54:33 UTC

incubator-metron git commit: METRON-223: Invalid and Erroneous messages should go to kafka for further analysis in the Parser Topology. This closes apache/incubator-metron#151

Repository: incubator-metron
Updated Branches:
  refs/heads/master bdbf33a9d -> 916432c96


METRON-223: Invalid and Erroneous messages should go to kafka for further analysis in the Parser Topology.  This closes apache/incubator-metron#151


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/916432c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/916432c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/916432c9

Branch: refs/heads/master
Commit: 916432c96626750c9a35caef346789778af3818b
Parents: bdbf33a
Author: cstella <ce...@gmail.com>
Authored: Thu Jun 16 13:54:11 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Jun 16 13:54:11 2016 -0400

----------------------------------------------------------------------
 .../roles/metron-kafka/defaults/main.yml        |   2 +
 .../org/apache/metron/common/Constants.java     |   2 +
 .../common/configuration/FieldValidator.java    |   3 +-
 .../configuration/SensorParserConfig.java       |  31 ++-
 .../transformation/FieldTransformation.java     |   3 +-
 .../field/validation/FieldValidation.java       |   3 +-
 .../apache/metron/common/utils/ErrorUtils.java  |  88 ++++++--
 .../apache/metron/common/utils/StringUtils.java |  36 +++
 .../common/writer/BulkWriterComponent.java      |  19 +-
 .../components/ConfigUploadComponent.java       |  24 +-
 .../components/KafkaWithZKComponent.java        |  12 +-
 .../org/apache/metron/parsers/GrokParser.java   |   2 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  |  58 ++---
 .../apache/metron/parsers/bolt/WriterBolt.java  |  93 ++++++++
 .../metron/parsers/bolt/WriterHandler.java      |  90 ++++++++
 .../apache/metron/parsers/csv/CSVParser.java    |   7 +-
 .../parsers/topology/ParserTopologyBuilder.java |  89 ++++++--
 .../parsers/topology/ParserTopologyCLI.java     |  47 +++-
 .../metron/parsers/writer/KafkaWriter.java      |  31 ++-
 .../metron/parsers/bolt/ParserBoltTest.java     |  19 +-
 .../metron/parsers/bolt/WriterBoltTest.java     | 223 +++++++++++++++++++
 .../metron/parsers/csv/CSVParserTest.java       |   8 +
 .../components/ParserTopologyComponent.java     |   8 +-
 .../integration/WriterBoltIntegrationTest.java  | 205 +++++++++++++++++
 24 files changed, 974 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-deployment/roles/metron-kafka/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron-kafka/defaults/main.yml b/metron-deployment/roles/metron-kafka/defaults/main.yml
index 4e83a67..d108d0f 100644
--- a/metron-deployment/roles/metron-kafka/defaults/main.yml
+++ b/metron-deployment/roles/metron-kafka/defaults/main.yml
@@ -20,5 +20,7 @@ topics_to_create:
   - { topic: "pcap",        num_partitions: 1, replication_factor: 1, retention_gb: 10 }
   - { topic: "bro",         num_partitions: 1, replication_factor: 1, retention_gb: 10 }
   - { topic: "yaf",         num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "parser_invalid",         num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "parser_error",         num_partitions: 1, replication_factor: 1, retention_gb: 10 }
   - { topic: "snort",       num_partitions: 1, replication_factor: 1, retention_gb: 10 }
   - { topic: "enrichments", num_partitions: 1, replication_factor: 1, retention_gb: 10 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 1b0695f..1175e8b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -27,6 +27,8 @@ public class Constants {
   public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
   public static final String SENSOR_TYPE = "source.type";
   public static final String ENRICHMENT_TOPIC = "enrichments";
+  public static final String DEFAULT_PARSER_ERROR_TOPIC = "parser_error";
+  public static final String DEFAULT_PARSER_INVALID_TOPIC = "parser_invalid";
   public static final String ERROR_STREAM = "error";
   public static final String INVALID_STREAM = "invalid";
   public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
index 46b4b74..80d770a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
@@ -23,12 +23,13 @@ import org.apache.metron.common.field.validation.FieldValidation;
 import org.apache.metron.common.field.validation.FieldValidations;
 import org.json.simple.JSONObject;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class FieldValidator {
+public class FieldValidator implements Serializable {
 
   public enum Config {
      FIELD_VALIDATIONS("fieldValidations")

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 82b407f..2d9a9e0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -22,17 +22,36 @@ import com.google.common.collect.ImmutableList;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SensorParserConfig {
+public class SensorParserConfig implements Serializable {
 
   private String parserClassName;
   private String filterClassName;
   private String sensorTopic;
   private String writerClassName;
+  private String errorWriterClassName;
+  private String invalidWriterClassName;
+
+  public String getErrorWriterClassName() {
+    return errorWriterClassName;
+  }
+
+  public void setErrorWriterClassName(String errorWriterClassName) {
+    this.errorWriterClassName = errorWriterClassName;
+  }
+
+  public String getInvalidWriterClassName() {
+    return invalidWriterClassName;
+  }
+
+  public void setInvalidWriterClassName(String invalidWriterClassName) {
+    this.invalidWriterClassName = invalidWriterClassName;
+  }
 
   public String getWriterClassName() {
     return writerClassName;
@@ -107,7 +126,8 @@ public class SensorParserConfig {
             ", filterClassName='" + filterClassName + '\'' +
             ", sensorTopic='" + sensorTopic + '\'' +
             ", writerClassName='" + writerClassName + '\'' +
-            ", parserConfig=" + parserConfig +
+            ", errorWriterClassName='" + errorWriterClassName + '\'' +
+            ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
             ", parserConfig=" + parserConfig +
             ", fieldTransformations=" + fieldTransformations +
             '}';
@@ -128,7 +148,9 @@ public class SensorParserConfig {
       return false;
     if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
       return false;
-    if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
+    if (getErrorWriterClassName() != null ? !getErrorWriterClassName().equals(that.getErrorWriterClassName()) : that.getErrorWriterClassName() != null)
+      return false;
+    if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null)
       return false;
     if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
       return false;
@@ -142,7 +164,8 @@ public class SensorParserConfig {
     result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
     result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
     result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
-    result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
+    result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0);
+    result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0);
     result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
     result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
index b983815..e308dd3 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
@@ -18,10 +18,11 @@
 
 package org.apache.metron.common.field.transformation;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-public interface FieldTransformation {
+public interface FieldTransformation extends Serializable {
   Map<String, Object> map( Map<String, Object> input
                          , List<String> outputField
                          , Map<String, Object> fieldMappingConfig

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
index 442f2d6..c0d4c40 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
@@ -18,9 +18,10 @@
 
 package org.apache.metron.common.field.validation;
 
+import java.io.Serializable;
 import java.util.Map;
 
-public interface FieldValidation {
+public interface FieldValidation extends Serializable {
   boolean isValid( Map<String, Object> input
                  , Map<String, Object> validationConfig
                  , Map<String, Object> globalConfig

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index 9fffe2a..92e0ab9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,9 +17,19 @@
  */
 package org.apache.metron.common.utils;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
 import backtype.storm.task.OutputCollector;
 import backtype.storm.tuple.Values;
+import org.apache.commons.beanutils.Converter;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.metron.common.Constants;
 import org.json.simple.JSONObject;
@@ -92,37 +102,73 @@ public class ErrorUtils {
     }
   }
 
-  @SuppressWarnings("unchecked") // JSONObject extends HashMap w/o type parameters
-  public static JSONObject generateErrorMessage(String message, Throwable t) {
+  @SuppressWarnings("unchecked")
+  public static JSONObject generateErrorMessage(String message, Throwable t)
+  {
+    return generateErrorMessage(message, t, Optional.empty(), Optional.empty());
+  }
+  public static JSONObject generateErrorMessage(String message
+                                               , Throwable t
+                                               , Optional<String> sensorType
+                                               , Optional<Object> rawMessage
+                                               )
+  {
     JSONObject error_message = new JSONObject();
-
+		
 		/*
      * Save full stack trace in object.
 		 */
-		String stackTrace = ExceptionUtils.getStackTrace(t);
-		
-		String exception = t.toString();
-		
+    String stackTrace = ExceptionUtils.getStackTrace(t);
+
+    String exception = t.toString();
+
+
     error_message.put("time", System.currentTimeMillis());
     try {
       error_message.put("hostname", InetAddress.getLocalHost().getHostName());
     } catch (UnknownHostException ex) {
-      LOGGER.info("Unable to resolve hostname while generating error message", ex);
+
     }
-		
-		error_message.put("message", message);
-		error_message.put(Constants.SENSOR_TYPE, "error");
-		error_message.put("exception", exception);
-		error_message.put("stack", stackTrace);
-		
-		return error_message;
-	}
+    if(rawMessage.isPresent()) {
+      if(rawMessage.get() instanceof byte[]) {
+        error_message.put("rawMessage", Bytes.toString((byte[])rawMessage.get()));
+        error_message.put("rawMessage_bytes", toByteArrayList((byte[])rawMessage.get()));
+      }
+      else {
+        error_message.put("rawMessage", rawMessage.get());
+      }
+    }
+    error_message.put("message", message);
+    error_message.put(Constants.SENSOR_TYPE, StringUtils.join("_", sensorType, Optional.of("error")));
+    error_message.put("exception", exception);
+    error_message.put("stack", stackTrace);
+
+    return error_message;
+  }
+
+  private static List<Byte> toByteArrayList(byte[] list) {
+    List<Byte> ret = new ArrayList<>();
+    for(byte b : list) {
+      ret.add(b);
+    }
+    return ret;
+  }
+
+  public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
+    handleError(collector, t, errorStream, Optional.empty(), Optional.empty());
+  }
+  public static void handleError(OutputCollector collector
+                                , Throwable t
+                                , String errorStream
+                                , Optional<String> sensorType
+                                , Optional<Object> rawMessage
+                                )
+  {
+    JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t, sensorType, rawMessage);
+    collector.emit(errorStream, new Values(error));
+    collector.reportError(t);
+  }
 
-	public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
-		JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t);
-		collector.emit(errorStream, new Values(error));
-		collector.reportError(t);
-	}
 
 	public static String generateThreadDump() {
 		final StringBuilder dump = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java
new file mode 100644
index 0000000..9b0d77a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+public class StringUtils {
+  public static String join(String delim, Optional<String>... parts) {
+    return Joiner.on(delim).join(
+            Arrays.asList(parts).stream().filter(
+                    part -> part.isPresent()
+            ).map( part -> part.get())
+             .toArray()
+                                );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
index 7d66ce1..c0e4f37 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -60,8 +60,10 @@ public class BulkWriterComponent<MESSAGE_T> {
 
   public void error(Throwable e, Iterable<Tuple> tuples) {
     tuples.forEach(t -> collector.ack(t));
-    LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
-    ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+    if(!Iterables.isEmpty(tuples)) {
+      LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
+      ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+    }
   }
 
   protected Collection<Tuple> createTupleCollection() {
@@ -69,6 +71,19 @@ public class BulkWriterComponent<MESSAGE_T> {
   }
 
 
+  public void errorAll(Throwable e) {
+    for(Map.Entry<String, Collection<Tuple>> kv : sensorTupleMap.entrySet()) {
+      error(e, kv.getValue());
+      sensorTupleMap.remove(kv.getKey());
+      sensorMessageMap.remove(kv.getKey());
+    }
+  }
+
+  public void errorAll(String sensorType, Throwable e) {
+    error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()));
+    sensorTupleMap.remove(sensorType);
+    sensorMessageMap.remove(sensorType);
+  }
   public void write( String sensorType
                    , Tuple tuple
                    , MESSAGE_T message

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
index 865b017..7e0f325 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
@@ -20,11 +20,13 @@ package org.apache.metron.integration.components;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 public class ConfigUploadComponent implements InMemoryComponent {
@@ -33,6 +35,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
   private String globalConfigPath;
   private String parserConfigsPath;
   private String enrichmentConfigsPath;
+  private Optional<String> globalConfig = Optional.empty();
   private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
   public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
     this.topologyProperties = topologyProperties;
@@ -58,21 +61,32 @@ public class ConfigUploadComponent implements InMemoryComponent {
     return this;
   }
 
+  public ConfigUploadComponent withGlobalConfig(String globalConfig) {
+    this.globalConfig = Optional.ofNullable(globalConfig);
+    return this;
+  }
 
   @Override
   public void start() throws UnableToStartException {
     try {
-      ConfigurationsUtils.uploadConfigsToZookeeper( globalConfigPath
-                                                  , parserConfigsPath
-                                                  , enrichmentConfigsPath
-                                                  , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
-                                                  );
+      if(globalConfigPath != null) {
+        ConfigurationsUtils.uploadConfigsToZookeeper(globalConfigPath
+                , parserConfigsPath
+                , enrichmentConfigsPath
+                , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+        );
+      }
       for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
         ConfigurationsUtils.writeSensorParserConfigToZookeeper( kv.getKey()
                                                               , kv.getValue()
                                                               , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
                                                               );
       }
+      if(globalConfig.isPresent()) {
+        ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.get().getBytes()
+                                                        , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+                                                        );
+      }
 
     } catch (Exception e) {
       throw new UnableToStartException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index 99f916d..6d2261b 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
 import kafka.admin.AdminUtils;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
+import kafka.common.TopicExistsException;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
@@ -215,9 +216,14 @@ public class KafkaWithZKComponent implements InMemoryComponent {
   }
 
   public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
-    AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
-    if(waitUntilMetadataIsPropagated) {
-      waitUntilMetadataIsPropagated(name, numPartitions);
+    try {
+      AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
+      if (waitUntilMetadataIsPropagated) {
+        waitUntilMetadataIsPropagated(name, numPartitions);
+      }
+    }
+    catch(TopicExistsException tee) {
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index da72da8..925c329 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -169,7 +169,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
       }
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
-      throw new RuntimeException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
+      throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
     }
     return messages;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 46a49fc..aebb8d7 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -43,6 +43,8 @@ import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.common.interfaces.MessageWriter;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.*;
@@ -50,38 +52,22 @@ import java.util.function.Function;
 
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ParserBolt.class);
   private OutputCollector collector;
   private MessageParser<JSONObject> parser;
   private MessageFilter<JSONObject> filter = new GenericMessageFilter();
-  private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
-  private BulkMessageWriter<JSONObject> messageWriter;
-  private BulkWriterComponent<JSONObject> writerComponent;
-  private boolean isBulk = false;
+  private WriterHandler writer;
   public ParserBolt( String zookeeperUrl
                    , String sensorType
                    , MessageParser<JSONObject> parser
-                   , MessageWriter<JSONObject> writer
+                   , WriterHandler writer
   )
   {
     super(zookeeperUrl, sensorType);
-    isBulk = false;
+    this.writer = writer;
     this.parser = parser;
-    messageWriter = new WriterToBulkWriter<>(writer);
   }
 
-  public ParserBolt( String zookeeperUrl
-                   , String sensorType
-                   , MessageParser<JSONObject> parser
-                   , BulkMessageWriter<JSONObject> writer
-  )
-  {
-    super(zookeeperUrl, sensorType);
-    isBulk = true;
-    this.parser = parser;
-    messageWriter = writer;
-
-
-  }
 
   public ParserBolt withMessageFilter(MessageFilter<JSONObject> filter) {
     this.filter = filter;
@@ -103,23 +89,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     }
     parser.init();
 
-    if(isBulk) {
-      writerTransformer = config -> new ParserWriterConfiguration(config);
-    }
-    else {
-      writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
-    }
-    try {
-      messageWriter.init(stormConf, writerTransformer.apply(getConfigurations()));
-    } catch (Exception e) {
-      throw new IllegalStateException("Unable to initialize message writer", e);
-    }
-    this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
-      @Override
-      protected Collection<Tuple> createTupleCollection() {
-        return new HashSet<>();
-      }
-    };
+    writer.init(stormConf, collector, getConfigurations());
+
     SensorParserConfig config = getSensorParserConfig();
     if(config != null) {
       config.init();
@@ -139,7 +110,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     try {
       //we want to ack the tuple in the situation where we have are not doing a bulk write
       //otherwise we want to defer to the writerComponent who will ack on bulk commit.
-      boolean ackTuple = !isBulk;
+      boolean ackTuple = !writer.handleAck();
       int numWritten = 0;
       if(sensorParserConfig != null) {
         List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
@@ -152,13 +123,13 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
                 handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
               }
             }
+            numWritten++;
             if(!isGloballyValid(message, fieldValidations)) {
               message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");
               collector.emit(Constants.INVALID_STREAM, new Values(message));
             }
             else {
-              numWritten++;
-              writerComponent.write(getSensorType(), tuple, message, messageWriter, writerTransformer.apply(getConfigurations()));
+              writer.write(getSensorType(), tuple, message, getConfigurations());
             }
           }
         }
@@ -170,7 +141,12 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         collector.ack(tuple);
       }
     } catch (Throwable ex) {
-      ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
+      ErrorUtils.handleError( collector
+                            , ex
+                            , Constants.ERROR_STREAM
+                            , Optional.of(getSensorType())
+                            , Optional.ofNullable(originalMessage)
+                            );
       collector.ack(tuple);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
new file mode 100644
index 0000000..380f760
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.json.simple.JSONObject;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class WriterBolt extends BaseRichBolt {
+  private WriterHandler handler;
+  private ParserConfigurations configuration;
+  private String sensorType;
+  private transient OutputCollector collector;
+  public WriterBolt(WriterHandler handler, ParserConfigurations configuration, String sensorType) {
+    this.handler = handler;
+    this.configuration = configuration;
+    this.sensorType = sensorType;
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+    handler.init(stormConf, collector, configuration);
+  }
+
+  private JSONObject getMessage(Tuple tuple) {
+    Object ret = tuple.getValueByField("message");
+    if(ret != null) {
+      ret = tuple.getValue(0);
+    }
+    if(ret != null) {
+      return (JSONObject)((JSONObject)ret).clone();
+    }
+    else {
+      return null;
+    }
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    JSONObject message = null;
+    try {
+      message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
+      handler.write(sensorType, tuple, message, configuration);
+      if(!handler.handleAck()) {
+        collector.ack(tuple);
+      }
+    } catch (Throwable e) {
+      ErrorUtils.handleError( collector
+                            , e
+                            , Constants.ERROR_STREAM
+                            , Optional.of(sensorType)
+                            , Optional.ofNullable(message)
+                            );
+      collector.ack(tuple);
+    }
+  }
+
+  /**
+   * Declare the output schema for all the streams of this topology.
+   *
+   * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
new file mode 100644
index 0000000..ecd1ce8
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.apache.metron.common.writer.BulkWriterComponent;
+import org.apache.metron.common.writer.WriterToBulkWriter;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WriterHandler implements Serializable {
+  private BulkMessageWriter<JSONObject> messageWriter;
+  private transient BulkWriterComponent<JSONObject> writerComponent;
+  private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
+  private boolean isBulk = false;
+  public WriterHandler(MessageWriter<JSONObject> writer) {
+    isBulk = false;
+    messageWriter = new WriterToBulkWriter<>(writer);
+
+  }
+  public WriterHandler(BulkMessageWriter<JSONObject> writer) {
+    isBulk = true;
+    messageWriter = writer;
+  }
+
+
+  public boolean handleAck() {
+    return isBulk;
+  }
+
+  public void init(Map stormConf, OutputCollector collector, ParserConfigurations configurations) {
+    if(isBulk) {
+      writerTransformer = config -> new ParserWriterConfiguration(config);
+    }
+    else {
+      writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
+    }
+    try {
+      messageWriter.init(stormConf, writerTransformer.apply(configurations));
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to initialize message writer", e);
+    }
+    this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
+      @Override
+      protected Collection<Tuple> createTupleCollection() {
+        return new HashSet<>();
+      }
+    };
+  }
+
+  public void write( String sensorType
+                   , Tuple tuple
+                   , JSONObject message
+                   , ParserConfigurations configurations
+                   ) throws Exception {
+    writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations));
+  }
+
+  public void errorAll(String sensorType, Throwable e) {
+    writerComponent.errorAll(sensorType, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
index a0097ed..19a768e 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
@@ -84,9 +84,10 @@ public class CSVParser extends BasicParser {
       else {
         return Collections.emptyList();
       }
-    } catch (Exception e) {
-      LOG.error("Unable to parse " + new String(rawMessage), e);
-      return Collections.emptyList();
+    } catch (Throwable e) {
+      String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage();
+      LOG.error(message, e);
+      throw new IllegalStateException(message, e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 4b357be..d83d260 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -19,6 +19,7 @@ package org.apache.metron.parsers.topology;
 
 import backtype.storm.topology.TopologyBuilder;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
@@ -29,12 +30,15 @@ import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.AbstractWriter;
 import org.apache.metron.parsers.bolt.ParserBolt;
+import org.apache.metron.parsers.bolt.WriterBolt;
+import org.apache.metron.parsers.bolt.WriterHandler;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.writer.KafkaWriter;
 import org.json.simple.JSONObject;
 import storm.kafka.KafkaSpout;
 import storm.kafka.ZkHosts;
 
+import java.util.EnumMap;
 import java.util.Map;
 
 public class ParserTopologyBuilder {
@@ -46,7 +50,11 @@ public class ParserTopologyBuilder {
                          int spoutParallelism,
                          int spoutNumTasks,
                          int parserParallelism,
-                         int parserNumTasks
+                         int parserNumTasks,
+                         int invalidWriterParallelism,
+                         int invalidWriterNumTasks,
+                         int errorWriterParallelism,
+                         int errorWriterNumTasks
                                      ) throws Exception {
     CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
     client.start();
@@ -67,31 +75,68 @@ public class ParserTopologyBuilder {
            .setNumTasks(spoutNumTasks);
     MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
     parser.configure(sensorParserConfig.getParserConfig());
-    ParserBolt parserBolt = null;
-    {
-      if(sensorParserConfig.getWriterClassName() == null) {
-        KafkaWriter writer = new KafkaWriter(brokerUrl);
-        writer.configure(sensorType, new ParserWriterConfiguration(configurations));
-        parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, writer);
-      }
-      else {
-        AbstractWriter writer = ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName());
-        writer.configure(sensorType, new ParserWriterConfiguration(configurations));
-        if(writer instanceof BulkMessageWriter) {
-          parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, (BulkMessageWriter<JSONObject>)writer);
-        }
-        else if(writer instanceof MessageWriter) {
-          parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, (MessageWriter<JSONObject>)writer);
-        }
-        else {
-          throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
-        }
-      }
-    }
+
+    ParserBolt parserBolt = new ParserBolt(zookeeperUrl
+                                          , sensorType
+                                          , parser
+                                          ,getHandler( sensorType
+                                                     , configurations
+                                                     , sensorParserConfig.getWriterClassName() == null
+                                                     ? new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC)
+                                                     :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
+                                                     )
+                                          );
+
+    WriterBolt errorBolt = new WriterBolt(getHandler( sensorType
+                                                    , configurations
+                                                    , sensorParserConfig.getErrorWriterClassName() == null
+                                                    ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC)
+                                                                                .withConfigPrefix("error")
+                                                    :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
+                                                    )
+                                         , configurations
+                                         , sensorType
+                                         );
+    WriterBolt invalidBolt = new WriterBolt(getHandler( sensorType
+                                                    , configurations
+                                                    , sensorParserConfig.getErrorWriterClassName() == null
+                                                    ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC)
+                                                                                .withConfigPrefix("invalid")
+                                                    :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
+                                                    )
+                                         , configurations
+                                         , sensorType
+                                         );
+
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
            .setNumTasks(parserNumTasks)
            .shuffleGrouping("kafkaSpout");
+    if(errorWriterNumTasks > 0) {
+      builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
+              .setNumTasks(errorWriterNumTasks)
+              .shuffleGrouping("parserBolt", Constants.ERROR_STREAM)
+      ;
+    }
+    if(invalidWriterNumTasks > 0) {
+      builder.setBolt("invalidMessageWriter", invalidBolt, invalidWriterParallelism)
+              .setNumTasks(invalidWriterNumTasks)
+              .shuffleGrouping("parserBolt", Constants.INVALID_STREAM)
+      ;
+    }
     return builder;
   }
 
+  private static WriterHandler getHandler(String sensorType, ParserConfigurations configurations, AbstractWriter writer) {
+    writer.configure(sensorType, new ParserWriterConfiguration(configurations));
+    if(writer instanceof BulkMessageWriter) {
+      return new WriterHandler((BulkMessageWriter<JSONObject>)writer);
+    }
+    else if(writer instanceof MessageWriter) {
+      return new WriterHandler((MessageWriter<JSONObject>)writer);
+    }
+    else {
+      throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index eb7f021..0578fa5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -67,9 +67,23 @@ public class ParserTopologyCLI {
       o.setType(Number.class);
       return o;
     }),
-    PARSER_PARALLISM("pp", code -> {
+    PARSER_PARALLELISM("pp", code -> {
       Option o = new Option(code, "parser_p", true, "Parser Parallelism Hint");
-      o.setArgName("PARSER_PARALLELISM_HINT");
+      o.setArgName("PARALLELISM_HINT");
+      o.setRequired(false);
+      o.setType(Number.class);
+      return o;
+    }),
+    INVALID_WRITER_PARALLELISM("iwp", code -> {
+      Option o = new Option(code, "invalid_writer_p", true, "Invalid Message Writer Parallelism Hint");
+      o.setArgName("PARALLELISM_HINT");
+      o.setRequired(false);
+      o.setType(Number.class);
+      return o;
+    }),
+    ERROR_WRITER_PARALLELISM("ewp", code -> {
+      Option o = new Option(code, "error_writer_p", true, "Error Writer Parallelism Hint");
+      o.setArgName("PARALLELISM_HINT");
       o.setRequired(false);
       o.setType(Number.class);
       return o;
@@ -83,7 +97,21 @@ public class ParserTopologyCLI {
     }),
     PARSER_NUM_TASKS("pnt", code -> {
       Option o = new Option(code, "parser_num_tasks", true, "Parser Num Tasks");
-      o.setArgName("PARSER_NUM_TASKS");
+      o.setArgName("NUM_TASKS");
+      o.setRequired(false);
+      o.setType(Number.class);
+      return o;
+    }),
+    INVALID_WRITER_NUM_TASKS("iwnt", code -> {
+      Option o = new Option(code, "invalid_writer_num_tasks", true, "Invalid Writer Num Tasks");
+      o.setArgName("NUM_TASKS");
+      o.setRequired(false);
+      o.setType(Number.class);
+      return o;
+    }),
+    ERROR_WRITER_NUM_TASKS("ewnt", code -> {
+      Option o = new Option(code, "error_writer_num_tasks", true, "Error Writer Num Tasks");
+      o.setArgName("NUM_TASKS");
       o.setRequired(false);
       o.setType(Number.class);
       return o;
@@ -226,8 +254,12 @@ public class ParserTopologyCLI {
       String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
       int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
       int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
-      int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLISM.get(cmd, "1"));
+      int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
       int parserNumTasks= Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
+      int errorParallelism = Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
+      int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
+      int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
+      int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
       SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF;
       TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
               brokerUrl,
@@ -236,7 +268,12 @@ public class ParserTopologyCLI {
               spoutParallelism,
               spoutNumTasks,
               parserParallelism,
-              parserNumTasks);
+              parserNumTasks,
+              invalidParallelism,
+              invalidNumTasks,
+              errorParallelism,
+              errorNumTasks
+      );
       Config stormConf = ParserOptions.getConfig(cmd);
 
       if (ParserOptions.TEST.has(cmd)) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
index 6262dc1..f8578c8 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
@@ -25,12 +25,14 @@ import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.MessageWriter;
 import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.StringUtils;
 import org.apache.metron.common.writer.AbstractWriter;
 import org.json.simple.JSONObject;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>, Serializable {
   public enum Configurations {
@@ -44,11 +46,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     Configurations(String key) {
       this.key = key;
     }
-    public Object get(Map<String, Object> config) {
-      return config.get(key);
+    public Object get(Optional<String> configPrefix, Map<String, Object> config) {
+      return config.get(StringUtils.join(".", configPrefix, Optional.of(key)));
     }
-    public <T> T getAndConvert(Map<String, Object> config, Class<T> clazz) {
-      Object o = get(config);
+    public <T> T getAndConvert(Optional<String> configPrefix, Map<String, Object> config, Class<T> clazz) {
+      Object o = get(configPrefix, config);
       if(o != null) {
         return ConversionUtils.convert(o, clazz);
       }
@@ -61,11 +63,10 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
   private int requiredAcks = 1;
   private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
   private KafkaProducer kafkaProducer;
+  private String configPrefix = null;
 
   public KafkaWriter() {}
 
-
-
   public KafkaWriter(String brokerUrl) {
     this.brokerUrl = brokerUrl;
   }
@@ -89,26 +90,34 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     this.kafkaTopic= topic;
     return this;
   }
+  public KafkaWriter withConfigPrefix(String prefix) {
+    this.configPrefix = prefix;
+    return this;
+  }
+
+  public Optional<String> getConfigPrefix() {
+    return Optional.ofNullable(configPrefix);
+  }
   @Override
   public void configure(String sensorName, WriterConfiguration configuration) {
     Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
-    String brokerUrl = Configurations.BROKER.getAndConvert(configMap, String.class);
+    String brokerUrl = Configurations.BROKER.getAndConvert(getConfigPrefix(), configMap, String.class);
     if(brokerUrl != null) {
       this.brokerUrl = brokerUrl;
     }
-    String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(configMap, String.class);
+    String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
     if(keySerializer != null) {
       withKeySerializer(keySerializer);
     }
-    String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(configMap, String.class);
+    String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
     if(valueSerializer != null) {
       withValueSerializer(keySerializer);
     }
-    Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(configMap, Integer.class);
+    Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(getConfigPrefix(), configMap, Integer.class);
     if(requiredAcks!= null) {
       withRequiredAcks(requiredAcks);
     }
-    String topic = Configurations.TOPIC.getAndConvert(configMap, String.class);
+    String topic = Configurations.TOPIC.getAndConvert(getConfigPrefix(), configMap, String.class);
     if(topic != null) {
       withTopic(topic);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 71f8fcf..0b07bc3 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -31,6 +31,7 @@ import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.parsers.interfaces.MessageFilter;
@@ -39,6 +40,7 @@ import org.apache.metron.common.interfaces.MessageWriter;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 
@@ -81,10 +83,11 @@ public class ParserBoltTest extends BaseBoltTest {
   @Mock
   private Tuple t5;
 
+
   @Test
   public void testEmpty() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
       protected ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
@@ -119,8 +122,9 @@ public class ParserBoltTest extends BaseBoltTest {
 
   @Test
   public void test() throws Exception {
+
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
       protected ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
@@ -177,7 +181,7 @@ public void testImplicitBatchOfOne() throws Exception {
 
   String sensorType = "yaf";
 
-  ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+  ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
     @Override
     protected ParserConfigurations defaultConfigurations() {
       return new ParserConfigurations() {
@@ -220,7 +224,8 @@ public void testImplicitBatchOfOne() throws Exception {
   @Test
   public void testFilter() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
       protected SensorParserConfig getSensorParserConfig() {
         try {
@@ -246,7 +251,7 @@ public void testImplicitBatchOfOne() throws Exception {
 
     String sensorType = "yaf";
 
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
       protected ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
@@ -281,7 +286,7 @@ public void testImplicitBatchOfOne() throws Exception {
 
     String sensorType = "yaf";
 
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
       protected ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
@@ -325,7 +330,7 @@ public void testImplicitBatchOfOne() throws Exception {
   public void testBatchOfFiveWithError() throws Exception {
 
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
       protected ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
new file mode 100644
index 0000000..fe8c238
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.bolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.test.bolt.BaseBoltTest;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+public class WriterBoltTest extends BaseBoltTest{
+  @Mock
+  protected TopologyContext topologyContext;
+
+  @Mock
+  protected OutputCollector outputCollector;
+
+  @Mock
+  private MessageWriter<JSONObject> writer;
+
+  @Mock
+  private BulkMessageWriter<JSONObject> batchWriter;
+
+  private ParserConfigurations getConfigurations(int batchSize) {
+    return new ParserConfigurations() {
+          @Override
+          public SensorParserConfig getSensorParserConfig(String sensorType) {
+            return new SensorParserConfig() {
+              @Override
+              public Map<String, Object> getParserConfig() {
+                return new HashMap<String, Object>() {{
+                  put(ParserWriterConfiguration.BATCH_CONF, batchSize);
+                }};
+              }
+            };
+          }
+        };
+  }
+  @Test
+  public void testBatchHappyPath() throws Exception {
+    ParserConfigurations configurations = getConfigurations(5);
+    String sensorType = "test";
+    List<Tuple> tuples = new ArrayList<>();
+    for(int i = 0;i < 5;++i) {
+      Tuple t = mock(Tuple.class);
+      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      tuples.add(t);
+    }
+    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+    bolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(batchWriter, times(1)).init(any(), any());
+    for(int i = 0;i < 4;++i) {
+      Tuple t = tuples.get(i);
+      bolt.execute(t);
+      verify(outputCollector, times(0)).ack(t);
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+    }
+    bolt.execute(tuples.get(4));
+    for(Tuple t : tuples) {
+      verify(outputCollector, times(1)).ack(t);
+    }
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(outputCollector, times(0)).reportError(any());
+    verify(outputCollector, times(0)).fail(any());
+  }
+  @Test
+  public void testNonBatchHappyPath() throws Exception {
+    ParserConfigurations configurations = getConfigurations(1);
+    String sensorType = "test";
+    Tuple t = mock(Tuple.class);
+    when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+    WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
+    bolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(writer, times(1)).init();
+    bolt.execute(t);
+    verify(outputCollector, times(1)).ack(t);
+    verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(outputCollector, times(0)).reportError(any());
+    verify(outputCollector, times(0)).fail(any());
+  }
+  @Test
+  public void testNonBatchErrorPath() throws Exception {
+    ParserConfigurations configurations = getConfigurations(1);
+    String sensorType = "test";
+    Tuple t = mock(Tuple.class);
+    when(t.getValueByField(eq("message"))).thenThrow(new IllegalStateException());
+    WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
+    bolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(writer, times(1)).init();
+    bolt.execute(t);
+    verify(outputCollector, times(1)).ack(t);
+    verify(writer, times(0)).write(eq(sensorType), any(), any(), any());
+    verify(outputCollector, times(1)).reportError(any());
+    verify(outputCollector, times(0)).fail(any());
+  }
+  @Test
+  public void testNonBatchErrorPathErrorInWrite() throws Exception {
+    ParserConfigurations configurations = getConfigurations(1);
+    String sensorType = "test";
+    Tuple t = mock(Tuple.class);
+    when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+    WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
+    bolt.prepare(new HashMap(), topologyContext, outputCollector);
+    doThrow(new Exception()).when(writer).write(any(), any(), any(), any());
+    verify(writer, times(1)).init();
+    bolt.execute(t);
+    verify(outputCollector, times(1)).ack(t);
+    verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(outputCollector, times(1)).reportError(any());
+    verify(outputCollector, times(0)).fail(any());
+  }
+  @Test
+  public void testBatchErrorPath() throws Exception {
+    ParserConfigurations configurations = getConfigurations(5);
+    String sensorType = "test";
+    List<Tuple> tuples = new ArrayList<>();
+    for(int i = 0;i < 4;++i) {
+      Tuple t = mock(Tuple.class);
+      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      tuples.add(t);
+    }
+    Tuple errorTuple = mock(Tuple.class);
+    Tuple goodTuple = mock(Tuple.class);
+    when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
+    when(errorTuple.getValueByField(eq("message"))).thenThrow(new IllegalStateException());
+
+    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+    bolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(batchWriter, times(1)).init(any(), any());
+    for(int i = 0;i < 4;++i) {
+      Tuple t = tuples.get(i);
+      bolt.execute(t);
+      verify(outputCollector, times(0)).ack(t);
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+    }
+    bolt.execute(errorTuple);
+    for(Tuple t : tuples) {
+      verify(outputCollector, times(0)).ack(t);
+    }
+    bolt.execute(goodTuple);
+    for(Tuple t : tuples) {
+      verify(outputCollector, times(1)).ack(t);
+    }
+    verify(outputCollector, times(1)).ack(goodTuple);
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(outputCollector, times(1)).reportError(any());
+    verify(outputCollector, times(0)).fail(any());
+  }
+
+  @Test
+  public void testBatchErrorPathExceptionInWrite() throws Exception {
+    ParserConfigurations configurations = getConfigurations(5);
+    String sensorType = "test";
+    List<Tuple> tuples = new ArrayList<>();
+    for(int i = 0;i < 4;++i) {
+      Tuple t = mock(Tuple.class);
+      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      tuples.add(t);
+    }
+    Tuple goodTuple = mock(Tuple.class);
+    when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
+
+    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+    bolt.prepare(new HashMap(), topologyContext, outputCollector);
+    doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
+    verify(batchWriter, times(1)).init(any(), any());
+    for(int i = 0;i < 4;++i) {
+      Tuple t = tuples.get(i);
+      bolt.execute(t);
+      verify(outputCollector, times(0)).ack(t);
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+    }
+    bolt.execute(goodTuple);
+    for(Tuple t : tuples) {
+      verify(outputCollector, times(1)).ack(t);
+    }
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(outputCollector, times(1)).ack(goodTuple);
+    verify(outputCollector, times(1)).reportError(any());
+    verify(outputCollector, times(0)).fail(any());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
index 5f314fa..e667e54 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
@@ -95,5 +95,13 @@ public class CSVParserTest {
       Assert.assertEquals(" bar", o.get("col2"));
       Assert.assertEquals(" grok", o.get("col3"));
     }
+    {
+      String line = "foo";
+      try {
+        List<JSONObject> results = parser.parse(Bytes.toBytes(line));
+        Assert.fail("Expected exception");
+      }
+      catch(IllegalStateException iae) {}
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 0cc74f5..ad00deb 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -75,6 +75,10 @@ public class ParserTopologyComponent implements InMemoryComponent {
                                                                    , 1
                                                                    , 1
                                                                    , 1
+                                                                   , 1
+                                                                   , 1
+                                                                   , 1
+                                                                   , 1
                                                                    );
       Map<String, Object> stormConf = new HashMap<>();
       stormConf.put(Config.TOPOLOGY_DEBUG, true);
@@ -87,6 +91,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
 
   @Override
   public void stop() {
-    stormCluster.shutdown();
+    if(stormCluster != null) {
+      stormCluster.shutdown();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
new file mode 100644
index 0000000..a6e83f1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writers.integration;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.field.validation.FieldValidation;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.integration.*;
+import org.apache.metron.integration.components.ConfigUploadComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.mock.MockTableProvider;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
+import org.apache.metron.test.TestDataType;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.test.utils.SampleDataUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.*;
+
+public class WriterBoltIntegrationTest extends BaseIntegrationTest {
+  public static class MockValidator implements FieldValidation{
+
+    @Override
+    public boolean isValid(Map<String, Object> input, Map<String, Object> validationConfig, Map<String, Object> globalConfig) {
+      if(input.get("action").equals("invalid")) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public void initialize(Map<String, Object> validationConfig, Map<String, Object> globalConfig) {
+    }
+  }
+  /**
+   {
+    "fieldValidations" : [
+        {
+          "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"
+        }
+                         ]
+   }
+    */
+  @Multiline
+  public static String globalConfig;
+
+  /**
+   {
+    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+   ,"sensorTopic":"dummy"
+   ,"parserConfig":
+   {
+    "columns" : {
+                "action" : 0
+               ,"dummy" : 1
+                 }
+   }
+   }
+   */
+  @Multiline
+  public static String parserConfig;
+
+  @Test
+  public void test() throws UnableToStartException, IOException {
+    final String sensorType = "dummy";
+    final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("valid,foo"));
+      add(Bytes.toBytes("invalid,foo"));
+      add(Bytes.toBytes("error"));
+    }};
+    final Properties topologyProperties = new Properties();
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(sensorType, 1));
+      add(new KafkaWithZKComponent.Topic(Constants.DEFAULT_PARSER_ERROR_TOPIC, 1));
+      add(new KafkaWithZKComponent.Topic(Constants.DEFAULT_PARSER_INVALID_TOPIC, 1));
+      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }});
+    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+
+    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
+            .withTopologyProperties(topologyProperties)
+            .withGlobalConfig(globalConfig)
+            .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+
+    ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
+            .withSensorType(sensorType)
+            .withTopologyProperties(topologyProperties)
+            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("config", configUploadComponent)
+            .withComponent("storm", parserTopologyComponent)
+            .withMillisecondsBetweenAttempts(5000)
+            .withNumRetries(10)
+            .build();
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(sensorType, inputMessages);
+      Map<String, List<JSONObject>> outputMessages =
+              runner.process(new Processor<Map<String, List<JSONObject>>>() {
+                Map<String, List<JSONObject>> messages = null;
+
+                public ReadinessState process(ComponentRunner runner) {
+                  KafkaWithZKComponent kafkaWithZKComponent = runner.getComponent("kafka", KafkaWithZKComponent.class);
+                  List<byte[]> outputMessages = kafkaWithZKComponent.readMessages(Constants.ENRICHMENT_TOPIC);
+                  List<byte[]> invalid = kafkaWithZKComponent.readMessages(Constants.DEFAULT_PARSER_INVALID_TOPIC);
+                  List<byte[]> error = kafkaWithZKComponent.readMessages(Constants.DEFAULT_PARSER_ERROR_TOPIC);
+                  if(outputMessages.size() == 1 && invalid.size() == 1 && error.size() == 1) {
+                    messages = new HashMap<String, List<JSONObject>>() {{
+                      put(Constants.ENRICHMENT_TOPIC, loadMessages(outputMessages));
+                      put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(error));
+                      put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(invalid));
+                    }};
+                    return ReadinessState.READY;
+                  }
+                  return ReadinessState.NOT_READY;
+                }
+
+                public Map<String, List<JSONObject>> getResult() {
+                  return messages;
+                }
+              });
+      Assert.assertEquals(3, outputMessages.size());
+      Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
+      Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action"));
+      Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).size());
+      Assert.assertEquals("error", outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage"));
+      Assert.assertTrue(Arrays.equals(listToBytes(outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage_bytes"))
+                                     , "error".getBytes()
+                                     )
+                      );
+      Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).size());
+      Assert.assertEquals("invalid", outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).get(0).get("action"));
+    }
+    finally {
+      if(runner != null) {
+        runner.stop();
+      }
+    }
+  }
+  private static byte[] listToBytes(Object o ){
+    List<Byte> l = (List<Byte>)o;
+    byte[] ret = new byte[l.size()];
+    int i = 0;
+    for(Number b : l) {
+      ret[i++] = b.byteValue();
+    }
+    return ret;
+  }
+  private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
+    List<JSONObject> tmp = new ArrayList<>();
+    Iterables.addAll(tmp
+            , Iterables.transform(outputMessages
+                    , message -> {
+                      try {
+                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message)
+                                             , new TypeReference<Map<String, Object>>() {}
+                                             )
+                        );
+                      } catch (Exception ex) {
+                        throw new IllegalStateException(ex);
+                      }
+                    }
+            )
+    );
+    return tmp;
+  }
+}