You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/10/18 19:00:06 UTC

[1/2] metron git commit: METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213

Repository: metron
Updated Branches:
  refs/heads/master 08f3de0fe -> 28542ad64


http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
new file mode 100644
index 0000000..5f05b24
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.filters.StellarFilter;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.parsers.ParserRunnerImpl.ProcessResult;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ParserRunnerImpl.class, ReflectionUtils.class, Filters.class})
+public class ParserRunnerImplTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  /**
+   {
+   "fieldValidations" : [
+     {
+       "input" : [ "ip_src_addr", "ip_dst_addr"],
+       "validation" : "IP"
+     }
+   ]
+   }
+   */
+  @Multiline
+  private String globalConfigString;
+
+  /**
+   {
+     "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+     "filterClassName":"org.apache.metron.parsers.filters.StellarFilter",
+     "sensorTopic":"bro",
+     "parserConfig": {
+       "field": "value"
+     },
+     "fieldTransformations" : [
+       {
+         "input" : "field1",
+         "transformation" : "REMOVE"
+       }
+     ]
+   }
+   */
+  @Multiline
+  private String broConfigString;
+
+  /**
+   {
+     "parserClassName":"org.apache.metron.parsers.snort.BasicSnortParser",
+     "sensorTopic":"snort",
+     "parserConfig": {}
+   }
+   */
+  @Multiline
+  private String snortConfigString;
+
+  private ParserConfigurations parserConfigurations;
+  private MessageParser<JSONObject> broParser;
+  private MessageParser<JSONObject> snortParser;
+  private MessageFilter<JSONObject> stellarFilter;
+  private ParserRunnerImpl parserRunner;
+
+
+  @Before
+  public void setup() throws IOException {
+    parserConfigurations = new ParserConfigurations();
+    SensorParserConfig broConfig = SensorParserConfig.fromBytes(broConfigString.getBytes());
+    SensorParserConfig snortConfig = SensorParserConfig.fromBytes(snortConfigString.getBytes());
+    parserConfigurations.updateSensorParserConfig("bro", broConfig);
+    parserConfigurations.updateSensorParserConfig("snort", snortConfig);
+    parserConfigurations.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfigString, JSONUtils.MAP_SUPPLIER));
+    parserRunner = new ParserRunnerImpl(new HashSet<>(Arrays.asList("bro", "snort")));
+    broParser = mock(MessageParser.class);
+    snortParser = mock(MessageParser.class);
+    stellarFilter = mock(StellarFilter.class);
+    mockStatic(ReflectionUtils.class);
+    mockStatic(Filters.class);
+
+    when(ReflectionUtils.createInstance("org.apache.metron.parsers.bro.BasicBroParser")).thenReturn(broParser);
+    when(ReflectionUtils.createInstance("org.apache.metron.parsers.snort.BasicSnortParser")).thenReturn(snortParser);
+    when(Filters.get("org.apache.metron.parsers.filters.StellarFilter", broConfig.getParserConfig()))
+            .thenReturn(stellarFilter);
+
+  }
+
+  @Test
+  public void shouldThrowExceptionOnEmptyParserSupplier() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("A parser config supplier must be set before initializing the ParserRunner.");
+
+    parserRunner.init(null, null);
+  }
+
+  @Test
+  public void shouldThrowExceptionOnEmptyStellarContext() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("A stellar context must be set before initializing the ParserRunner.");
+
+    parserRunner.init(() -> parserConfigurations, null);
+  }
+
+  @Test
+  public void initShouldThrowExceptionOnMissingSensorParserConfig() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Could not initialize parsers.  Cannot find configuration for sensor test.");
+
+    parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+      add("test");
+    }});
+
+    parserRunner.init(() -> parserConfigurations, mock(Context.class));
+  }
+
+  @Test
+  public void executeShouldThrowExceptionOnMissingSensorParserConfig() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Could not execute parser.  Cannot find configuration for sensor test.");
+
+    parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
+      add("test");
+    }});
+
+    parserRunner.execute("test", mock(RawMessage.class), parserConfigurations);
+  }
+
+  @Test
+  public void shouldInit() throws Exception {
+    Context stellarContext = mock(Context.class);
+    Map<String, Object> broParserConfig = parserConfigurations.getSensorParserConfig("bro").getParserConfig();
+    Map<String, Object> snortParserConfig = parserConfigurations.getSensorParserConfig("snort").getParserConfig();
+
+    parserRunner.init(() -> parserConfigurations, stellarContext);
+
+    {
+      // Verify Stellar context
+      Assert.assertEquals(stellarContext, parserRunner.getStellarContext());
+    }
+
+    Map<String, ParserComponent> sensorToParserComponentMap = parserRunner.getSensorToParserComponentMap();
+    {
+      // Verify Bro parser initialization
+      Assert.assertEquals(2, sensorToParserComponentMap.size());
+      ParserComponent broComponent = sensorToParserComponentMap.get("bro");
+      Assert.assertEquals(broParser, broComponent.getMessageParser());
+      Assert.assertEquals(stellarFilter, broComponent.getFilter());
+      verify(broParser, times(1)).init();
+      verify(broParser, times(1)).configure(broParserConfig);
+      verifyNoMoreInteractions(broParser);
+      verifyNoMoreInteractions(stellarFilter);
+    }
+    {
+      // Verify Snort parser initialization
+      ParserComponent snortComponent = sensorToParserComponentMap.get("snort");
+      Assert.assertEquals(snortParser, snortComponent.getMessageParser());
+      Assert.assertNull(snortComponent.getFilter());
+      verify(snortParser, times(1)).init();
+      verify(snortParser, times(1)).configure(snortParserConfig);
+      verifyNoMoreInteractions(snortParser);
+    }
+  }
+
+  @Test
+  public void shouldExecute() {
+    parserRunner = spy(parserRunner);
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+    JSONObject parsedMessage1 = new JSONObject();
+    parsedMessage1.put("field", "parsedMessage1");
+    JSONObject parsedMessage2 = new JSONObject();
+    parsedMessage2.put("field", "parsedMessage2");
+    Object rawMessage1 = new RawMessage("raw_message1".getBytes(), new HashMap<>());
+    Object rawMessage2 = new RawMessage("raw_message2".getBytes(), new HashMap<>());
+    Throwable throwable1 = mock(Throwable.class);
+    Throwable throwable2 = mock(Throwable.class);
+    MessageParserResult<JSONObject> messageParserResult = new DefaultMessageParserResult<>(Arrays.asList(parsedMessage1, parsedMessage2),
+            new HashMap<Object, Throwable>(){{
+              put(rawMessage1, throwable1);
+              put(rawMessage2, throwable2);
+            }});
+    JSONObject processedMessage = new JSONObject();
+    processedMessage.put("field", "processedMessage1");
+    MetronError processedError = new MetronError().withMessage("processedError");
+    ProcessResult processedMessageResult = mock(ProcessResult.class);
+    ProcessResult processedErrorResult = mock(ProcessResult.class);
+
+    when(broParser.parseOptionalResult(rawMessage.getMessage())).thenReturn(Optional.of(messageParserResult));
+    when(processedMessageResult.getMessage()).thenReturn(processedMessage);
+    when(processedErrorResult.isError()).thenReturn(true);
+    when(processedErrorResult.getError()).thenReturn(processedError);
+    doReturn(Optional.of(processedMessageResult)).when(parserRunner)
+            .processMessage("bro", parsedMessage1, rawMessage, broParser, parserConfigurations);
+    doReturn(Optional.of(processedErrorResult)).when(parserRunner)
+            .processMessage("bro", parsedMessage2, rawMessage, broParser, parserConfigurations);
+
+    MetronError expectedParseError1 = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(throwable1)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(rawMessage1);
+    MetronError expectedParseError2 = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(throwable2)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(rawMessage2);
+
+    parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{
+      put("bro", new ParserComponent(broParser, stellarFilter));
+    }});
+    ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute("bro", rawMessage, parserConfigurations);
+
+    Assert.assertEquals(1, parserRunnerResults.getMessages().size());
+    Assert.assertTrue(parserRunnerResults.getMessages().contains(processedMessage));
+    Assert.assertEquals(3, parserRunnerResults.getErrors().size());
+    Assert.assertTrue(parserRunnerResults.getErrors().contains(processedError));
+    Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedParseError1));
+    Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedParseError2));
+  }
+
+  @Test
+  public void shouldExecuteWithMasterThrowable() {
+    parserRunner = spy(parserRunner);
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+    Throwable masterThrowable = mock(Throwable.class);
+    MessageParserResult<JSONObject> messageParserResult = new DefaultMessageParserResult<>(masterThrowable);
+
+
+    when(broParser.parseOptionalResult(rawMessage.getMessage())).thenReturn(Optional.of(messageParserResult));
+
+    parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{
+      put("bro", new ParserComponent(broParser, stellarFilter));
+    }});
+    ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute("bro", rawMessage, parserConfigurations);
+
+    verify(parserRunner, times(0))
+            .processMessage(any(), any(), any(), any(), any());
+
+    MetronError expectedError = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(masterThrowable)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(rawMessage.getMessage());
+    Assert.assertEquals(1, parserRunnerResults.getErrors().size());
+    Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedError));
+  }
+
+  @Test
+  public void shouldPopulateMessagesOnProcessMessage() {
+    JSONObject inputMessage = new JSONObject();
+    inputMessage.put("guid", "guid");
+    inputMessage.put("ip_src_addr", "192.168.1.1");
+    inputMessage.put("ip_dst_addr", "192.168.1.2");
+    inputMessage.put("field1", "value");
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+    JSONObject expectedOutput  = new JSONObject();
+    expectedOutput.put("guid", "guid");
+    expectedOutput.put("source.type", "bro");
+    expectedOutput.put("ip_src_addr", "192.168.1.1");
+    expectedOutput.put("ip_dst_addr", "192.168.1.2");
+
+    when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
+    when(broParser.validate(expectedOutput)).thenReturn(true);
+
+    parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{
+      put("bro", new ParserComponent(broParser, stellarFilter));
+    }});
+
+    Optional<ParserRunnerImpl.ProcessResult> processResult = parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations);
+
+    Assert.assertTrue(processResult.isPresent());
+    Assert.assertFalse(processResult.get().isError());
+    Assert.assertEquals(expectedOutput, processResult.get().getMessage());
+  }
+
+  @Test
+  public void shouldReturnMetronErrorOnInvalidMessage() {
+    JSONObject inputMessage = new JSONObject();
+    inputMessage.put("guid", "guid");
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+    JSONObject expectedOutput  = new JSONObject();
+    expectedOutput.put("guid", "guid");
+    expectedOutput.put("source.type", "bro");
+    MetronError expectedMetronError = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(inputMessage);
+
+    when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
+    when(broParser.validate(expectedOutput)).thenReturn(false);
+
+    parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{
+      put("bro", new ParserComponent(broParser, stellarFilter));
+    }});
+
+    Optional<ParserRunnerImpl.ProcessResult> processResult = parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations);
+
+    Assert.assertTrue(processResult.isPresent());
+    Assert.assertTrue(processResult.get().isError());
+    Assert.assertEquals(expectedMetronError, processResult.get().getError());
+  }
+
+  @Test
+  public void shouldReturnMetronErrorOnFailedFieldValidator() {
+    JSONObject inputMessage = new JSONObject();
+    inputMessage.put("guid", "guid");
+    inputMessage.put("ip_src_addr", "test");
+    inputMessage.put("ip_dst_addr", "test");
+    RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>());
+
+    JSONObject expectedOutput  = new JSONObject();
+    expectedOutput.put("guid", "guid");
+    expectedOutput.put("ip_src_addr", "test");
+    expectedOutput.put("ip_dst_addr", "test");
+    expectedOutput.put("source.type", "bro");
+    MetronError expectedMetronError = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(Collections.singleton("bro"))
+            .addRawMessage(inputMessage)
+            .withErrorFields(new HashSet<>(Arrays.asList("ip_src_addr", "ip_dst_addr")));
+
+    when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true);
+    when(broParser.validate(expectedOutput)).thenReturn(true);
+
+    parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{
+      put("bro", new ParserComponent(broParser, stellarFilter));
+    }});
+
+    Optional<ParserRunnerImpl.ProcessResult> processResult = parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations);
+
+    Assert.assertTrue(processResult.isPresent());
+    Assert.assertTrue(processResult.get().isError());
+    Assert.assertEquals(expectedMetronError, processResult.get().getError());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index e5e7180..9f58d1c 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,719 +17,428 @@
  */
 package org.apache.metron.parsers.bolt;
 
-import com.google.common.collect.ImmutableList;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.parsers.DefaultMessageParserResult;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.DefaultParserRunnerResults;
+import org.apache.metron.parsers.ParserRunnerResults;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
+import org.apache.storm.Config;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.function.Supplier;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ParserBoltTest extends BaseBoltTest {
 
-  @Mock
-  private MultilineMessageParser<JSONObject> parser;
-
-  @Mock
-  private MessageWriter<JSONObject> writer;
-
-  @Mock
-  private BulkMessageWriter<JSONObject> batchWriter;
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
 
   @Mock
-  private MessageFilter<JSONObject> filter;
+  private Tuple t1;
 
   @Mock
-  private Tuple t1;
+  private ParserRunnerImpl parserRunner;
 
   @Mock
-  private Tuple t2;
+  private WriterHandler writerHandler;
 
   @Mock
-  private Tuple t3;
+  private WriterHandler writerHandlerHandleAck;
 
   @Mock
-  private Tuple t4;
+  private MessageGetStrategy messageGetStrategy;
 
   @Mock
-  private Tuple t5;
+  private Context stellarContext;
 
-  private static class RecordingWriter implements BulkMessageWriter<JSONObject> {
-    List<JSONObject> records = new ArrayList<>();
+  private class MockParserRunner extends ParserRunnerImpl {
 
-    @Override
-    public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
+    private boolean isInvalid = false;
+    private RawMessage rawMessage;
+    private JSONObject message;
 
+    public MockParserRunner(HashSet<String> sensorTypes) {
+      super(sensorTypes);
     }
 
     @Override
-    public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
-      records.addAll(messages);
-      BulkWriterResponse ret = new BulkWriterResponse();
-      ret.addAllSuccesses(tuples);
-      return ret;
+    public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+      DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+      this.rawMessage = rawMessage;
+      if (!isInvalid) {
+        parserRunnerResults.addMessage(message);
+      } else {
+        MetronError error = new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(message);
+        parserRunnerResults.addError(error);
+      }
+      return parserRunnerResults;
     }
 
-    @Override
-    public String getName() {
-      return "recording";
+    protected void setInvalid(boolean isInvalid) {
+      this.isInvalid = isInvalid;
     }
 
-    @Override
-    public void close() throws Exception {
-
+    protected void setMessage(JSONObject message) {
+      this.message = message;
     }
 
-    public List<JSONObject> getRecords() {
-      return records;
+    protected RawMessage getRawMessage() {
+      return rawMessage;
     }
   }
 
-  private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-    return createUpdater(Optional.empty());
-  }
-  private static ConfigurationsUpdater<ParserConfigurations> createUpdater(Optional<Integer> batchSize) {
-    return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
-      @Override
-      public void update(CuratorFramework client, String path, byte[] data) throws IOException { }
+  @Before
+  public void setup() {
+    when(writerHandler.handleAck()).thenReturn(false);
+    when(writerHandlerHandleAck.handleAck()).thenReturn(true);
 
-      @Override
-      public void delete(CuratorFramework client, String path, byte[] data) throws IOException { }
-
-      @Override
-      public ConfigurationType getType() {
-        return ConfigurationType.PARSER;
-      }
-
-      @Override
-      public void update(String name, byte[] data) throws IOException { }
-
-      @Override
-      public void delete(String name) { }
-
-      @Override
-      public Class<ParserConfigurations> getConfigurationClass() {
-        return ParserConfigurations.class;
-      }
-
-      @Override
-      public void forceUpdate(CuratorFramework client) { }
-
-      @Override
-      public ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  if(batchSize.isPresent()) {
-                    put(IndexingConfigurations.BATCH_SIZE_CONF, batchSize.get());
-                  }
-                }};
-              }
-            };
-          }
-        };
-      }
-    };
   }
 
-
   @Test
-  public void testEmpty() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(writer)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
-      }
-    };
-
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(writer, times(1)).init();
-    byte[] sampleBinary = "some binary message".getBytes();
-
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parseOptionalResult(sampleBinary)).thenReturn(null);
-    parserBolt.execute(tuple);
-    verify(parser, times(0)).validate(any());
-    verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any());
-    verify(outputCollector, times(1)).ack(tuple);
-
-    MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.PARSER_ERROR)
-            .withThrowable(new NullPointerException())
-            .withSensorType(Collections.singleton(sensorType))
-            .addRawMessage(sampleBinary);
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+  public void shouldThrowExceptionOnDifferentHandleAck() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("All writers must match when calling handleAck()");
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+      put("bro", writerHandlerHandleAck);
+    }});
   }
 
   @Test
-  public void testInvalid() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(writer)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
-      }
-    };
+  public void withBatchTimeoutDivisorShouldSetBatchTimeoutDivisor() {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}).withBatchTimeoutDivisor(5);
 
-    buildGlobalConfig(parserBolt);
+    Assert.assertEquals(5, parserBolt.getBatchTimeoutDivisor());
+  }
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    byte[] sampleBinary = "some binary message".getBytes();
-
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    JSONObject parsedMessage = new JSONObject();
-    parsedMessage.put("field", "invalidValue");
-    parsedMessage.put("guid", "this-is-unique-identifier-for-tuple");
-    List<JSONObject> messageList = new ArrayList<>();
-    messageList.add(parsedMessage);
-    when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messageList)));
-    when(parser.validate(parsedMessage)).thenReturn(true);
-    parserBolt.execute(tuple);
+  @Test
+  public void shouldThrowExceptionOnInvalidBatchTimeoutDivisor() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("batchTimeoutDivisor must be positive. Value provided was -1");
 
-    MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.PARSER_INVALID)
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorFields(new HashSet<String>() {{ add("field"); }})
-            .addRawMessage(new JSONObject(){{
-              put("field", "invalidValue");
-              put("source.type", "yaf");
-              put("guid", "this-is-unique-identifier-for-tuple");
-            }});
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}).withBatchTimeoutDivisor(-1);
   }
 
   @Test
-  public void test() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(writer)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldGetComponentConfiguration() {
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() {
+        ParserConfigurations configurations = new ParserConfigurations();
+        SensorParserConfig sensorParserConfig = new SensorParserConfig();
+        sensorParserConfig.setParserConfig(new HashMap<String, Object>() {{
+            put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
+        }});
+        configurations.updateSensorParserConfig("yaf", sensorParserConfig);
+        return configurations;
       }
     };
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(writer, times(1)).init();
-    byte[] sampleBinary = "some binary message".getBytes();
-    JSONParser jsonParser = new JSONParser();
-    final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
-    final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
-    List<JSONObject> messages = new ArrayList<JSONObject>() {{
-      add(sampleMessage1);
-      add(sampleMessage2);
-    }};
-    final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
-    final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
-    when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messages)));
-    when(parser.validate(eq(messages.get(0)))).thenReturn(true);
-    when(parser.validate(eq(messages.get(1)))).thenReturn(false);
-    parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage1));
-    verify(outputCollector, times(1)).ack(tuple);
-    when(parser.validate(eq(messages.get(0)))).thenReturn(true);
-    when(parser.validate(eq(messages.get(1)))).thenReturn(true);
-    when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false);
-    when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true);
-    parserBolt.execute(tuple);
-    verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
-    verify(outputCollector, times(2)).ack(tuple);
-    doThrow(new Exception()).when(writer).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
-    parserBolt.execute(tuple);
-    verify(outputCollector, times(1)).reportError(any(Throwable.class));
-  }
 
-  /**
-   {
-    "filterClassName" : "STELLAR"
-   ,"parserConfig" : {
-    "filter.query" : "exists(field1)"
-    }
-   }
-   */
-  @Multiline
-  public static String sensorParserConfig;
-
-  /**
-   * Tests to ensure that a message that is unfiltered results in one write and an ack.
-   * @throws Exception
-   */
-  @Test
-  public void testFilterSuccess() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig);
-
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    BulkWriterResponse successResponse = mock(BulkWriterResponse.class);
-    when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1));
-    when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse);
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
-      put("field1", "blah");
-    }})))));
-    parserBolt.execute(t1);
-    verify(batchWriter, times(1)).write(any(), any(), any(), any());
-    verify(outputCollector, times(1)).ack(t1);
+    Map<String, Object> componentConfiguration = parserBolt.getComponentConfiguration();
+    Assert.assertEquals(1, componentConfiguration.size());
+    Assert.assertEquals( 14, componentConfiguration.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS));
   }
 
-
-  /**
-   * Tests to ensure that a message filtered out results in no writes, but an ack.
-   * @throws Exception
-   */
   @Test
-  public void testFilterFailure() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            null,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldPrepare() {
+    Map stormConf = mock(Map.class);
+    SensorParserConfig yafConfig = mock(SensorParserConfig.class);
+    when(yafConfig.getSensorTopic()).thenReturn("yafTopic");
+    when(yafConfig.getParserConfig()).thenReturn(new HashMap<String, Object>() {{
+      put(IndexingConfigurations.BATCH_SIZE_CONF, 10);
+    }});
+    ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+
+    ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
       protected SensorParserConfig getSensorParserConfig(String sensorType) {
-        try {
-          return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+        if ("yaf".equals(sensorType)) {
+          return yafConfig;
         }
+        return null;
       }
 
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
-    };
+    });
+    doReturn(stellarContext).when(parserBolt).initializeStellar();
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
-      put("field2", "blah");
-    }})))));
-    parserBolt.execute(t1);
-    verify(batchWriter, times(0)).write(any(), any(), any(), any());
-    verify(outputCollector, times(1)).ack(t1);
-  }
-  /**
-  {
-     "sensorTopic":"dummy"
-     ,"parserConfig": {
-      "batchSize" : 1
-     }
-      ,"fieldTransformations" : [
-          {
-           "transformation" : "STELLAR"
-          ,"output" : "timestamp"
-          ,"config" : {
-            "timestamp" : "TO_EPOCH_TIMESTAMP(timestampstr, 'yyyy-MM-dd HH:mm:ss', 'UTC')"
-                      }
-          }
-                               ]
-   }
-   */
-  @Multiline
-  public static String csvWithFieldTransformations;
 
-  @Test
-  public void testFieldTransformationPriorToValidation() {
-    String sensorType = "dummy";
-    RecordingWriter recordingWriter = new RecordingWriter();
-    //create a parser which acts like a basic parser but returns no timestamp field.
-    MultilineMessageParser<JSONObject> dummyParser = new MultilineMessageParser<JSONObject>() {
-      @Override
-      public void configure(Map<String, Object> config) {
-      }
+    parserBolt.prepare(stormConf, topologyContext, outputCollector);
 
-      @Override
-      public void init() {
-      }
+    verify(parserRunner, times(1)).init(any(Supplier.class), eq(stellarContext));
+    verify(yafConfig, times(1)).init();
+    Map<String, String> topicToSensorMap = parserBolt.getTopicToSensorMap();
+    Assert.assertEquals(1, topicToSensorMap.size());
+    Assert.assertEquals("yaf", topicToSensorMap.get("yafTopic"));
+    verify(writerHandler).init(stormConf, topologyContext, outputCollector, parserConfigurations);
+    verify(writerHandler).setDefaultBatchTimeout(14);
+  }
 
-      @Override
-      public boolean validate(JSONObject message) {
-        Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName());
-        if (timestampObject instanceof Long) {
-          Long timestamp = (Long) timestampObject;
-          return timestamp > 0;
-        }
-        return false;
-      }
+  @Test
+  public void shouldThrowExceptionOnMissingConfig() {
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Unable to retrieve a parser config for yaf");
 
-      @Override
-      @SuppressWarnings("unchecked")
-      public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
-        return Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject() {{
-          put("data", "foo");
-          put("timestampstr", "2016-01-05 17:02:30");
-          put("original_string", "blah");
-        }})));
-      }
-    };
+    Map stormConf = mock(Map.class);
 
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            dummyParser,
-            null,
-            new WriterHandler(recordingWriter)
-        )
-    );
-    ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations);
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }});
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    when(t1.getBinary(0)).thenReturn(new byte[] {});
-    parserBolt.execute(t1);
-    Assert.assertEquals(1, recordingWriter.getRecords().size());
-    long expected = 1452013350000L;
-    Assert.assertEquals(expected, recordingWriter.getRecords().get(0).get("timestamp"));
+
+    parserBolt.prepare(stormConf, topologyContext, outputCollector);
   }
 
+
   @Test
-  public void testDefaultBatchSize() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void executeShouldHandleTickTuple() throws Exception {
+    when(t1.getSourceComponent()).thenReturn("__system");
+    when(t1.getSourceStreamId()).thenReturn("__tick");
+    ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        // this uses default batch size
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    BulkWriterResponse response = new BulkWriterResponse();
-    Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE];
-    for (int i=0; i < uniqueTuples.length; i++) {
-      uniqueTuples[i] = mock(Tuple.class);
-      response.addSuccess(uniqueTuples[i]);
-    }
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response);
-    for (Tuple tuple : uniqueTuples) {
-      parserBolt.execute(tuple);
-    }
-    for (Tuple uniqueTuple : uniqueTuples) {
-      verify(outputCollector, times(1)).ack(uniqueTuple);
-    }
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+
+    parserBolt.execute(t1);
+
+    verify(writerHandler, times(1)).flush(parserConfigurations, messageGetStrategy);
+    verify(outputCollector, times(1)).ack(t1);
   }
 
   @Test
-  public void testLessRecordsThanDefaultBatchSize() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldExecuteOnSuccess() throws Exception {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+    MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        // this uses default batch size
-        return ParserBoltTest.createUpdater();
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1;
-    BulkWriterResponse response = new BulkWriterResponse();
-    Tuple[] uniqueTuples = new Tuple[oneLessThanDefaultBatchSize];
-    for (int i=0; i < uniqueTuples.length; i++) {
-      uniqueTuples[i] = mock(Tuple.class);
-      response.addSuccess(uniqueTuples[i]);
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    JSONObject message = new JSONObject();
+    message.put("field", "value");
+    mockParserRunner.setMessage(message);
+    RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
+
+    {
+      // Verify the correct message is written and ack is handled
+      parserBolt.execute(t1);
+
+      Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
+      verify(writerHandler, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy);
+      verify(outputCollector, times(1)).ack(t1);
     }
-    for (Tuple tuple : uniqueTuples) {
-      parserBolt.execute(tuple);
+    {
+      // Verify the tuple is not acked when the writer is set to handle ack
+      reset(outputCollector);
+      parserBolt.setSensorToWriterMap(new HashMap<String, WriterHandler>() {{
+        put("yaf", writerHandlerHandleAck);
+      }});
+
+      parserBolt.execute(t1);
+
+      verify(writerHandlerHandleAck, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy);
+      verify(outputCollector, times(0)).ack(t1);
     }
-    // should have no acking yet - batch size not fulfilled
-    verify(outputCollector, never()).ack(any(Tuple.class));
-    response.addSuccess(t1); // used to achieve count in final verify
-    Iterable<Tuple> tuples = new HashSet(Arrays.asList(uniqueTuples)) {{
-      add(t1);
-    }};
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
-    // meet batch size requirement and now it should ack
-    parserBolt.execute(t1);
-    verify(outputCollector, times(ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE)).ack(any(Tuple.class));
   }
 
   @Test
-  public void testBatchOfOne() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
+  public void shouldExecuteOnError() throws Exception {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+    MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{
+      add("yaf");
+    }});
+    mockParserRunner.setInvalid(true);
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
+
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addSuccess(t1);
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response);
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    JSONObject message = new JSONObject();
+    message.put("field", "value");
+    mockParserRunner.setMessage(message);
+    RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_INVALID)
+            .withSensorType(Collections.singleton("yaf"))
+            .addRawMessage(message);
+
     parserBolt.execute(t1);
+
+    Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
     verify(outputCollector, times(1)).ack(t1);
+
   }
 
   @Test
-  public void testBatchOfFive() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(5));
-      }
-    } ;
-
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    Set<Tuple> tuples = Stream.of(t1, t2, t3, t4, t5).collect(Collectors.toSet());
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tuples);
-    when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
-    writeNonBatch(outputCollector, parserBolt, t1);
-    writeNonBatch(outputCollector, parserBolt, t2);
-    writeNonBatch(outputCollector, parserBolt, t3);
-    writeNonBatch(outputCollector, parserBolt, t4);
-    parserBolt.execute(t5);
-    verify(batchWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any());
-    verify(outputCollector, times(1)).ack(t1);
-    verify(outputCollector, times(1)).ack(t2);
-    verify(outputCollector, times(1)).ack(t3);
-    verify(outputCollector, times(1)).ack(t4);
-    verify(outputCollector, times(1)).ack(t5);
+  public void shouldThrowExceptionOnFailedExecute() {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
 
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+    doThrow(new IllegalStateException("parserRunner.execute failed")).when(parserRunner).execute(eq("yaf"), any(), eq(parserConfigurations));
 
-  }
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
 
-  @Test
-  public void testBatchOfFiveWithError() throws Exception {
-    String sensorType = "yaf";
-    Map<String, ParserComponents> parserMap = Collections.singletonMap(
-        sensorType,
-        new ParserComponents(
-            parser,
-            filter,
-            new WriterHandler(batchWriter)
-        )
-    );
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(5));
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
 
-    parserBolt.setCuratorFramework(client);
-    parserBolt.setZKCache(cache);
-    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any(), any());
-
-    doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
-    when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
-    when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    parserBolt.execute(t1);
-    parserBolt.execute(t2);
-    parserBolt.execute(t3);
-    parserBolt.execute(t4);
-    parserBolt.execute(t5);
-    verify(batchWriter, times(1)).write(any(), any(), any(), any());
-    verify(outputCollector, times(1)).ack(t1);
-    verify(outputCollector, times(1)).ack(t2);
-    verify(outputCollector, times(1)).ack(t3);
-    verify(outputCollector, times(1)).ack(t4);
-    verify(outputCollector, times(1)).ack(t5);
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(new IllegalStateException("parserRunner.execute failed"))
+            .withSensorType(Collections.singleton("yaf"))
+            .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
 
-  }
+    parserBolt.execute(t1);
 
-  protected void buildGlobalConfig(ParserBolt parserBolt) {
-    HashMap<String, Object> globalConfig = new HashMap<>();
-    Map<String, Object> fieldValidation = new HashMap<>();
-    fieldValidation.put("input", Arrays.asList("field"));
-    fieldValidation.put("validation", "STELLAR");
-    fieldValidation.put("config", new HashMap<String, String>(){{ put("condition", "field != 'invalidValue'"); }});
-    globalConfig.put("fieldValidations", Arrays.asList(fieldValidation));
-    parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
+    verify(outputCollector, times(1)).ack(t1);
   }
 
-  private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap,
-      String csvWithFieldTransformations) {
-    return new ParserBolt("zookeeperUrl", parserMap) {
-      @Override
-      protected SensorParserConfig getSensorParserConfig(String sensorType) {
-        try {
-          return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
+  @Test
+  public void shouldThrowExceptionOnFailedWrite() throws Exception {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+    MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+    doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any(), any(), any());
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
 
       @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
       }
     };
-  }
 
-  private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
-    bolt.execute(t);
-  }
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    JSONObject message = new JSONObject();
+    message.put("field", "value");
+    mockParserRunner.setMessage(message);
+
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(new IllegalStateException("write failed"))
+            .withSensorType(Collections.singleton("yaf"))
+            .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8));
 
+    parserBolt.execute(t1);
+
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
+    verify(outputCollector, times(1)).ack(t1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index 0d6eef8..31d87a7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -25,21 +25,20 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
@@ -83,41 +82,13 @@ public class ParserDriver implements Serializable {
     List<byte[]> errors = new ArrayList<>();
 
     public ShimParserBolt(List<byte[]> output) {
-      super(null
-          , Collections.singletonMap(
-              sensorType == null ? config.getSensorTopic() : sensorType,
-              new ParserComponents(
-              ReflectionUtils.createInstance(config.getParserClassName()),
-                  null,
-                  new WriterHandler(new CollectingWriter(output))
-              )
-         )
-      );
+      super(null, parserRunner, Collections.singletonMap(sensorType, new WriterHandler(new CollectingWriter(output))));
       this.output = output;
-      Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap();
-      for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) {
-        sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig());
-      }
     }
 
     @Override
     public ParserConfigurations getConfigurations() {
-      return new ParserConfigurations() {
-        @Override
-        public SensorParserConfig getSensorParserConfig(String sensorType) {
-          return config;
-        }
-
-        @Override
-        public Map<String, Object> getGlobalConfig() {
-          return globalConfig;
-        }
-
-        @Override
-        public List<FieldValidator> getFieldValidations() {
-          return new ArrayList<>();
-        }
-      };
+      return config;
     }
 
     @Override
@@ -125,7 +96,7 @@ public class ParserDriver implements Serializable {
     }
 
     @Override
-    protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+    protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
       errors.add(originalMessage);
       LOG.error("Error parsing message: " + ex.getMessage(), ex);
     }
@@ -139,14 +110,20 @@ public class ParserDriver implements Serializable {
   }
 
 
-  private SensorParserConfig config;
-  private Map<String, Object> globalConfig;
+  private ParserConfigurations config;
   private String sensorType;
+  private ParserRunner parserRunner;
 
   public ParserDriver(String sensorType, String parserConfig, String globalConfig) throws IOException {
-    config = SensorParserConfig.fromBytes(parserConfig.getBytes());
-    this.sensorType = sensorType;
-    this.globalConfig = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER);
+    SensorParserConfig sensorParserConfig = SensorParserConfig.fromBytes(parserConfig.getBytes());
+    this.sensorType = sensorType == null ? sensorParserConfig.getSensorTopic() : sensorType;
+    config = new ParserConfigurations();
+    config.updateSensorParserConfig(this.sensorType, sensorParserConfig);
+    config.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER));
+    // Use the resolved field, not the parameter: the parameter is null when the sensor topic fallback applies.
+    parserRunner = new ParserRunnerImpl(
+        new HashSet<>(Collections.singleton(this.sensorType))
+    );
   }
 
   public ProcessorResult<List<byte[]>> run(Iterable<byte[]> in) {
@@ -163,6 +140,7 @@ public class ParserDriver implements Serializable {
 
   public Tuple toTuple(byte[] record) {
     Tuple ret = mock(Tuple.class);
+    when(ret.getStringByField("topic")).thenReturn(sensorType);
     when(ret.getBinary(eq(0))).thenReturn(record);
     return ret;
   }


[2/2] metron git commit: METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213

Posted by rm...@apache.org.
METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/28542ad6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/28542ad6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/28542ad6

Branch: refs/heads/master
Commit: 28542ad64cf63f17b728b4b1c0e995a8973767f7
Parents: 08f3de0
Author: merrimanr <me...@gmail.com>
Authored: Thu Oct 18 13:59:52 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Thu Oct 18 13:59:52 2018 -0500

----------------------------------------------------------------------
 .../impl/SensorParserConfigServiceImpl.java     |  51 +-
 .../parsers/DefaultParserRunnerResults.java     |  71 ++
 .../org/apache/metron/parsers/GrokParser.java   |   3 +-
 .../org/apache/metron/parsers/ParserRunner.java |  60 ++
 .../apache/metron/parsers/ParserRunnerImpl.java | 322 +++++++
 .../metron/parsers/ParserRunnerResults.java     |  33 +
 .../apache/metron/parsers/bolt/ParserBolt.java  | 381 +++-----
 .../parsers/filters/BroMessageFilter.java       |   2 +-
 .../metron/parsers/filters/StellarFilter.java   |   2 +-
 .../parsers/interfaces/MessageFilter.java       |   2 +-
 .../parsers/interfaces/MessageParser.java       |  27 +-
 .../interfaces/MultilineMessageParser.java      |  51 --
 .../metron/parsers/syslog/Syslog5424Parser.java |   4 +-
 .../parsers/topology/ParserComponent.java       |  56 ++
 .../parsers/topology/ParserComponents.java      |  67 --
 .../parsers/topology/ParserTopologyBuilder.java |  39 +-
 .../org/apache/metron/filters/FiltersTest.java  |   4 +-
 .../metron/parsers/MessageParserTest.java       | 108 ++-
 .../metron/parsers/ParserRunnerImplTest.java    | 390 +++++++++
 .../metron/parsers/bolt/ParserBoltTest.java     | 859 ++++++-------------
 .../parsers/integration/ParserDriver.java       |  60 +-
 21 files changed, 1481 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index 4cd272e..d0e4b3d 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -20,12 +20,10 @@ package org.apache.metron.rest.service.impl;
 import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.configuration.ConfigurationType;
@@ -35,18 +33,14 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
 import org.apache.metron.rest.service.SensorParserConfigService;
 import org.apache.metron.rest.util.ParserIndex;
-import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.json.simple.JSONObject;
-import org.reflections.Reflections;
-import org.reflections.util.ConfigurationBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -141,53 +135,13 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
     } else if (sensorParserConfig.getParserClassName() == null) {
       throw new RestException("SensorParserConfig must have a parserClassName");
     } else {
-      MultilineMessageParser<JSONObject> parser;
-      Object parserObject;
+      MessageParser<JSONObject> parser;
       try {
-        parserObject = Class.forName(sensorParserConfig.getParserClassName())
+        parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
             .newInstance();
       } catch (Exception e) {
         throw new RestException(e.toString(), e.getCause());
       }
-
-      if (!(parserObject instanceof MultilineMessageParser)) {
-        parser = new MultilineMessageParser<JSONObject>() {
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public void configure(Map<String, Object> config)  {
-            ((MessageParser<JSONObject>)parserObject).configure(config);
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public void init() {
-            ((MessageParser<JSONObject>)parserObject).init();
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public boolean validate(JSONObject message) {
-            return ((MessageParser<JSONObject>)parserObject).validate(message);
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public List<JSONObject> parse(byte[] message) {
-            return ((MessageParser<JSONObject>)parserObject).parse(message);
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public Optional<List<JSONObject>> parseOptional(byte[] message) {
-            return ((MessageParser<JSONObject>)parserObject).parseOptional(message);
-          }
-        };
-      } else {
-        parser = (MultilineMessageParser<JSONObject>)parserObject;
-      }
-
-
       Path temporaryGrokPath = null;
       if (isGrokConfig(sensorParserConfig)) {
         String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
@@ -195,7 +149,6 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
         sensorParserConfig.getParserConfig()
             .put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
       }
-
       parser.configure(sensorParserConfig.getParserConfig());
       parser.init();
 

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
new file mode 100644
index 0000000..79a9b5d
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Default implementation of ParserRunnerResults.
+ */
+public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> {
+
+  private List<JSONObject> messages = new ArrayList<>();
+  private List<MetronError> errors = new ArrayList<>();
+
+  public List<JSONObject> getMessages() {
+    return messages;
+  }
+
+  public List<MetronError> getErrors() {
+    return errors;
+  }
+
+  public void addMessage(JSONObject message) {
+    this.messages.add(message);
+  }
+
+  public void addError(MetronError error) {
+    this.errors.add(error);
+  }
+
+  public void addErrors(List<MetronError> errors) {
+    this.errors.addAll(errors);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ParserRunnerResults parserResult = (ParserRunnerResults) o;
+    return Objects.equals(messages, parserResult.getMessages()) &&
+            Objects.equals(errors, parserResult.getErrors());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = messages != null ? messages.hashCode() : 0;
+    result = 31 * result + (errors != null ? errors.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index a81149d..6bdfb81 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ import java.util.Optional;
 import java.util.TimeZone;
 
 
-public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable {
+public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
new file mode 100644
index 0000000..f9123b1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.stellar.dsl.Context;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser.
+ * The information needed to initialize a MessageParser is supplied by the parser config supplier.  After the parsers
+ * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object
+ * that contains a list of parsed messages and/or a list of errors.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunner<T> {
+
+  /**
+   * Returns the set of all sensor types that can be parsed with this ParserRunner.
+   * @return Sensor types
+   */
+  Set<String> getSensorTypes();
+
+  /**
+   * Initializes the MessageParsers using the supplied parser configurations.
+   * @param parserConfigSupplier Supplies parser configurations
+   * @param stellarContext Stellar context used to apply Stellar functions during field transformations
+   */
+  void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext);
+
+  /**
+   * Parses a raw message and returns the resulting parsed messages and/or errors.
+   * @param sensorType Sensor type of the message
+   * @param rawMessage Raw message including metadata
+   * @param parserConfigurations Parser configurations
+   * @return ParserRunnerResults containing a list of messages and a list of errors
+   */
+  ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
new file mode 100644
index 0000000..a986db7
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * The default implementation of a ParserRunner.
+ */
+public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable {
+
+  class ProcessResult {
+
+    private JSONObject message;
+    private MetronError error;
+
+    public ProcessResult(JSONObject message) {
+      this.message = message;
+    }
+
+    public ProcessResult(MetronError error) {
+      this.error = error;
+    }
+
+    public JSONObject getMessage() {
+      return message;
+    }
+
+    public MetronError getError() {
+      return error;
+    }
+
+    public boolean isError() {
+      return error != null;
+    }
+  }
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected transient Consumer<ParserRunnerResults> onSuccess;
+  protected transient Consumer<MetronError> onError;
+
+  private HashSet<String> sensorTypes;
+  private Map<String, ParserComponent> sensorToParserComponentMap;
+
+  // Stellar variables
+  private transient Context stellarContext;
+
+  public ParserRunnerImpl(HashSet<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
+  }
+
+  public Map<String, ParserComponent> getSensorToParserComponentMap() {
+    return sensorToParserComponentMap;
+  }
+
+  public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) {
+    this.sensorToParserComponentMap = sensorToParserComponentMap;
+  }
+
+  public Context getStellarContext() {
+    return stellarContext;
+  }
+
+  @Override
+  public Set<String> getSensorTypes() {
+    return sensorTypes;
+  }
+
+  @Override
+  public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) {
+    if (parserConfigSupplier == null) {
+      throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner.");
+    }
+    if (stellarContext == null) {
+      throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner.");
+    }
+    this.stellarContext = stellarContext;
+    initializeParsers(parserConfigSupplier);
+  }
+
+  /**
+   * Parses messages with the appropriate MessageParser based on sensor type.  The resulting list of messages are then
+   * post-processed and added to the ParserRunnerResults message list.  Any errors that happen during post-processing are
+   * added to the ParserRunnerResults error list.  Any exceptions (including a master exception) thrown by the MessageParser
+   * are also added to the ParserRunnerResults error list.
+   *
+   * @param sensorType Sensor type of the message
+   * @param rawMessage Raw message including metadata
+   * @param parserConfigurations Parser configurations
+   * @return ParserRunnerResults containing a list of messages and a list of errors
+   */
+  @Override
+  public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+    DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+    SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+    if (sensorParserConfig != null) {
+      MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser();
+      Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage());
+      if (optionalMessageParserResult.isPresent()) {
+        MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get();
+
+        // Process each message returned from the MessageParser
+        messageParserResult.getMessages().forEach(message -> {
+                  Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations);
+                  if (processResult.isPresent()) {
+                    if (processResult.get().isError()) {
+                      parserRunnerResults.addError(processResult.get().getError());
+                    } else {
+                      parserRunnerResults.addMessage(processResult.get().getMessage());
+                    }
+                  }
+                });
+
+        // If a master exception is thrown by the MessageParser, wrap it with a MetronError and add it to the list of errors
+        messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                .withThrowable(throwable)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(rawMessage.getMessage())));
+
+        // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors
+        parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                .withThrowable(entry.getValue())
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(entry.getKey())).collect(Collectors.toList()));
+      }
+    } else {
+      throw new IllegalStateException(String.format("Could not execute parser.  Cannot find configuration for sensor %s.",
+              sensorType));
+    }
+    return parserRunnerResults;
+  }
+
+  /**
+   * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner.  Objects are created
+   * using reflection and the MessageParser configure and init methods are called.
+   * @param parserConfigSupplier Parser configurations
+   */
+  private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) {
+    LOG.info("Initializing parsers...");
+    sensorToParserComponentMap = new HashMap<>();
+    for(String sensorType: sensorTypes) {
+      if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) {
+        throw new IllegalStateException(String.format("Could not initialize parsers.  Cannot find configuration for sensor %s.",
+                sensorType));
+      }
+
+      SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType);
+
+      LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ",
+              sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName());
+
+      // create message parser
+      MessageParser<JSONObject> parser = ReflectionUtils
+              .createInstance(parserConfig.getParserClassName());
+
+      // create message filter
+      MessageFilter<JSONObject> filter = null;
+      parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext);
+      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+        filter = Filters.get(
+                parserConfig.getFilterClassName(),
+                parserConfig.getParserConfig()
+        );
+      }
+
+      parser.configure(parserConfig.getParserConfig());
+      parser.init();
+      sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter));
+    }
+  }
+
+  /**
+   * Post-processes parsed messages by:
+   * <ul>
+   *   <li>Applying field transformations defined in the sensor parser config</li>
+   *   <li>Filtering messages using the configured MessageFilter class</li>
+   *   <li>Validating messages using the MessageParser validate method</li>
+   * </ul>
+   * If a message is successfully processed a message is returned in a ProcessResult.  If a message fails
+   * validation, a MetronError object is created and returned in a ProcessResult.  If a message is
+   * filtered out an empty Optional is returned.
+   *
+   * @param sensorType Sensor type of the message
+   * @param message Message parsed by the MessageParser
+   * @param rawMessage Raw message including metadata
+   * @param parser MessageParser for the sensor type
+   * @param parserConfigurations Parser configurations
+   */
+  @SuppressWarnings("unchecked")
+  protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage,
+                                                  MessageParser<JSONObject> parser,
+                                                  ParserConfigurations parserConfigurations
+                                                  ) {
+    Optional<ProcessResult> processResult = Optional.empty();
+    SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+    sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+            message,
+            rawMessage.getMetadata(),
+            sensorParserConfig.getMergeMetadata(),
+            sensorParserConfig.getRawMessageStrategyConfig()
+    );
+    message.put(Constants.SENSOR_TYPE, sensorType);
+    applyFieldTransformations(message, rawMessage, sensorParserConfig);
+    if (!message.containsKey(Constants.GUID)) {
+      message.put(Constants.GUID, UUID.randomUUID().toString());
+    }
+    MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter();
+    if (filter == null || filter.emit(message, stellarContext)) {
+      boolean isInvalid = !parser.validate(message);
+      List<FieldValidator> failedValidators = null;
+      if (!isInvalid) {
+        failedValidators = getFailedValidators(message, parserConfigurations);
+        isInvalid = !failedValidators.isEmpty();
+      }
+      if (isInvalid) {
+        MetronError error = new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(message);
+        Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
+                .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+                .collect(Collectors.toSet());
+        if (errorFields != null && !errorFields.isEmpty()) {
+          error.withErrorFields(errorFields);
+        }
+        processResult = Optional.of(new ProcessResult(error));
+      } else {
+        processResult = Optional.of(new ProcessResult(message));
+      }
+    }
+    return processResult;
+  }
+
+  /**
+   * Applies Stellar field transformations defined in the sensor parser config.
+   * @param message Message parsed by the MessageParser
+   * @param rawMessage Raw message including metadata
+   * @param sensorParserConfig Sensor parser config
+   */
+  private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) {
+    for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+      if (handler != null) {
+        if (!sensorParserConfig.getMergeMetadata()) {
+          //if we haven't merged metadata, then we need to pass them along as configuration params.
+          handler.transformAndUpdate(
+                  message,
+                  stellarContext,
+                  sensorParserConfig.getParserConfig(),
+                  rawMessage.getMetadata()
+          );
+        } else {
+          handler.transformAndUpdate(
+                  message,
+                  stellarContext,
+                  sensorParserConfig.getParserConfig()
+          );
+        }
+      }
+    }
+  }
+
+  private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) {
+    List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations();
+    List<FieldValidator> failedValidators = new ArrayList<>();
+    for(FieldValidator validator : fieldValidations) {
+      if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) {
+        failedValidators.add(validator);
+      }
+    }
+    return failedValidators;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
new file mode 100644
index 0000000..7ca853c
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+
+import java.util.List;
+
+/**
+ * Container for the results of parsing a message with a ParserRunner: the parsed messages plus any errors.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunnerResults<T> {
+
+  List<T> getMessages(); // the successfully parsed messages
+
+  List<MetronError> getErrors(); // the errors reported for the same run
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 05334c2..a9ee305 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -18,13 +18,18 @@
 
 package org.apache.metron.parsers.bolt;
 
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
 
 import com.github.benmanes.caffeine.cache.Cache;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
-import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
@@ -33,12 +38,8 @@ import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerResults;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -56,45 +57,32 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputCollector collector;
-  private Map<String, ParserComponents> sensorToComponentMap;
+  private ParserRunner<JSONObject> parserRunner;
+  private Map<String, WriterHandler> sensorToWriterMap;
   private Map<String, String> topicToSensorMap = new HashMap<>();
 
-  private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
-  private transient Cache<CachingStellarProcessor.Key, Object> cache;
   private int requestedTickFreqSecs;
   private int defaultBatchTimeout;
   private int batchTimeoutDivisor = 1;
 
   public ParserBolt( String zookeeperUrl
-                   , Map<String, ParserComponents> sensorToComponentMap
+                   , ParserRunner parserRunner
+                     , Map<String, WriterHandler> sensorToWriterMap
   ) {
     super(zookeeperUrl);
-    this.sensorToComponentMap = sensorToComponentMap;
+    this.parserRunner = parserRunner;
+    this.sensorToWriterMap = sensorToWriterMap;
 
     // Ensure that all sensors are either bulk sensors or not bulk sensors.  Can't mix and match.
     Boolean handleAcks = null;
-    for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
-      boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+    for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+      boolean writerHandleAck = entry.getValue().handleAck();
       if (handleAcks == null) {
         handleAcks = writerHandleAck;
       } else if (!handleAcks.equals(writerHandleAck)) {
@@ -130,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   /**
    * Used only for unit testing
-   * @param defaultBatchTimeout
    */
-  protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
-    this.defaultBatchTimeout = defaultBatchTimeout;
+  public int getBatchTimeoutDivisor() {
+    return batchTimeoutDivisor;
   }
 
   /**
    * Used only for unit testing
    */
-  public int getDefaultBatchTimeout() {
-    return defaultBatchTimeout;
+  protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) {
+    this.sensorToWriterMap = sensorToWriterMap;
   }
 
-  public Map<String, ParserComponents> getSensorToComponentMap() {
-    return sensorToComponentMap;
+  /**
+   * Returns the topic-to-sensor map held by this bolt.  Used only for unit testing
+   */
+  protected Map<String, String> getTopicToSensorMap() {
+    return topicToSensorMap;
+  }
+
+  /**
+   * Replaces the topic-to-sensor map that execute() uses to resolve a sensor.  Used only for unit testing
+   */
+  protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) {
+    this.topicToSensorMap = topicToSensorMap;
+  }
+
+  /**
+   * Replaces the strategy used to read the raw message from a tuple.  Used only for unit testing
+   */
+  public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) {
+    this.messageGetStrategy = messageGetStrategy;
+  }
+
+  /**
+   * Replaces the collector used to ack tuples and report errors.  Used only for unit testing
+   */
+  public void setOutputCollector(OutputCollector collector) {
+    this.collector = collector;
   }
 
   /**
@@ -159,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
     // else Storm will throw a runtime error.
     Function<WriterConfiguration, WriterConfiguration> configurationXform;
-    WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+    WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue();
     if (writer.isWriterToBulkWriter()) {
       configurationXform = WriterToBulkWriter.TRANSFORMATION;
     } else {
@@ -189,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
-
-    // Build the Stellar cache
-    Map<String, Object> cacheConfig = new HashMap<>();
-    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
-      String sensor = entry.getKey();
-      SensorParserConfig config = getSensorParserConfig(sensor);
-
-      if (config != null) {
-        cacheConfig.putAll(config.getCacheConfig());
-      }
-    }
-    cache = CachingStellarProcessor.createCache(cacheConfig);
+    this.parserRunner.init(this::getConfigurations, initializeStellar());
 
     // Need to prep all sensors
-    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+    for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) {
       String sensor = entry.getKey();
-      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
-
-      initializeStellar();
-      if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
-        getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
-        if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
-          MessageFilter<JSONObject> filter = Filters.get(
-              getSensorParserConfig(sensor).getFilterClassName(),
-              getSensorParserConfig(sensor).getParserConfig()
-          );
-          getSensorToComponentMap().get(sensor).setFilter(filter);
-        }
-      }
-
-      parser.init();
 
       SensorParserConfig config = getSensorParserConfig(sensor);
       if (config != null) {
@@ -229,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         throw new IllegalStateException(
             "Unable to retrieve a parser config for " + sensor);
       }
-      parser.configure(config.getParserConfig());
 
-      WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+      WriterHandler writer = sensorToWriterMap.get(sensor);
       writer.init(stormConf, context, collector, getConfigurations());
       if (defaultBatchTimeout == 0) {
         //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
@@ -246,225 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     }
   }
 
-  protected void initializeStellar() {
-    Context.Builder builder = new Context.Builder()
-                                .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-                                .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                ;
-    if(cache != null) {
-      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
-    }
-    this.stellarContext = builder.build();
-    StellarFunctions.initialize(stellarContext);
-  }
-
 
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
     if (TupleUtils.isTick(tuple)) {
-      try {
-        for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
-          entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
-      } finally {
-        collector.ack(tuple);
-      }
+      handleTickTuple(tuple);
       return;
     }
-
     byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+    String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+    String sensorType = topicToSensorMap.get(topic);
     try {
-      SensorParserConfig sensorParserConfig;
-      MessageParser<JSONObject> parser;
-      String sensor;
-      Map<String, Object> metadata;
-      if (sensorToComponentMap.size() == 1) {
-        // There's only one parser, so grab info directly
-        Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
-            .next();
-        sensor = sensorParser.getKey();
-        parser = sensorParser.getValue().getMessageParser();
-        sensorParserConfig = getSensorParserConfig(sensor);
-      } else {
-        // There's multiple parsers, so pull the topic from the Tuple and look up the sensor
-        String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
-        sensor = topicToSensorMap.get(topic);
-        parser = sensorToComponentMap.get(sensor).getMessageParser();
-        sensorParserConfig = getSensorParserConfig(sensor);
-      }
+      ParserConfigurations parserConfigurations = getConfigurations();
+      SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+      RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
+              , tuple
+              , originalMessage
+              , sensorParserConfig.getReadMetadata()
+              , sensorParserConfig.getRawMessageStrategyConfig()
+      );
+      ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations);
+      long numWritten = parserRunnerResults.getMessages().stream()
+              .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector))
+              .filter(result -> result)
+              .count();
+      parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error));
 
-      List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
-      boolean ackTuple = false;
-      int numWritten = 0;
-      if (sensorParserConfig != null) {
-        RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
-            , tuple
-            , originalMessage
-            , sensorParserConfig.getReadMetadata()
-            , sensorParserConfig.getRawMessageStrategyConfig()
-        );
-
-        metadata = rawMessage.getMetadata();
-
-        MultilineMessageParser mmp = null;
-        if (!(parser instanceof MultilineMessageParser)) {
-          mmp = new MultilineMessageParser<JSONObject>() {
-
-            @Override
-            public void configure(Map<String, Object> config) {
-              parser.configure(config);
-            }
-
-            @Override
-            public void init() {
-              parser.init();
-            }
-
-            @Override
-            public boolean validate(JSONObject message) {
-              return parser.validate(message);
-            }
-
-            @Override
-            public List<JSONObject> parse(byte[] message) {
-              return parser.parse(message);
-            }
-
-            @Override
-            public Optional<List<JSONObject>> parseOptional(byte[] message) {
-              return parser.parseOptional(message);
-            }
-          };
-        } else {
-          mmp = (MultilineMessageParser) parser;
-        }
-
-        Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage());
-
-        // check if there is a master error
-        if (results.isPresent() && results.get().getMasterThrowable().isPresent()) {
-          handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector);
-          return;
-        }
-
-        // Handle the message results
-        List<JSONObject> messages = results.isPresent() ? results.get().getMessages() : Collections.EMPTY_LIST;
-        for (JSONObject message : messages) {
-          //we want to ack the tuple in the situation where we have are not doing a bulk write
-          //otherwise we want to defer to the writerComponent who will ack on bulk commit.
-          WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
-          ackTuple = !writer.handleAck();
-
-          sensorParserConfig.getRawMessageStrategy().mergeMetadata(
-              message,
-              metadata,
-              sensorParserConfig.getMergeMetadata(),
-              sensorParserConfig.getRawMessageStrategyConfig()
-          );
-          message.put(Constants.SENSOR_TYPE, sensor);
-
-          for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
-            if (handler != null) {
-              if (!sensorParserConfig.getMergeMetadata()) {
-                //if we haven't merged metadata, then we need to pass them along as configuration params.
-                handler.transformAndUpdate(
-                    message,
-                    stellarContext,
-                    sensorParserConfig.getParserConfig(),
-                    metadata
-                );
-              } else {
-                handler.transformAndUpdate(
-                    message,
-                    stellarContext,
-                    sensorParserConfig.getParserConfig()
-                );
-              }
-            }
-          }
-          if (!message.containsKey(Constants.GUID)) {
-            message.put(Constants.GUID, UUID.randomUUID().toString());
-          }
-
-          MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
-          if (filter == null || filter.emitTuple(message, stellarContext)) {
-            boolean isInvalid = !parser.validate(message);
-            List<FieldValidator> failedValidators = null;
-            if (!isInvalid) {
-              failedValidators = getFailedValidators(message, fieldValidations);
-              isInvalid = !failedValidators.isEmpty();
-            }
-            if (isInvalid) {
-              MetronError error = new MetronError()
-                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
-                  .withSensorType(Collections.singleton(sensor))
-                  .addRawMessage(message);
-              Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
-                  .flatMap(fieldValidator -> fieldValidator.getInput().stream())
-                  .collect(Collectors.toSet());
-              if (errorFields != null && !errorFields.isEmpty()) {
-                error.withErrorFields(errorFields);
-              }
-              ErrorUtils.handleError(collector, error);
-            } else {
-              numWritten++;
-              writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
-            }
-          }
-        }
-
-        // Handle the error results
-        Map<Object, Throwable> messageErrors = results.isPresent()
-                ? results.get().getMessageThrowables() : Collections.EMPTY_MAP;
-
-        for (Entry<Object,Throwable> entry : messageErrors.entrySet()) {
-          MetronError error = new MetronError()
-                  .withErrorType(Constants.ErrorType.PARSER_ERROR)
-                  .withThrowable(entry.getValue())
-                  .withSensorType(sensorToComponentMap.keySet())
-                  .addRawMessage(originalMessage);
-          ErrorUtils.handleError(collector, error);
-        }
-      }
       //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
       //(meaning that none of the messages are valid either globally or locally)
       //then we want to handle the ack ourselves.
-      if (ackTuple || numWritten == 0) {
+      if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) {
         collector.ack(tuple);
       }
 
     } catch (Throwable ex) {
-      handleError(originalMessage, tuple, ex, collector);
+      handleError(sensorType, originalMessage, tuple, ex, collector);
+      collector.ack(tuple);
     }
   }
 
-  protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+  protected Context initializeStellar() { // builds the Stellar Context that prepare() passes to parserRunner.init()
+    Map<String, Object> cacheConfig = new HashMap<>();
+    for (String sensorType: this.parserRunner.getSensorTypes()) { // merge the cache settings of every sensor the runner reports
+      SensorParserConfig config = getSensorParserConfig(sensorType);
+
+      if (config != null) { // sensors without a parser config contribute no cache settings
+        cacheConfig.putAll(config.getCacheConfig());
+      }
+    }
+    Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig);
+
+    Context.Builder builder = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) // also supplied from the global config
+            ;
+    if(cache != null) { // the cache capability is registered only when createCache returned a cache
+      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+    }
+    Context stellarContext = builder.build();
+    StellarFunctions.initialize(stellarContext);
+    return stellarContext;
+  }
+
+  protected void handleTickTuple(Tuple tuple) { // called by execute() for tick tuples: flushes every sensor's writer
+    try {
+      for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+        entry.getValue().flush(getConfigurations(), messageGetStrategy);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+              "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
+    } finally {
+      collector.ack(tuple); // the tick tuple is acked whether or not a flush threw
+    }
+  }
+
+  protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) { // writes one parsed message; true on success, false if the write threw
+    WriterHandler writer = sensorToWriterMap.get(sensorType);
+    try {
+      writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy);
+      return true;
+    } catch (Exception ex) {
+      handleError(sensorType, originalMessage, tuple, ex, collector); // report the failure as an error instead of propagating it
+      return false; // execute() counts only the messages for which this returned true
+    }
+  }
+
+  protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(ex)
-            .withSensorType(sensorToComponentMap.keySet())
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(originalMessage);
     ErrorUtils.handleError(collector, error);
-    collector.ack(tuple);
-  }
-
-  private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
-    List<FieldValidator> failedValidators = new ArrayList<>();
-    for(FieldValidator validator : validators) {
-      if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) {
-        failedValidators.add(validator);
-      }
-    }
-    return failedValidators;
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
     declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index 9cdafa3..1fa1feb 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{
    */
 
   @Override
-  public boolean emitTuple(JSONObject message, Context context) {
+  public boolean emit(JSONObject message, Context context) {
     String protocol = (String) message.get(_key);
     return _known_protocols.contains(protocol);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
index 15a035a..8300ff4 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
@@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> {
   }
 
   @Override
-  public boolean emitTuple(JSONObject message, Context context) {
+  public boolean emit(JSONObject message, Context context) {
     VariableResolver resolver = new MapVariableResolver(message);
     return processor.parse(query, resolver, functionResolver, context);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index b7b91c0..207c070 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context;
 
 public interface MessageFilter<T> extends Configurable{
 
-	boolean emitTuple(T message, Context context);
+	boolean emit(T message, Context context);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 665076b..c9f8351 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.interfaces;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.metron.parsers.DefaultMessageParserResult;
 
 import java.io.Serializable;
@@ -38,18 +39,42 @@ public interface MessageParser<T> extends Configurable {
    * @param rawMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
-  List<T> parse(byte[] rawMessage);
+  @Deprecated
+  default List<T> parse(byte[] rawMessage) { // default body, so implementations are no longer forced to override this deprecated method
+    throw new NotImplementedException("parse is not implemented"); // reached only when an implementation does not override parse()
+  }
 
   /**
    * Take raw data and convert it to an optional list of messages.
    * @param parseMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
+  @Deprecated
   default Optional<List<T>> parseOptional(byte[] parseMessage) {
     return Optional.ofNullable(parse(parseMessage));
   }
 
   /**
+   * Take raw data and convert it to messages.  Each raw message may produce multiple messages and therefore
+   * multiple errors.  A {@link MessageParserResult} is returned, which will have both the messages produced
+   * and the errors.
+   * @param parseMessage the raw bytes of the message
+   * @return Optional of {@link MessageParserResult}
+   */
+  default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+    Optional<MessageParserResult<T>> result = Optional.empty(); // returned as-is when parseOptional yields an empty Optional
+    try {
+      Optional<List<T>> optionalMessages = parseOptional(parseMessage); // delegates to the deprecated list-based API
+      if (optionalMessages.isPresent()) {
+        result = Optional.of(new DefaultMessageParserResult<>(optionalMessages.get()));
+      }
+    } catch (Throwable t) {
+      return Optional.of(new DefaultMessageParserResult<>(t)); // wrap the failure in a result instead of letting it propagate
+    }
+    return result;
+  }
+
+  /**
    * Validate the message to ensure that it's correct.
    * @param message the message to validate
    * @return true if the message is valid, false if not

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
deleted file mode 100644
index 7818f9a..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.interfaces;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.metron.parsers.DefaultMessageParserResult;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public interface MultilineMessageParser<T> extends MessageParser<T> {
-
-  default List<T> parse(byte[] rawMessage) {
-    throw new NotImplementedException("parse is not implemented");
-  }
-
-  /**
-   * Take raw data and convert it to messages.  Each raw message may produce multiple messages and therefore
-   * multiple errors.  A {@link MessageParserResult} is returned, which will have both the messages produced
-   * and the errors.
-   * @param parseMessage the raw bytes of the message
-   * @return Optional of {@link MessageParserResult}
-   */
-  default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
-    List<T> list = new ArrayList<>();
-    try {
-      Optional<List<T>> optionalMessages = parseOptional(parseMessage);
-      optionalMessages.ifPresent(list::addAll);
-    } catch (Throwable t) {
-      return Optional.of(new DefaultMessageParserResult<>(t));
-    }
-    return Optional.of(new DefaultMessageParserResult<T>(list));
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
index 79a082a..5b62e85 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
@@ -26,8 +26,8 @@ import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.parsers.BasicParser;
 import org.apache.metron.parsers.DefaultMessageParserResult;
+import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import java.util.Optional;
 /**
  * Parser for well structured RFC 5424 messages.
  */
-public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable {
+public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable {
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String NIL_POLICY_CONFIG = "nilPolicy";
   private transient SyslogParser syslogParser;

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
new file mode 100644
index 0000000..eb5ff9f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+/** Serializable, mutable holder pairing one {@link MessageParser} with one {@link MessageFilter}, both over {@link JSONObject}. */
+public class ParserComponent implements Serializable {
+  private static final long serialVersionUID = 7880346740026374665L;
+
+  private MessageParser<JSONObject> messageParser; // not final: replaceable through setMessageParser
+  private MessageFilter<JSONObject> filter; // not final: replaceable through setFilter
+  /** Stores both references exactly as passed; no null check or defensive copy is made. */
+  public ParserComponent(
+      MessageParser<JSONObject> messageParser,
+      MessageFilter<JSONObject> filter) {
+    this.messageParser = messageParser;
+    this.filter = filter;
+  }
+  /** Returns the parser reference currently held (the constructor argument or the last value set). */
+  public MessageParser<JSONObject> getMessageParser() {
+    return messageParser;
+  }
+  /** Returns the filter reference currently held. NOTE(review): presumably null when no filter is configured; confirm against callers. */
+  public MessageFilter<JSONObject> getFilter() {
+    return filter;
+  }
+  /** Replaces the held parser; the argument is stored as-is. */
+  public void setMessageParser(
+      MessageParser<JSONObject> messageParser) {
+    this.messageParser = messageParser;
+  }
+  /** Replaces the held filter; the argument is stored as-is. */
+  public void setFilter(
+      MessageFilter<JSONObject> filter) {
+    this.filter = filter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
deleted file mode 100644
index 32d56b9..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.topology;
-
-import java.io.Serializable;
-import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-
-public class ParserComponents implements Serializable {
-  private static final long serialVersionUID = 7880346740026374665L;
-
-  private MessageParser<JSONObject> messageParser;
-  private MessageFilter<JSONObject> filter;
-  private WriterHandler writer;
-
-  public ParserComponents(
-      MessageParser<JSONObject> messageParser,
-      MessageFilter<JSONObject> filter,
-      WriterHandler writer) {
-    this.messageParser = messageParser;
-    this.filter = filter;
-    this.writer = writer;
-  }
-
-  public MessageParser<JSONObject> getMessageParser() {
-    return messageParser;
-  }
-
-  public MessageFilter<JSONObject> getFilter() {
-    return filter;
-  }
-
-  public WriterHandler getWriter() {
-    return writer;
-  }
-
-  public void setMessageParser(
-      MessageParser<JSONObject> messageParser) {
-    this.messageParser = messageParser;
-  }
-
-  public void setFilter(
-      MessageFilter<JSONObject> filter) {
-    this.filter = filter;
-  }
-
-  public void setWriter(WriterHandler writer) {
-    this.writer = writer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d20e1a5..9dc7b88 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,11 +21,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.Constants;
@@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
@@ -268,29 +266,14 @@ public class ParserTopologyBuilder {
                                               Optional<String> securityProtocol,
                                               ParserConfigurations configs,
                                               Optional<String> outputTopic) {
-
-    Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+    Map<String, WriterHandler> writerConfigs = new HashMap<>();
     for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
       String sensorType = entry.getKey();
       SensorParserConfig parserConfig = entry.getValue();
-      // create message parser
-      MessageParser<JSONObject> parser = ReflectionUtils
-          .createInstance(parserConfig.getParserClassName());
-      parser.configure(parserConfig.getParserConfig());
-
-      // create message filter
-      MessageFilter<JSONObject> filter = null;
-      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
-        filter = Filters.get(
-            parserConfig.getFilterClassName(),
-            parserConfig.getParserConfig()
-        );
-      }
 
       // create a writer
       AbstractWriter writer;
       if (parserConfig.getWriterClassName() == null) {
-
         // if not configured, use a sensible default
         writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
             .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
@@ -304,16 +287,10 @@ public class ParserTopologyBuilder {
 
       // create a writer handler
       WriterHandler writerHandler = createWriterHandler(writer);
-
-      ParserComponents components = new ParserComponents(
-         parser,
-         filter,
-         writerHandler
-      );
-      parserBoltConfigs.put(sensorType, components);
+      writerConfigs.put(sensorType, writerHandler);
     }
 
-    return new ParserBolt(zookeeperUrl, parserBoltConfigs);
+    return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
index 8441409..2f3784a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -42,8 +42,8 @@ public class FiltersTest {
         put("filter.query", "exists(foo)");
       }};
       MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config);
-      Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
-      Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
+      Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
+      Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
index 9769baa..4842a1f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
@@ -19,68 +19,118 @@
 package org.apache.metron.parsers;
 
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 public class MessageParserTest {
-  @Test
-  public void testNullable() throws Exception {
-    MessageParser parser = new MessageParser() {
-      @Override
-      public void init() {
 
-      }
+  abstract class TestMessageParser implements MessageParser<JSONObject> {
+    @Override
+    public void init() {
+    }
 
-      @Override
-      public List parse(byte[] rawMessage) {
-        return null;
-      }
+    @Override
+    public boolean validate(JSONObject message) {
+      return false;
+    }
 
-      @Override
-      public boolean validate(Object message) {
-        return false;
-      }
+    @Override
+    public void configure(Map<String, Object> config) {
 
-      @Override
-      public void configure(Map<String, Object> config) {
+    }
+  }
 
+  @Test
+  public void testNullable() throws Exception {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
+      @Override
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return null; // a parser that produces no message list at all
       }
     };
-    Assert.assertNotNull(parser.parseOptional(null));
-    Assert.assertFalse(parser.parseOptional(null).isPresent());
+    Assert.assertNotNull(parser.parseOptionalResult(null)); // the Optional itself must never be null
+    Assert.assertFalse(parser.parseOptionalResult(null).isPresent()); // a null list is expected to yield an empty Optional
   }
 
   @Test
   public void testNotNullable() throws Exception {
-    MessageParser parser = new MessageParser() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public void init() {
-
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return new ArrayList<>();
       }
+    };
+    Assert.assertNotNull(parser.parseOptionalResult(null));
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null);
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(0, ret.get().getMessages().size());
+  }
 
+  @Test
+  public void testParse() {
+    JSONObject message = new JSONObject();
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public List parse(byte[] rawMessage) {
-        return new ArrayList<>();
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return Collections.singletonList(message);
       }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(1, ret.get().getMessages().size());
+    Assert.assertEquals(message, ret.get().getMessages().get(0));
+  }
 
+  @Test
+  public void testParseOptional() {
+    JSONObject message = new JSONObject();
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public boolean validate(Object message) {
-        return false;
+      public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+        return Optional.of(Collections.singletonList(message));
       }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(1, ret.get().getMessages().size());
+    Assert.assertEquals(message, ret.get().getMessages().get(0));
+  }
 
+  @Test
+  public void testParseException() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public void configure(Map<String, Object> config) {
+      public List<JSONObject> parse(byte[] rawMessage) {
+        throw new RuntimeException("parse exception");
+      }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+    Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
+  }
 
+  @Test
+  public void testParseOptionalException() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
+      @Override
+      public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+        throw new RuntimeException("parse exception");
       }
     };
-    Assert.assertNotNull(parser.parseOptional(null));
-    Optional<List> ret = parser.parseOptional(null);
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
     Assert.assertTrue(ret.isPresent());
-    Assert.assertEquals(0, ret.get().size());
+    Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+    Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
   }
+
 }