You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/03/06 21:25:49 UTC

[2/3] incubator-metron git commit: METRON-694 Index Errors from Topologies (merrimanr) closes apache/incubator-metron#453

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index c5c1294..1f33060 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -18,6 +18,8 @@
 package org.apache.metron.enrichment.bolt;
 
 import org.apache.log4j.Level;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.writer.BulkWriterComponent;
@@ -112,9 +114,13 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
   @Mock
   private BulkMessageWriter<JSONObject> bulkMessageWriter;
 
+  @Mock
+  private MessageGetStrategy messageGetStrategy;
+
   @Test
   public void test() throws Exception {
-    BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl").withBulkMessageWriter(bulkMessageWriter);
+    BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl")
+            .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()).withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setTreeCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index b0076a4..90322fe 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -17,18 +17,22 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import org.apache.log4j.Level;
-import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.apache.storm.tuple.Values;
+import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.log4j.Level;
 import org.apache.metron.TestConstants;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.storm.tuple.Values;
 import org.hamcrest.Description;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -43,12 +47,16 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
 
@@ -194,7 +202,10 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
     UnitTestHelper.setLog4jLevel(GenericEnrichmentBolt.class, Level.FATAL);
     genericEnrichmentBolt.execute(tuple);
     UnitTestHelper.setLog4jLevel(GenericEnrichmentBolt.class, Level.ERROR);
-    verify(outputCollector, times(1)).emit(eq("error"), any(Values.class));
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+            .withThrowable(new Exception("Could not parse binary stream to JSON"));
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
     when(tuple.getStringByField("key")).thenReturn(key);
     when(tuple.getValueByField("message")).thenReturn(originalMessage);
     when(enrichmentAdapter.enrich(any())).thenReturn(new JSONObject());
@@ -217,6 +228,19 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
     verify(enrichmentAdapter, times(1)).logAccess(cacheKey2);
     verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, enrichedMessage)));
 
-
+    reset(outputCollector);
+    genericEnrichmentBolt.cache.invalidateAll();
+    when(enrichmentAdapter.enrich(cacheKey1)).thenReturn(null);
+    genericEnrichmentBolt.execute(tuple);
+    error = new MetronError()
+            .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+            .withErrorFields(new HashSet<String>() {{ add("field1"); }})
+            .addRawMessage(new JSONObject() {{
+              put("field1", "value1");
+              put("field2", "value2");
+              put("source.type", "test");
+            }})
+            .withThrowable(new CacheLoader.InvalidCacheLoadException("CacheLoader returned null for key CacheKey{field='field1', value='value1'}."));
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index 5b06d33..9f12fcd 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Values;
+import com.google.common.cache.LoadingCache;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.error.MetronError;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
@@ -31,11 +35,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -89,7 +95,7 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest {
   }
 
   @Test
-  public void test() {
+  public void test() throws Exception {
     StandAloneJoinBolt joinBolt = new StandAloneJoinBolt("zookeeperUrl");
     joinBolt.setCuratorFramework(client);
     joinBolt.setTreeCache(cache);
@@ -126,5 +132,16 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest {
     joinBolt.execute(tuple);
     verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage)));
     verify(outputCollector, times(1)).ack(tuple);
+
+    joinBolt.cache = mock(LoadingCache.class);
+    when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception")));
+    joinBolt.execute(tuple);
+
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+            .withMessage("Joining problem: {}")
+            .withThrowable(new ExecutionException(new Exception("join exception")))
+            .addRawMessage(new JSONObject());
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 7306c64..e012c55 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -76,6 +76,7 @@ import java.util.stream.Stream;
 import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.*;
 
 public class EnrichmentIntegrationTest extends BaseIntegrationTest {
+  private static final String ERROR_TOPIC = "enrichment_error";
   private static final String SRC_IP = "ip_src_addr";
   private static final String DST_IP = "ip_dst_addr";
   private static final String MALICIOUS_IP_TYPE = "malicious_ip";
@@ -139,13 +140,13 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       setProperty("enrichment.simple.hbase.table", enrichmentsTableName);
       setProperty("enrichment.simple.hbase.cf", cf);
       setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC);
-      setProperty("enrichment.error.topic", Constants.ENRICHMENT_ERROR_TOPIC);
+      setProperty("enrichment.error.topic", ERROR_TOPIC);
     }};
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
       add(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1));
-      add(new KafkaComponent.Topic(Constants.ENRICHMENT_ERROR_TOPIC, 1));
+      add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
     }});
     String globalConfigStr = null;
     {
@@ -201,15 +202,14 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       fluxComponent.submitTopology();
 
       kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
-      ProcessorResult<List<Map<String, Object>>> result = runner.process(getProcessor());
-      // We expect failures, so we don't care if result returned failure or not
-      List<Map<String, Object>> docs = result.getResult();
+      ProcessorResult<Map<String, List<Map<String, Object>>>> result = runner.process(getProcessor());
+      Map<String,List<Map<String, Object>>> outputMessages = result.getResult();
+      List<Map<String, Object>> docs = outputMessages.get(Constants.INDEXING_TOPIC);
       Assert.assertEquals(inputMessages.size(), docs.size());
       validateAll(docs);
-
-      List<byte[]> errors = result.getProcessErrors();
+      List<Map<String, Object>> errors = outputMessages.get(ERROR_TOPIC);
       Assert.assertEquals(inputMessages.size(), errors.size());
-      validateErrors(result.getProcessErrors());
+      validateErrors(errors);
     } finally {
       runner.stop();
     }
@@ -234,10 +234,12 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     }
   }
 
-  protected void validateErrors(List<byte[]> errors) {
-    for(byte[] error : errors) {
-      // Don't reconstruct the entire message, just ensure it contains the known error message inside.
-      Assert.assertTrue(new String(error).contains(ErrorEnrichmentBolt.TEST_ERROR_MESSAGE));
+  protected void validateErrors(List<Map<String, Object>> errors) {
+    for(Map<String, Object> error : errors) {
+      Assert.assertEquals("Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.MESSAGE.getName()));
+      Assert.assertEquals("java.lang.IllegalStateException: Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.EXCEPTION.getName()));
+      Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName()));
+      Assert.assertEquals("{\"rawMessage\":\"Error Test Raw Message String\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
     }
   }
 
@@ -504,39 +506,47 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     return ret;
   }
 
+  private static List<Map<String, Object>> loadMessages(List<byte[]> outputMessages) {
+    List<Map<String, Object>> tmp = new ArrayList<>();
+    Iterables.addAll(tmp
+            , Iterables.transform(outputMessages
+                    , message -> {
+                      try {
+                        return new HashMap<>(JSONUtils.INSTANCE.load(new String(message)
+                                , new TypeReference<Map<String, Object>>() {}
+                        )
+                        );
+                      } catch (Exception ex) {
+                        throw new IllegalStateException(ex);
+                      }
+                    }
+            )
+    );
+    return tmp;
+  }
   @SuppressWarnings("unchecked")
-  private Processor<List<Map<String, Object>>> getProcessor() {
+  private KafkaProcessor<Map<String,List<Map<String, Object>>>> getProcessor(){
 
-    KafkaProcessor<List<Map<String, Object>>> kafkaProcessor = new KafkaProcessor<>().withKafkaComponentName("kafka")
+    return new KafkaProcessor<>()
+            .withKafkaComponentName("kafka")
             .withReadTopic(Constants.INDEXING_TOPIC)
-            .withErrorTopic(Constants.ENRICHMENT_ERROR_TOPIC)
-            .withInvalidTopic(Constants.INVALID_STREAM)
+            .withErrorTopic(ERROR_TOPIC)
             .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
               @Nullable
               @Override
               public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-                // this test is written to return 10 errors and 10 messages
-                // we can just check when the messages match here
-                // if they do then we are good
-                return messageSet.getMessages().size() == inputMessages.size();
+                return (messageSet.getMessages().size() == inputMessages.size()) && (messageSet.getErrors().size() == inputMessages.size());
               }
             })
-            .withProvideResult(new Function<KafkaMessageSet , List<Map<String, Object>>>() {
+            .withProvideResult(new Function<KafkaMessageSet,Map<String,List<Map<String, Object>>>>(){
               @Nullable
               @Override
-              public List<Map<String, Object>> apply(@Nullable KafkaMessageSet messageSet) {
-                List<Map<String,Object>> docs = new ArrayList<>();
-                for (byte[] message : messageSet.getMessages()) {
-                  try {
-                    docs.add(JSONUtils.INSTANCE.load(new String(message), new TypeReference<Map<String, Object>>() {
-                    }));
-                  } catch (IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                  }
-                }
-                return docs;
+              public Map<String,List<Map<String, Object>>> apply(@Nullable KafkaMessageSet messageSet) {
+                return new HashMap<String, List<Map<String, Object>>>() {{
+                  put(Constants.INDEXING_TOPIC, loadMessages(messageSet.getMessages()));
+                  put(ERROR_TOPIC, loadMessages(messageSet.getErrors()));
+                }};
               }
             });
-    return kafkaProcessor;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index 5296ea0..f243eef 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -24,7 +24,7 @@ and sent to
 * An indexing bolt configured to write to either elasticsearch or Solr
 * An indexing bolt configured to write to HDFS under `/apps/metron/enrichment/indexed`
 
-Errors during indexing are sent to a kafka queue called `index_errors`
+By default, errors during indexing are sent back into the `indexing` kafka queue so that they can be indexed and archived.
 
 ##Sensor Indexing Configuration
 The sensor specific configuration is intended to configure the

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json
new file mode 100644
index 0000000..4c9d786
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json
@@ -0,0 +1,17 @@
+{
+  "hdfs" : {
+    "index": "error",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "error",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "error",
+    "batchSize": 5,
+    "enabled" : true
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
index 8bf8f48..3e329f4 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -101,7 +101,8 @@ bolts:
                     - ref: "indexWriter"
             -   name: "withMessageGetter"
                 args:
-                    - "RAW"
+                    - "DEFAULT_JSON_FROM_POSITION"
+
     -   id: "hdfsIndexingBolt"
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
@@ -112,7 +113,7 @@ bolts:
                     - ref: "hdfsWriter"
             -   name: "withMessageGetter"
                 args:
-                    - "RAW"
+                    - "DEFAULT_JSON_FROM_POSITION"
 
     -   id: "indexingErrorBolt"
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index ae04e43..60cd1d1 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -44,7 +44,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
@@ -53,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
 
 public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
+  protected static final String ERROR_TOPIC = "indexing_error";
   protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
   protected String fluxPath = "../metron-indexing/src/main/flux/indexing/remote.yaml";
@@ -125,7 +125,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
       setProperty("indexing.workers", "1");
       setProperty("indexing.executors", "0");
       setProperty("index.input.topic", Constants.INDEXING_TOPIC);
-      setProperty("index.error.topic", Constants.INDEXING_ERROR_TOPIC);
+      setProperty("index.error.topic", ERROR_TOPIC);
       setProperty("index.date.format", dateFormat);
       //HDFS settings
 
@@ -138,7 +138,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1));
-      add(new KafkaComponent.Topic(Constants.INDEXING_ERROR_TOPIC, 1));
+      add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
     }});
     List<Map<String, Object>> inputDocs = new ArrayList<>();
     for(byte[] b : inputMessages) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java
index 3eb4e8f..c309686 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java
@@ -24,7 +24,6 @@ public class ProcessorResult<T> {
     public static class Builder<T>{
         T result;
         List<byte[]> processErrors;
-        List<byte[]> processInvalids;
 
         public Builder(){}
 
@@ -38,25 +37,18 @@ public class ProcessorResult<T> {
             return this;
         }
 
-        public Builder withProcessInvalids(List<byte[]> processInvalids){
-            this.processInvalids = processInvalids;
-            return this;
-        }
-
         public ProcessorResult<T> build(){
-            return new ProcessorResult<T>(result,processErrors,processInvalids);
+            return new ProcessorResult<T>(result,processErrors);
         }
 
     }
 
     T result;
     List<byte[]> processErrors;
-    List<byte[]> processInvalids;
     @SuppressWarnings("unchecked")
-    public ProcessorResult(T result,List<byte[]> processErrors, List<byte[]> processInvalids){
+    public ProcessorResult(T result,List<byte[]> processErrors){
         this.result = result;
         this.processErrors = processErrors == null ? new ArrayList() : processErrors;
-        this.processInvalids = processInvalids == null ? new ArrayList() : processInvalids;
     }
 
     public T getResult(){
@@ -67,12 +59,8 @@ public class ProcessorResult<T> {
         return processErrors;
     }
 
-    public List<byte[]> getProcessInvalids(){
-        return processInvalids;
-    }
-
     public boolean failed(){
-        return processErrors.size() > 0 || processInvalids.size() > 0;
+        return processErrors.size() > 0;
     }
 
     public void getBadResults(StringBuffer buffer){
@@ -84,10 +72,5 @@ public class ProcessorResult<T> {
             buffer.append(new String(outputMessage));
         }
         buffer.append("\n");
-        buffer.append(String.format("%d Invalid Messages", processInvalids.size()));
-        for (byte[] outputMessage : processInvalids) {
-            buffer.append(new String(outputMessage));
-        }
-        buffer.append("\n");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
index 4227933..683fe6a 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
@@ -23,12 +23,10 @@ import java.util.List;
 public class KafkaMessageSet{
     public List<byte[]> messages;
     public List<byte[]> errors;
-    public List<byte[]> invalids;
 
-    public KafkaMessageSet(List<byte[]> messages, List<byte[]> errors, List<byte[]> invalids) {
+    public KafkaMessageSet(List<byte[]> messages, List<byte[]> errors) {
         this.messages = messages;
         this.errors = errors;
-        this.invalids = invalids;
     }
 
 
@@ -38,7 +36,4 @@ public class KafkaMessageSet{
     public List<byte[]> getErrors() {
         return errors;
     }
-    public List<byte[]> getInvalids() {
-        return invalids;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
index 6fdbbf4..63f073d 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
@@ -30,10 +30,8 @@ public class KafkaProcessor<T> implements Processor<T> {
     private String kafkaComponentName;
     private String readTopic;
     private String errorTopic;
-    private String invalidTopic;
     private List<byte[]> messages = new LinkedList<>();
     private List<byte[]> errors = new LinkedList<>();
-    private List<byte[]> invalids = new LinkedList<>();
 
     public KafkaProcessor(){}
     public KafkaProcessor withKafkaComponentName(String name){
@@ -48,10 +46,6 @@ public class KafkaProcessor<T> implements Processor<T> {
         this.errorTopic = topicName;
         return this;
     }
-    public KafkaProcessor withInvalidTopic(String topicName){
-        this.invalidTopic = topicName;
-        return this;
-    }
     public KafkaProcessor withValidateReadMessages(Function<KafkaMessageSet, Boolean> validate){
         this.validateReadMessages = validate;
         return this;
@@ -68,25 +62,19 @@ public class KafkaProcessor<T> implements Processor<T> {
         KafkaComponent kafkaComponent = runner.getComponent(kafkaComponentName, KafkaComponent.class);
         LinkedList<byte[]> outputMessages = new LinkedList<>(kafkaComponent.readMessages(readTopic));
         LinkedList<byte[]> outputErrors = null;
-        LinkedList<byte[]> outputInvalids = null;
 
         if (errorTopic != null) {
             outputErrors = new LinkedList<>(kafkaComponent.readMessages(errorTopic));
         }
-        if (invalidTopic != null) {
-            outputInvalids = new LinkedList<>(kafkaComponent.readMessages(invalidTopic));
-        }
-        Boolean validated = validateReadMessages.apply(new KafkaMessageSet(outputMessages,outputErrors,outputInvalids));
+        Boolean validated = validateReadMessages.apply(new KafkaMessageSet(outputMessages,outputErrors));
         if(validated == null){
             validated = false;
         }
         if(validated){
             messages.addAll(outputMessages);
             errors.addAll(outputErrors);
-            invalids.addAll(outputInvalids);
             outputMessages.clear();
             outputErrors.clear();
-            outputInvalids.clear();
             return ReadinessState.READY;
         }
         return ReadinessState.NOT_READY;
@@ -94,7 +82,7 @@ public class KafkaProcessor<T> implements Processor<T> {
     @SuppressWarnings("unchecked")
     public ProcessorResult<T> getResult(){
         ProcessorResult.Builder<T> builder = new ProcessorResult.Builder();
-        return builder.withResult(provideResult.apply(new KafkaMessageSet(messages,errors,invalids))).withProcessErrors(errors).withProcessInvalids(invalids).build();
+        return builder.withResult(provideResult.apply(new KafkaMessageSet(messages,errors))).withProcessErrors(errors).build();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 3c4310d..2cf9bbf 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -30,7 +30,7 @@ Data flows through the parser bolt via kafka and into the `enrichments`
 topology in kafka.  Errors are collected with the context of the error
 (e.g. stacktrace) and original message causing the error and sent to an
 `error` queue.  Invalid messages as determined by global validation
-functions are sent to an `invalid` queue. 
+functions are also treated as errors and sent to an `error` queue. 
  
 ##Message Format
 
@@ -277,9 +277,6 @@ usage: start_parser_topology.sh
  -ewp,--error_writer_p <PARALLELISM_HINT>       Error Writer Parallelism
                                                 Hint
  -h,--help                                      This screen
- -iwnt,--invalid_writer_num_tasks <NUM_TASKS>   Invalid Writer Num Tasks
- -iwp,--invalid_writer_p <PARALLELISM_HINT>     Invalid Message Writer
-                                                Parallelism Hint
  -k,--kafka <BROKER_URL>                        Kafka Broker URL
  -mt,--message_timeout <TIMEOUT_IN_SECS>        Message Timeout in Seconds
  -mtp,--max_task_parallelism <MAX_TASK>         Max task parallelism
@@ -365,9 +362,6 @@ be customized by modifying the arguments sent to this utility.
 * The Error Message Writer Bolt
   * `--error_writer_num_tasks` : The number of tasks for the error writer bolt
   * `--error_writer_p` : The parallelism hint for the error writer bolt
-* The Invalid Message Writer Bolt
-  * `--invalid_writer_num_tasks` : The number of tasks for the error writer bolt
-  * `--invalid_writer_p` : The parallelism hint for the error writer bolt
  
 Finally, if workers and executors are new to you, the following might be of use to you:
 * [Understanding the Parallelism of a Storm Topology](http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/)

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 20c15ce..0b1ea3d 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -18,29 +18,38 @@
 package org.apache.metron.parsers.bolt;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.common.configuration.FieldTransformer;
 import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.StellarFunctions;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
@@ -51,6 +60,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   private MessageFilter<JSONObject> filter;
   private WriterHandler writer;
   private org.apache.metron.common.dsl.Context stellarContext;
+  private transient MessageGetStrategy messageGetStrategy;
   public ParserBolt( String zookeeperUrl
                    , String sensorType
                    , MessageParser<JSONObject> parser
@@ -72,6 +82,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     super.prepare(stormConf, context, collector);
+    messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
     initializeStellar();
     if(getSensorParserConfig() != null && filter == null) {
@@ -109,7 +120,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
-    byte[] originalMessage = tuple.getBinary(0);
+    byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
     SensorParserConfig sensorParserConfig = getSensorParserConfig();
     try {
       //we want to ack the tuple in the situation where we have are not doing a bulk write
@@ -128,12 +139,22 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
           }
           if (parser.validate(message) && (filter == null || filter.emitTuple(message, stellarContext))) {
             numWritten++;
-            if(!isGloballyValid(message, fieldValidations)) {
-              message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");
-              collector.emit(Constants.INVALID_STREAM, new Values(message));
+            List<FieldValidator> failedValidators = getFailedValidators(message, fieldValidations);
+            if(failedValidators.size() > 0) {
+              MetronError error = new MetronError()
+                      .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                      .withSensorType(getSensorType())
+                      .addRawMessage(message);
+              Set<String> errorFields = failedValidators.stream()
+                      .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+                      .collect(Collectors.toSet());
+              if (!errorFields.isEmpty()) {
+                error.withErrorFields(errorFields);
+              }
+              ErrorUtils.handleError(collector, error);
             }
             else {
-              writer.write(getSensorType(), tuple, message, getConfigurations());
+              writer.write(getSensorType(), tuple, message, getConfigurations(), messageGetStrategy);
             }
           }
         }
@@ -145,28 +166,28 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         collector.ack(tuple);
       }
     } catch (Throwable ex) {
-      ErrorUtils.handleError( collector
-                            , ex
-                            , Constants.ERROR_STREAM
-                            , Optional.of(getSensorType())
-                            , Optional.ofNullable(originalMessage)
-                            );
+      MetronError error = new MetronError()
+              .withErrorType(Constants.ErrorType.PARSER_ERROR)
+              .withThrowable(ex)
+              .withSensorType(getSensorType())
+              .addRawMessage(originalMessage);
+      ErrorUtils.handleError(collector, error);
       collector.ack(tuple);
     }
   }
 
-  private boolean isGloballyValid(JSONObject input, List<FieldValidator> validators) {
+  private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
+    List<FieldValidator> failedValidators = new ArrayList<>();
     for(FieldValidator validator : validators) {
       if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) {
-        return false;
+        failedValidators.add(validator);
       }
     }
-    return true;
+    return failedValidators;
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream(Constants.INVALID_STREAM, new Fields("message"));
     declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index 8eb0656..ef7288b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -18,23 +18,27 @@
 
 package org.apache.metron.parsers.bolt;
 
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.utils.ErrorUtils;
 import org.json.simple.JSONObject;
 
 import java.util.Map;
-import java.util.Optional;
 
 public class WriterBolt extends BaseRichBolt {
   private WriterHandler handler;
   private ParserConfigurations configuration;
   private String sensorType;
+  private Constants.ErrorType errorType = Constants.ErrorType.DEFAULT_ERROR;
+  private transient MessageGetStrategy messageGetStrategy;
   private transient OutputCollector collector;
   public WriterBolt(WriterHandler handler, ParserConfigurations configuration, String sensorType) {
     this.handler = handler;
@@ -42,9 +46,15 @@ public class WriterBolt extends BaseRichBolt {
     this.sensorType = sensorType;
   }
 
+  public WriterBolt withErrorType(Constants.ErrorType errorType) {
+    this.errorType = errorType;
+    return this;
+  }
+
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     this.collector = collector;
+    messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get();
     handler.init(stormConf, collector, configuration);
   }
 
@@ -65,18 +75,18 @@ public class WriterBolt extends BaseRichBolt {
   public void execute(Tuple tuple) {
     JSONObject message = null;
     try {
-      message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
-      handler.write(sensorType, tuple, message, configuration);
+      message = (JSONObject) messageGetStrategy.get(tuple);
+      handler.write(sensorType, tuple, message, configuration, messageGetStrategy);
       if(!handler.handleAck()) {
         collector.ack(tuple);
       }
     } catch (Throwable e) {
-      ErrorUtils.handleError( collector
-                            , e
-                            , Constants.ERROR_STREAM
-                            , Optional.of(sensorType)
-                            , Optional.ofNullable(message)
-                            );
+      MetronError error = new MetronError()
+              .withErrorType(errorType)
+              .withThrowable(e)
+              .withSensorType(sensorType)
+              .addRawMessage(message);
+      ErrorUtils.handleError(collector, error);
       collector.ack(tuple);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index 38425b5..3273ca7 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.parsers.bolt;
 
+import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.ParserConfigurations;
@@ -80,11 +81,12 @@ public class WriterHandler implements Serializable {
                    , Tuple tuple
                    , JSONObject message
                    , ParserConfigurations configurations
+                   , MessageGetStrategy messageGetStrategy
                    ) throws Exception {
-    writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations));
+    writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations), messageGetStrategy);
   }
 
-  public void errorAll(String sensorType, Throwable e) {
-    writerComponent.errorAll(sensorType, e);
+  public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) {
+    writerComponent.errorAll(sensorType, e, messageGetStrategy);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 5f1927e..aeac33c 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -57,8 +57,6 @@ public class ParserTopologyBuilder {
    * @param spoutNumTasks            Number of tasks for the spout
    * @param parserParallelism        Parallelism hint for the parser bolt
    * @param parserNumTasks           Number of tasks for the parser bolt
-   * @param invalidWriterParallelism Parallelism hint for the bolt that handles invalid data
-   * @param invalidWriterNumTasks    Number of tasks for the bolt that handles invalid data
    * @param errorWriterParallelism   Parallelism hint for the bolt that handles errors
    * @param errorWriterNumTasks      Number of tasks for the bolt that handles errors
    * @param kafkaSpoutConfig         Configuration options for the kafka spout
@@ -73,8 +71,6 @@ public class ParserTopologyBuilder {
                                       int spoutNumTasks,
                                       int parserParallelism,
                                       int parserNumTasks,
-                                      int invalidWriterParallelism,
-                                      int invalidWriterNumTasks,
                                       int errorWriterParallelism,
                                       int errorWriterNumTasks,
                                       EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfig
@@ -104,14 +100,6 @@ public class ParserTopologyBuilder {
               .shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
 
-    // create the invalid bolt, if needed
-    if (invalidWriterNumTasks > 0) {
-      WriterBolt invalidBolt = createInvalidBolt(brokerUrl, sensorType, configs, parserConfig);
-      builder.setBolt("invalidMessageWriter", invalidBolt, invalidWriterParallelism)
-              .setNumTasks(invalidWriterNumTasks)
-              .shuffleGrouping("parserBolt", Constants.INVALID_STREAM);
-    }
-
     return builder;
   }
 
@@ -162,29 +150,6 @@ public class ParserTopologyBuilder {
   }
 
   /**
-   * Create a bolt that handles invalid messages.
-   *
-   * @param brokerUrl    The Kafka Broker URL
-   * @param sensorType   Type of sensor that is being consumed.
-   * @param configs
-   * @param parserConfig
-   * @return A Storm bolt that handles invalid messages.
-   */
-  private static WriterBolt createInvalidBolt(String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
-
-    // create writer - if not configured uses a sensible default
-    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null
-            ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC).withConfigPrefix("invalid")
-            : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
-    writer.configure(sensorType, new ParserWriterConfiguration(configs));
-
-    // create a writer handler
-    WriterHandler writerHandler = createWriterHandler(writer);
-
-    return new WriterBolt(writerHandler, configs, sensorType);
-  }
-
-  /**
    * Create a bolt that handles error messages.
    *
    * @param brokerUrl    Kafka Broker URL
@@ -197,14 +162,14 @@ public class ParserTopologyBuilder {
 
     // create writer - if not configured uses a sensible default
     AbstractWriter writer = parserConfig.getErrorWriterClassName() == null
-            ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC).withConfigPrefix("error")
+            ? new KafkaWriter(brokerUrl).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")).withConfigPrefix("error")
             : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
     // create a writer handler
     WriterHandler writerHandler = createWriterHandler(writer);
 
-    return new WriterBolt(writerHandler, configs, sensorType);
+    return new WriterBolt(writerHandler, configs, sensorType).withErrorType(Constants.ErrorType.PARSER_ERROR);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 5ea561c..2bf484e 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -302,8 +302,6 @@ public class ParserTopologyCLI {
               spoutNumTasks,
               parserParallelism,
               parserNumTasks,
-              invalidParallelism,
-              invalidNumTasks,
               errorParallelism,
               errorNumTasks,
               spoutConfig

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 6b00998..b3e15b2 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,8 +17,11 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.*;
 
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
@@ -151,6 +154,64 @@ public class ParserBoltTest extends BaseBoltTest {
     verify(parser, times(0)).validate(any());
     verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any());
     verify(outputCollector, times(1)).ack(tuple);
+
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(new NullPointerException())
+            .withSensorType(sensorType)
+            .addRawMessage(sampleBinary);
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+  }
+
+  @Test
+  public void testInvalid() throws Exception {
+    String sensorType = "yaf";
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+      @Override
+      protected ParserConfigurations defaultConfigurations() {
+        return new ParserConfigurations() {
+          @Override
+          public SensorParserConfig getSensorParserConfig(String sensorType) {
+            return new SensorParserConfig() {
+              @Override
+              public Map<String, Object> getParserConfig() {
+                return new HashMap<String, Object>() {{
+                }};
+              }
+
+
+            };
+          }
+        };
+      }
+
+    };
+
+    buildGlobalConfig(parserBolt);
+
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    byte[] sampleBinary = "some binary message".getBytes();
+
+    when(tuple.getBinary(0)).thenReturn(sampleBinary);
+    JSONObject parsedMessage = new JSONObject();
+    parsedMessage.put("field", "invalidValue");
+    List<JSONObject> messageList = new ArrayList<>();
+    messageList.add(parsedMessage);
+    when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messageList));
+    when(parser.validate(parsedMessage)).thenReturn(true);
+    parserBolt.execute(tuple);
+
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(sensorType)
+            .withErrorFields(new HashSet<String>() {{ add("field"); }})
+            .addRawMessage(new JSONObject(){{
+              put("field", "invalidValue");
+              put("source.type", "yaf");
+            }});
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
 
   @Test
@@ -531,6 +592,16 @@ public void testImplicitBatchOfOne() throws Exception {
 
   }
 
+  protected void buildGlobalConfig(ParserBolt parserBolt) {
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    Map<String, Object> fieldValidation = new HashMap<>();
+    fieldValidation.put("input", Arrays.asList("field"));
+    fieldValidation.put("validation", "STELLAR");
+    fieldValidation.put("config", new HashMap<String, String>(){{ put("condition", "field != 'invalidValue'"); }});
+    globalConfig.put("fieldValidations", Arrays.asList(fieldValidation));
+    parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
+  }
+
   private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
     bolt.execute(t);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index 4693829..4511b55 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -17,22 +17,23 @@
  */
 
 package org.apache.metron.parsers.bolt;
+
 import org.apache.log4j.Level;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.IndexingConfigurations;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.apache.metron.writer.BulkWriterComponent;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.test.bolt.BaseBoltTest;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -44,7 +45,12 @@ import java.util.Map;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class WriterBoltTest extends BaseBoltTest{
   @Mock
@@ -143,16 +149,24 @@ public class WriterBoltTest extends BaseBoltTest{
     ParserConfigurations configurations = getConfigurations(1);
     String sensorType = "test";
     Tuple t = mock(Tuple.class);
+    when(t.toString()).thenReturn("tuple");
     when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
     WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    doThrow(new Exception()).when(writer).write(any(), any(), any(), any());
+    doThrow(new Exception("write error")).when(writer).write(any(), any(), any(), any());
     verify(writer, times(1)).init();
     bolt.execute(t);
     verify(outputCollector, times(1)).ack(t);
     verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
     verify(outputCollector, times(1)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
+
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.DEFAULT_ERROR)
+            .withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}"))
+            .withSensorType(sensorType)
+            .addRawMessage(new JSONObject());
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index 4ba1c43..a170a2c 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 import java.util.*;
 
 public abstract class ParserIntegrationTest extends BaseIntegrationTest {
+  protected static final String ERROR_TOPIC = "parser_error";
   protected List<byte[]> inputMessages;
   @Test
   public void test() throws Exception {
@@ -47,8 +48,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(sensorType, 1));
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-      add(new KafkaComponent.Topic(Constants.INVALID_STREAM,1));
-      add(new KafkaComponent.Topic(Constants.ERROR_STREAM,1));
+      add(new KafkaComponent.Topic(ERROR_TOPIC,1));
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
 
@@ -115,13 +115,12 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
     return new KafkaProcessor<>()
             .withKafkaComponentName("kafka")
             .withReadTopic(Constants.ENRICHMENT_TOPIC)
-            .withErrorTopic(Constants.ERROR_STREAM)
-            .withInvalidTopic(Constants.INVALID_STREAM)
+            .withErrorTopic(ERROR_TOPIC)
             .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
               @Nullable
               @Override
               public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-                return (messageSet.getMessages().size() + messageSet.getErrors().size() + messageSet.getInvalids().size()) == inputMessages.size();
+                return (messageSet.getMessages().size() + messageSet.getErrors().size() == inputMessages.size());
               }
             })
             .withProvideResult(new Function<KafkaMessageSet,List<byte[]>>(){

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 80b6ebd..73d3827 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -34,6 +34,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
   private Properties topologyProperties;
   private String brokerUrl;
   private String sensorType;
+  private SpoutConfig.Offset offset = SpoutConfig.Offset.BEGINNING;
   private LocalCluster stormCluster;
 
   public static class Builder {
@@ -64,15 +65,17 @@ public class ParserTopologyComponent implements InMemoryComponent {
     this.sensorType = sensorType;
   }
 
+  public void setOffset(SpoutConfig.Offset offset) {
+    this.offset = offset;
+  }
+
   @Override
   public void start() throws UnableToStartException {
     try {
       TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk")
                                                                    , brokerUrl
                                                                    , sensorType
-                                                                   , SpoutConfig.Offset.BEGINNING
-                                                                   , 1
-                                                                   , 1
+                                                                   , offset
                                                                    , 1
                                                                    , 1
                                                                    , 1

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index f37b1fc..7476bcf 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.metron.writers.integration;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -37,6 +37,8 @@ import org.apache.metron.parsers.csv.CSVParser;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,6 +47,8 @@ import java.io.IOException;
 import java.util.*;
 
 public class WriterBoltIntegrationTest extends BaseIntegrationTest {
+  private static final String ERROR_TOPIC = "parser_error";
+
   public static class MockValidator implements FieldValidation{
 
     @Override
@@ -65,7 +69,8 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
         {
           "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"
         }
-                         ]
+                         ],
+   "parser.error.topic":"parser_error"
    }
     */
   @Multiline
@@ -88,7 +93,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
   public static String parserConfig;
 
   @Test
-  public void test() throws UnableToStartException, IOException {
+  public void test() throws UnableToStartException, IOException, ParseException {
     UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL);
     final String sensorType = "dummy";
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
@@ -100,8 +105,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(Constants.DEFAULT_PARSER_ERROR_TOPIC, 1));
-      add(new KafkaComponent.Topic(Constants.DEFAULT_PARSER_INVALID_TOPIC, 1));
+      add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
@@ -131,17 +135,20 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
       kafkaComponent.writeMessages(sensorType, inputMessages);
       ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor());
       Map<String,List<JSONObject>> outputMessages = result.getResult();
-      Assert.assertEquals(3, outputMessages.size());
+      Assert.assertEquals(2, outputMessages.size());
       Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
       Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action"));
-      Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).size());
-      Assert.assertEquals("error", outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage"));
-      Assert.assertTrue(Arrays.equals(listToBytes(outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage_bytes"))
-                                     , "error".getBytes()
-                                     )
-                      );
-      Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).size());
-      Assert.assertEquals("invalid", outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).get(0).get("action"));
+      Assert.assertEquals(2, outputMessages.get(ERROR_TOPIC).size());
+      JSONObject invalidMessage = outputMessages.get(ERROR_TOPIC).get(0);
+      Assert.assertEquals(Constants.ErrorType.PARSER_INVALID.getType(), invalidMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
+      JSONObject rawMessage = JSONUtils.INSTANCE.load((String) invalidMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()), JSONObject.class);
+      Assert.assertEquals("foo", rawMessage.get("dummy"));
+      Assert.assertEquals("invalid", rawMessage.get("action"));
+      JSONObject errorMessage = outputMessages.get(ERROR_TOPIC).get(1);
+      Assert.assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
+      Assert.assertEquals("error", errorMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
+      // It's unclear if we need a rawMessageBytes field so commenting out for now
+      //Assert.assertTrue(Arrays.equals(listToBytes(errorMessage.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName())), "error".getBytes()));
     }
     finally {
       if(runner != null) {
@@ -182,13 +189,12 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     return new KafkaProcessor<>()
             .withKafkaComponentName("kafka")
             .withReadTopic(Constants.ENRICHMENT_TOPIC)
-            .withErrorTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC)
-            .withInvalidTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC)
+            .withErrorTopic(ERROR_TOPIC)
             .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
               @Nullable
               @Override
               public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-                return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 1) && (messageSet.getInvalids().size() ==1);
+                return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2);
               }
             })
             .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){
@@ -197,8 +203,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
               public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
                 return new HashMap<String, List<JSONObject>>() {{
                   put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
-                  put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(messageSet.getErrors()));
-                  put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(messageSet.getInvalids()));
+                  put(ERROR_TOPIC, loadMessages(messageSet.getErrors()));
                 }};
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties
index 8489d8e..d8dd25f 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -26,7 +26,7 @@ kafka.start=WHERE_I_LEFT_OFF
 
 ##### Indexing #####
 index.input.topic=indexing
-index.error.topic=indexing_error
+index.error.topic=indexing
 writer.class.name=org.apache.metron.solr.writer.SolrWriter
 
 ##### Metrics #####

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
index abf3a8a..c209ef3 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -18,14 +18,17 @@
 package org.apache.metron.solr.integration;
 
 import com.google.common.base.Function;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.interfaces.FieldNameConverter;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.integration.utils.SampleUtil;
 import org.apache.metron.indexing.integration.IndexingIntegrationTest;
-import org.apache.metron.integration.*;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.solr.integration.components.SolrComponent;
@@ -89,7 +92,7 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
             throw new IllegalStateException("Unable to retrieve indexed documents.", e);
           }
           if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
-            errors = kafkaComponent.readMessages(Constants.INDEXING_ERROR_TOPIC);
+            errors = kafkaComponent.readMessages(ERROR_TOPIC);
             if(errors.size() > 0){
               return ReadinessState.READY;
             }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java
new file mode 100644
index 0000000..ad24283
--- /dev/null
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.test.error;
+
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.mockito.ArgumentMatcher;
+
+public class MetronErrorJSONMatcher extends ArgumentMatcher<Values> {
+
+  private JSONObject expected;
+
+  public MetronErrorJSONMatcher(JSONObject expected) {
+    this.expected = expected;
+  }
+
+  @Override
+  public boolean matches(Object o) {
+    Values values = (Values) o;
+    JSONObject actual = (JSONObject) values.get(0);
+    actual.remove("timestamp");
+    expected.remove("timestamp");
+    actual.remove("stack");
+    expected.remove("stack");
+    return actual.equals(expected);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 124ffd3..0a9e514 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -18,18 +18,26 @@
 
 package org.apache.metron.writer;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class BulkWriterComponent<MESSAGE_T> {
   public static final Logger LOG = LoggerFactory
@@ -60,18 +68,23 @@ public class BulkWriterComponent<MESSAGE_T> {
     commit(response.getSuccesses());
   }
 
-  public void error(Throwable e, Iterable<Tuple> tuples) {
+  public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
     tuples.forEach(t -> collector.ack(t));
+    MetronError error = new MetronError()
+            .withSensorType(sensorType)
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e);
     if(!Iterables.isEmpty(tuples)) {
       LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
-      ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
     }
+    tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+    ErrorUtils.handleError(collector, error);
   }
 
-  public void error(BulkWriterResponse errors) {
+  public void error(String sensorType, BulkWriterResponse errors, MessageGetStrategy messageGetStrategy) {
     Map<Throwable, Collection<Tuple>> errorMap = errors.getErrors();
     for(Map.Entry<Throwable, Collection<Tuple>> entry : errorMap.entrySet()) {
-      error(entry.getKey(), entry.getValue());
+      error(sensorType, entry.getKey(), entry.getValue(), messageGetStrategy);
     }
   }
 
@@ -80,24 +93,25 @@ public class BulkWriterComponent<MESSAGE_T> {
   }
 
 
-  public void errorAll(Throwable e) {
-    for(Map.Entry<String, Collection<Tuple>> kv : sensorTupleMap.entrySet()) {
-      error(e, kv.getValue());
-      sensorTupleMap.remove(kv.getKey());
-      sensorMessageMap.remove(kv.getKey());
+  public void errorAll(Throwable e, MessageGetStrategy messageGetStrategy) {
+    for(String key : new HashSet<>(sensorTupleMap.keySet())) {
+      errorAll(key, e, messageGetStrategy);
     }
   }
 
-  public void errorAll(String sensorType, Throwable e) {
-    error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()));
+  public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) {
+    Collection<Tuple> tuples = Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>());
+    error(sensorType, e, tuples, messageGetStrategy);
     sensorTupleMap.remove(sensorType);
     sensorMessageMap.remove(sensorType);
   }
+
   public void write( String sensorType
                    , Tuple tuple
                    , MESSAGE_T message
                    , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
                    , WriterConfiguration configurations
+                   , MessageGetStrategy messageGetStrategy
                    ) throws Exception
   {
     if(!configurations.isEnabled(sensorType)) {
@@ -129,13 +143,13 @@ public class BulkWriterComponent<MESSAGE_T> {
         }
 
         if(handleError) {
-          error(response);
+          error(sensorType, response, messageGetStrategy);
         } else if (response.hasErrors()) {
           throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors());
         }
       } catch (Throwable e) {
         if(handleError) {
-          error(e, tupleList);
+          error(sensorType, e, tupleList, messageGetStrategy);
         }
         else {
           throw e;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 66c4c73..085ca5c 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,27 +17,27 @@
  */
 package org.apache.metron.writer.bolt;
 
-import org.apache.metron.common.bolt.ConfiguredIndexingBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredIndexingBolt;
 import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.metron.writer.WriterToBulkWriter;
-import org.apache.metron.writer.message.MessageGetter;
-import org.apache.metron.writer.message.MessageGetters;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Map;
 import java.util.function.Function;
 
 public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
@@ -46,8 +46,9 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
           .getLogger(BulkMessageWriterBolt.class);
   private BulkMessageWriter<JSONObject> bulkMessageWriter;
   private BulkWriterComponent<JSONObject> writerComponent;
-  private String messageGetterStr = MessageGetters.NAMED.name();
-  private transient MessageGetter messageGetter = null;
+  private String messageGetStrategyType = MessageGetters.DEFAULT_JSON_FROM_FIELD.name();
+  private String messageGetField;
+  private transient MessageGetStrategy messageGetStrategy;
   private transient OutputCollector collector;
   private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation;
   public BulkMessageWriterBolt(String zookeeperUrl) {
@@ -64,8 +65,13 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     return this;
   }
 
-  public BulkMessageWriterBolt withMessageGetter(String messageGetter) {
-    this.messageGetterStr = messageGetter;
+  public BulkMessageWriterBolt withMessageGetter(String messageGetStrategyType) {
+    this.messageGetStrategyType = messageGetStrategyType;
+    return this;
+  }
+
+  public BulkMessageWriterBolt withMessageGetterField(String messageGetField) {
+    this.messageGetField = messageGetField;
     return this;
   }
 
@@ -74,7 +80,11 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     this.writerComponent = new BulkWriterComponent<>(collector);
     this.collector = collector;
     super.prepare(stormConf, context, collector);
-    messageGetter = MessageGetters.valueOf(messageGetterStr);
+    if (messageGetField != null) {
+      messageGetStrategy = MessageGetters.valueOf(messageGetStrategyType).get(messageGetField);
+    } else {
+      messageGetStrategy = MessageGetters.valueOf(messageGetStrategyType).get();
+    }
     if(bulkMessageWriter instanceof WriterToBulkWriter) {
       configurationTransformation = WriterToBulkWriter.TRANSFORMATION;
     }
@@ -93,7 +103,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
-    JSONObject message = messageGetter.getMessage(tuple);
+    JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
     String sensorType = MessageUtils.getSensorType(message);
     try
     {
@@ -107,6 +117,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
                            , message
                            , bulkMessageWriter
                            , writerConfiguration
+                           , messageGetStrategy
                            );
       LOG.trace("Writing enrichment message: {}", message);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
deleted file mode 100644
index 99c825f..0000000
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.message;
-
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-
-public interface MessageGetter {
-  JSONObject getMessage(Tuple t);
-}