You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:33 UTC

[44/51] [abbrv] metron git commit: METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213

METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/28542ad6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/28542ad6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/28542ad6

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 28542ad64cf63f17b728b4b1c0e995a8973767f7
Parents: 08f3de0
Author: merrimanr <me...@gmail.com>
Authored: Thu Oct 18 13:59:52 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Thu Oct 18 13:59:52 2018 -0500

----------------------------------------------------------------------
 .../impl/SensorParserConfigServiceImpl.java     |  51 +-
 .../parsers/DefaultParserRunnerResults.java     |  71 ++
 .../org/apache/metron/parsers/GrokParser.java   |   3 +-
 .../org/apache/metron/parsers/ParserRunner.java |  60 ++
 .../apache/metron/parsers/ParserRunnerImpl.java | 322 +++++++
 .../metron/parsers/ParserRunnerResults.java     |  33 +
 .../apache/metron/parsers/bolt/ParserBolt.java  | 381 +++-----
 .../parsers/filters/BroMessageFilter.java       |   2 +-
 .../metron/parsers/filters/StellarFilter.java   |   2 +-
 .../parsers/interfaces/MessageFilter.java       |   2 +-
 .../parsers/interfaces/MessageParser.java       |  27 +-
 .../interfaces/MultilineMessageParser.java      |  51 --
 .../metron/parsers/syslog/Syslog5424Parser.java |   4 +-
 .../parsers/topology/ParserComponent.java       |  56 ++
 .../parsers/topology/ParserComponents.java      |  67 --
 .../parsers/topology/ParserTopologyBuilder.java |  39 +-
 .../org/apache/metron/filters/FiltersTest.java  |   4 +-
 .../metron/parsers/MessageParserTest.java       | 108 ++-
 .../metron/parsers/ParserRunnerImplTest.java    | 390 +++++++++
 .../metron/parsers/bolt/ParserBoltTest.java     | 859 ++++++-------------
 .../parsers/integration/ParserDriver.java       |  60 +-
 21 files changed, 1481 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index 4cd272e..d0e4b3d 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -20,12 +20,10 @@ package org.apache.metron.rest.service.impl;
 import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.configuration.ConfigurationType;
@@ -35,18 +33,14 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
 import org.apache.metron.rest.service.SensorParserConfigService;
 import org.apache.metron.rest.util.ParserIndex;
-import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.json.simple.JSONObject;
-import org.reflections.Reflections;
-import org.reflections.util.ConfigurationBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -141,53 +135,13 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
     } else if (sensorParserConfig.getParserClassName() == null) {
       throw new RestException("SensorParserConfig must have a parserClassName");
     } else {
-      MultilineMessageParser<JSONObject> parser;
-      Object parserObject;
+      MessageParser<JSONObject> parser;
       try {
-        parserObject = Class.forName(sensorParserConfig.getParserClassName())
+        parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
             .newInstance();
       } catch (Exception e) {
         throw new RestException(e.toString(), e.getCause());
       }
-
-      if (!(parserObject instanceof MultilineMessageParser)) {
-        parser = new MultilineMessageParser<JSONObject>() {
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public void configure(Map<String, Object> config)  {
-            ((MessageParser<JSONObject>)parserObject).configure(config);
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public void init() {
-            ((MessageParser<JSONObject>)parserObject).init();
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public boolean validate(JSONObject message) {
-            return ((MessageParser<JSONObject>)parserObject).validate(message);
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public List<JSONObject> parse(byte[] message) {
-            return ((MessageParser<JSONObject>)parserObject).parse(message);
-          }
-
-          @Override
-          @SuppressWarnings("unchecked")
-          public Optional<List<JSONObject>> parseOptional(byte[] message) {
-            return ((MessageParser<JSONObject>)parserObject).parseOptional(message);
-          }
-        };
-      } else {
-        parser = (MultilineMessageParser<JSONObject>)parserObject;
-      }
-
-
       Path temporaryGrokPath = null;
       if (isGrokConfig(sensorParserConfig)) {
         String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
@@ -195,7 +149,6 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
         sensorParserConfig.getParserConfig()
             .put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
       }
-
       parser.configure(sensorParserConfig.getParserConfig());
       parser.init();
 

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
new file mode 100644
index 0000000..79a9b5d
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Default implementation of ParserRunnerResults.
+ */
+public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> {
+
+  private List<JSONObject> messages = new ArrayList<>();
+  private List<MetronError> errors = new ArrayList<>();
+
+  public List<JSONObject> getMessages() {
+    return messages;
+  }
+
+  public List<MetronError> getErrors() {
+    return errors;
+  }
+
+  public void addMessage(JSONObject message) {
+    this.messages.add(message);
+  }
+
+  public void addError(MetronError error) {
+    this.errors.add(error);
+  }
+
+  public void addErrors(List<MetronError> errors) {
+    this.errors.addAll(errors);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ParserRunnerResults parserResult = (ParserRunnerResults) o;
+    return Objects.equals(messages, parserResult.getMessages()) &&
+            Objects.equals(errors, parserResult.getErrors());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = messages != null ? messages.hashCode() : 0;
+    result = 31 * result + (errors != null ? errors.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index a81149d..6bdfb81 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ import java.util.Optional;
 import java.util.TimeZone;
 
 
-public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable {
+public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
new file mode 100644
index 0000000..f9123b1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.stellar.dsl.Context;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser.
+ * The information needed to initialize a MessageParser is supplied by the parser config supplier.  After the parsers
+ * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object
+ * that contains a list of parsed messages and/or a list of errors.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunner<T> {
+
+  /**
+   * Return a list of all sensor types that can be parsed with this ParserRunner.
+   * @return Sensor types
+   */
+  Set<String> getSensorTypes();
+
+  /**
+   *
+   * @param parserConfigSupplier Supplies parser configurations
+   * @param stellarContext Stellar context used to apply Stellar functions during field transformations
+   */
+  void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext);
+
+  /**
+   * Parses a message and either returns the message or an error.
+   * @param sensorType Sensor type of the message
+   * @param rawMessage Raw message including metadata
+   * @param parserConfigurations Parser configurations
+   * @return ParserRunnerResults containing a list of messages and a list of errors
+   */
+  ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
new file mode 100644
index 0000000..a986db7
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.apache.metron.parsers.topology.ParserComponent;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * The default implemention of a ParserRunner.
+ */
+public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable {
+
+  class ProcessResult {
+
+    private JSONObject message;
+    private MetronError error;
+
+    public ProcessResult(JSONObject message) {
+      this.message = message;
+    }
+
+    public ProcessResult(MetronError error) {
+      this.error = error;
+    }
+
+    public JSONObject getMessage() {
+      return message;
+    }
+
+    public MetronError getError() {
+      return error;
+    }
+
+    public boolean isError() {
+      return error != null;
+    }
+  }
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected transient Consumer<ParserRunnerResults> onSuccess;
+  protected transient Consumer<MetronError> onError;
+
+  private HashSet<String> sensorTypes;
+  private Map<String, ParserComponent> sensorToParserComponentMap;
+
+  // Stellar variables
+  private transient Context stellarContext;
+
+  public ParserRunnerImpl(HashSet<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
+  }
+
+  public Map<String, ParserComponent> getSensorToParserComponentMap() {
+    return sensorToParserComponentMap;
+  }
+
+  public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) {
+    this.sensorToParserComponentMap = sensorToParserComponentMap;
+  }
+
+  public Context getStellarContext() {
+    return stellarContext;
+  }
+
+  @Override
+  public Set<String> getSensorTypes() {
+    return sensorTypes;
+  }
+
+  @Override
+  public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) {
+    if (parserConfigSupplier == null) {
+      throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner.");
+    }
+    if (stellarContext == null) {
+      throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner.");
+    }
+    this.stellarContext = stellarContext;
+    initializeParsers(parserConfigSupplier);
+  }
+
+  /**
+   * Parses messages with the appropriate MessageParser based on sensor type.  The resulting list of messages are then
+   * post-processed and added to the ParserRunnerResults message list.  Any errors that happen during post-processing are
+   * added to the ParserRunnerResults error list.  Any exceptions (including a master exception) thrown by the MessageParser
+   * are also added to the ParserRunnerResults error list.
+   *
+   * @param sensorType Sensor type of the message
+   * @param rawMessage Raw message including metadata
+   * @param parserConfigurations Parser configurations
+   * @return ParserRunnerResults containing a list of messages and a list of errors
+   */
+  @Override
+  public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
+    DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
+    SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+    if (sensorParserConfig != null) {
+      MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser();
+      Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage());
+      if (optionalMessageParserResult.isPresent()) {
+        MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get();
+
+        // Process each message returned from the MessageParser
+        messageParserResult.getMessages().forEach(message -> {
+                  Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations);
+                  if (processResult.isPresent()) {
+                    if (processResult.get().isError()) {
+                      parserRunnerResults.addError(processResult.get().getError());
+                    } else {
+                      parserRunnerResults.addMessage(processResult.get().getMessage());
+                    }
+                  }
+                });
+
+        // If a master exception is thrown by the MessageParser, wrap it with a MetronError and add it to the list of errors
+        messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                .withThrowable(throwable)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(rawMessage.getMessage())));
+
+        // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors
+        parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                .withThrowable(entry.getValue())
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(entry.getKey())).collect(Collectors.toList()));
+      }
+    } else {
+      throw new IllegalStateException(String.format("Could not execute parser.  Cannot find configuration for sensor %s.",
+              sensorType));
+    }
+    return parserRunnerResults;
+  }
+
+  /**
+   * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner.  Objects are created
+   * using reflection and the MessageParser configure and init methods are called.
+   * @param parserConfigSupplier Parser configurations
+   */
+  private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) {
+    LOG.info("Initializing parsers...");
+    sensorToParserComponentMap = new HashMap<>();
+    for(String sensorType: sensorTypes) {
+      if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) {
+        throw new IllegalStateException(String.format("Could not initialize parsers.  Cannot find configuration for sensor %s.",
+                sensorType));
+      }
+
+      SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType);
+
+      LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ",
+              sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName());
+
+      // create message parser
+      MessageParser<JSONObject> parser = ReflectionUtils
+              .createInstance(parserConfig.getParserClassName());
+
+      // create message filter
+      MessageFilter<JSONObject> filter = null;
+      parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext);
+      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+        filter = Filters.get(
+                parserConfig.getFilterClassName(),
+                parserConfig.getParserConfig()
+        );
+      }
+
+      parser.configure(parserConfig.getParserConfig());
+      parser.init();
+      sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter));
+    }
+  }
+
+  /**
+   * Post-processes parsed messages by:
+   * <ul>
+   *   <li>Applying field transformations defined in the sensor parser config</li>
+   *   <li>Filtering messages using the configured MessageFilter class</li>
+   *   <li>Validating messages using the MessageParser validate method</li>
+   * </ul>
+   * If a message is successfully processed a message is returned in a ProcessResult.  If a message fails
+   * validation, a MetronError object is created and returned in a ProcessResult.  If a message is
+   * filtered out an empty Optional is returned.
+   *
+   * @param sensorType Sensor type of the message
+   * @param message Message parsed by the MessageParser
+   * @param rawMessage Raw message including metadata
+   * @param parser MessageParser for the sensor type
+   * @param parserConfigurations Parser configurations
+   */
+  @SuppressWarnings("unchecked")
+  protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage,
+                                                  MessageParser<JSONObject> parser,
+                                                  ParserConfigurations parserConfigurations
+                                                  ) {
+    Optional<ProcessResult> processResult = Optional.empty();
+    SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+    sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+            message,
+            rawMessage.getMetadata(),
+            sensorParserConfig.getMergeMetadata(),
+            sensorParserConfig.getRawMessageStrategyConfig()
+    );
+    message.put(Constants.SENSOR_TYPE, sensorType);
+    applyFieldTransformations(message, rawMessage, sensorParserConfig);
+    if (!message.containsKey(Constants.GUID)) {
+      message.put(Constants.GUID, UUID.randomUUID().toString());
+    }
+    MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter();
+    if (filter == null || filter.emit(message, stellarContext)) {
+      boolean isInvalid = !parser.validate(message);
+      List<FieldValidator> failedValidators = null;
+      if (!isInvalid) {
+        failedValidators = getFailedValidators(message, parserConfigurations);
+        isInvalid = !failedValidators.isEmpty();
+      }
+      if (isInvalid) {
+        MetronError error = new MetronError()
+                .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                .withSensorType(Collections.singleton(sensorType))
+                .addRawMessage(message);
+        Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
+                .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+                .collect(Collectors.toSet());
+        if (errorFields != null && !errorFields.isEmpty()) {
+          error.withErrorFields(errorFields);
+        }
+        processResult = Optional.of(new ProcessResult(error));
+      } else {
+        processResult = Optional.of(new ProcessResult(message));
+      }
+    }
+    return processResult;
+  }
+
+  /**
+   * Applies Stellar field transformations defined in the sensor parser config.
+   * @param message Message parsed by the MessageParser
+   * @param rawMessage Raw message including metadata
+   * @param sensorParserConfig Sensor parser config
+   */
+  private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) {
+    for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+      if (handler != null) {
+        if (!sensorParserConfig.getMergeMetadata()) {
+          //if we haven't merged metadata, then we need to pass them along as configuration params.
+          handler.transformAndUpdate(
+                  message,
+                  stellarContext,
+                  sensorParserConfig.getParserConfig(),
+                  rawMessage.getMetadata()
+          );
+        } else {
+          handler.transformAndUpdate(
+                  message,
+                  stellarContext,
+                  sensorParserConfig.getParserConfig()
+          );
+        }
+      }
+    }
+  }
+
+  private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) {
+    List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations();
+    List<FieldValidator> failedValidators = new ArrayList<>();
+    for(FieldValidator validator : fieldValidations) {
+      if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) {
+        failedValidators.add(validator);
+      }
+    }
+    return failedValidators;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
new file mode 100644
index 0000000..7ca853c
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.metron.common.error.MetronError;
+
+import java.util.List;
+
+/**
+ * Container for the results of parsing a message with a ParserRunner.
+ * @param <T> The type of a successfully parsed message.
+ */
+public interface ParserRunnerResults<T> {
+
+  List<T> getMessages();
+
+  List<MetronError> getErrors();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 05334c2..a9ee305 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -18,13 +18,18 @@
 
 package org.apache.metron.parsers.bolt;
 
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
 
 import com.github.benmanes.caffeine.cache.Cache;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
-import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
@@ -33,12 +38,8 @@ import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
-import org.apache.metron.parsers.topology.ParserComponents;
+import org.apache.metron.parsers.ParserRunner;
+import org.apache.metron.parsers.ParserRunnerResults;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -56,45 +57,32 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputCollector collector;
-  private Map<String, ParserComponents> sensorToComponentMap;
+  private ParserRunner<JSONObject> parserRunner;
+  private Map<String, WriterHandler> sensorToWriterMap;
   private Map<String, String> topicToSensorMap = new HashMap<>();
 
-  private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
-  private transient Cache<CachingStellarProcessor.Key, Object> cache;
   private int requestedTickFreqSecs;
   private int defaultBatchTimeout;
   private int batchTimeoutDivisor = 1;
 
   public ParserBolt( String zookeeperUrl
-                   , Map<String, ParserComponents> sensorToComponentMap
+                   , ParserRunner parserRunner
+                     , Map<String, WriterHandler> sensorToWriterMap
   ) {
     super(zookeeperUrl);
-    this.sensorToComponentMap = sensorToComponentMap;
+    this.parserRunner = parserRunner;
+    this.sensorToWriterMap = sensorToWriterMap;
 
     // Ensure that all sensors are either bulk sensors or not bulk sensors.  Can't mix and match.
     Boolean handleAcks = null;
-    for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
-      boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+    for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+      boolean writerHandleAck = entry.getValue().handleAck();
       if (handleAcks == null) {
         handleAcks = writerHandleAck;
       } else if (!handleAcks.equals(writerHandleAck)) {
@@ -130,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   /**
    * Used only for unit testing
-   * @param defaultBatchTimeout
    */
-  protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
-    this.defaultBatchTimeout = defaultBatchTimeout;
+  public int getBatchTimeoutDivisor() {
+    return batchTimeoutDivisor;
   }
 
   /**
    * Used only for unit testing
    */
-  public int getDefaultBatchTimeout() {
-    return defaultBatchTimeout;
+  protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) {
+    this.sensorToWriterMap = sensorToWriterMap;
   }
 
-  public Map<String, ParserComponents> getSensorToComponentMap() {
-    return sensorToComponentMap;
+  /**
+   * Used only for unit testing
+   */
+  protected Map<String, String> getTopicToSensorMap() {
+    return topicToSensorMap;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) {
+    this.topicToSensorMap = topicToSensorMap;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) {
+    this.messageGetStrategy = messageGetStrategy;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  public void setOutputCollector(OutputCollector collector) {
+    this.collector = collector;
   }
 
   /**
@@ -159,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
     // else Storm will throw a runtime error.
     Function<WriterConfiguration, WriterConfiguration> configurationXform;
-    WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+    WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue();
     if (writer.isWriterToBulkWriter()) {
       configurationXform = WriterToBulkWriter.TRANSFORMATION;
     } else {
@@ -189,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
-
-    // Build the Stellar cache
-    Map<String, Object> cacheConfig = new HashMap<>();
-    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
-      String sensor = entry.getKey();
-      SensorParserConfig config = getSensorParserConfig(sensor);
-
-      if (config != null) {
-        cacheConfig.putAll(config.getCacheConfig());
-      }
-    }
-    cache = CachingStellarProcessor.createCache(cacheConfig);
+    this.parserRunner.init(this::getConfigurations, initializeStellar());
 
     // Need to prep all sensors
-    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+    for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) {
       String sensor = entry.getKey();
-      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
-
-      initializeStellar();
-      if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
-        getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
-        if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
-          MessageFilter<JSONObject> filter = Filters.get(
-              getSensorParserConfig(sensor).getFilterClassName(),
-              getSensorParserConfig(sensor).getParserConfig()
-          );
-          getSensorToComponentMap().get(sensor).setFilter(filter);
-        }
-      }
-
-      parser.init();
 
       SensorParserConfig config = getSensorParserConfig(sensor);
       if (config != null) {
@@ -229,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         throw new IllegalStateException(
             "Unable to retrieve a parser config for " + sensor);
       }
-      parser.configure(config.getParserConfig());
 
-      WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+      WriterHandler writer = sensorToWriterMap.get(sensor);
       writer.init(stormConf, context, collector, getConfigurations());
       if (defaultBatchTimeout == 0) {
         //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
@@ -246,225 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     }
   }
 
-  protected void initializeStellar() {
-    Context.Builder builder = new Context.Builder()
-                                .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-                                .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                ;
-    if(cache != null) {
-      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
-    }
-    this.stellarContext = builder.build();
-    StellarFunctions.initialize(stellarContext);
-  }
-
 
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
     if (TupleUtils.isTick(tuple)) {
-      try {
-        for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
-          entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
-      } finally {
-        collector.ack(tuple);
-      }
+      handleTickTuple(tuple);
       return;
     }
-
     byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+    String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+    String sensorType = topicToSensorMap.get(topic);
     try {
-      SensorParserConfig sensorParserConfig;
-      MessageParser<JSONObject> parser;
-      String sensor;
-      Map<String, Object> metadata;
-      if (sensorToComponentMap.size() == 1) {
-        // There's only one parser, so grab info directly
-        Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
-            .next();
-        sensor = sensorParser.getKey();
-        parser = sensorParser.getValue().getMessageParser();
-        sensorParserConfig = getSensorParserConfig(sensor);
-      } else {
-        // There's multiple parsers, so pull the topic from the Tuple and look up the sensor
-        String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
-        sensor = topicToSensorMap.get(topic);
-        parser = sensorToComponentMap.get(sensor).getMessageParser();
-        sensorParserConfig = getSensorParserConfig(sensor);
-      }
+      ParserConfigurations parserConfigurations = getConfigurations();
+      SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
+      RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
+              , tuple
+              , originalMessage
+              , sensorParserConfig.getReadMetadata()
+              , sensorParserConfig.getRawMessageStrategyConfig()
+      );
+      ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations);
+      long numWritten = parserRunnerResults.getMessages().stream()
+              .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector))
+              .filter(result -> result)
+              .count();
+      parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error));
 
-      List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
-      boolean ackTuple = false;
-      int numWritten = 0;
-      if (sensorParserConfig != null) {
-        RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
-            , tuple
-            , originalMessage
-            , sensorParserConfig.getReadMetadata()
-            , sensorParserConfig.getRawMessageStrategyConfig()
-        );
-
-        metadata = rawMessage.getMetadata();
-
-        MultilineMessageParser mmp = null;
-        if (!(parser instanceof MultilineMessageParser)) {
-          mmp = new MultilineMessageParser<JSONObject>() {
-
-            @Override
-            public void configure(Map<String, Object> config) {
-              parser.configure(config);
-            }
-
-            @Override
-            public void init() {
-              parser.init();
-            }
-
-            @Override
-            public boolean validate(JSONObject message) {
-              return parser.validate(message);
-            }
-
-            @Override
-            public List<JSONObject> parse(byte[] message) {
-              return parser.parse(message);
-            }
-
-            @Override
-            public Optional<List<JSONObject>> parseOptional(byte[] message) {
-              return parser.parseOptional(message);
-            }
-          };
-        } else {
-          mmp = (MultilineMessageParser) parser;
-        }
-
-        Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage());
-
-        // check if there is a master error
-        if (results.isPresent() && results.get().getMasterThrowable().isPresent()) {
-          handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector);
-          return;
-        }
-
-        // Handle the message results
-        List<JSONObject> messages = results.isPresent() ? results.get().getMessages() : Collections.EMPTY_LIST;
-        for (JSONObject message : messages) {
-          //we want to ack the tuple in the situation where we have are not doing a bulk write
-          //otherwise we want to defer to the writerComponent who will ack on bulk commit.
-          WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
-          ackTuple = !writer.handleAck();
-
-          sensorParserConfig.getRawMessageStrategy().mergeMetadata(
-              message,
-              metadata,
-              sensorParserConfig.getMergeMetadata(),
-              sensorParserConfig.getRawMessageStrategyConfig()
-          );
-          message.put(Constants.SENSOR_TYPE, sensor);
-
-          for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
-            if (handler != null) {
-              if (!sensorParserConfig.getMergeMetadata()) {
-                //if we haven't merged metadata, then we need to pass them along as configuration params.
-                handler.transformAndUpdate(
-                    message,
-                    stellarContext,
-                    sensorParserConfig.getParserConfig(),
-                    metadata
-                );
-              } else {
-                handler.transformAndUpdate(
-                    message,
-                    stellarContext,
-                    sensorParserConfig.getParserConfig()
-                );
-              }
-            }
-          }
-          if (!message.containsKey(Constants.GUID)) {
-            message.put(Constants.GUID, UUID.randomUUID().toString());
-          }
-
-          MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
-          if (filter == null || filter.emitTuple(message, stellarContext)) {
-            boolean isInvalid = !parser.validate(message);
-            List<FieldValidator> failedValidators = null;
-            if (!isInvalid) {
-              failedValidators = getFailedValidators(message, fieldValidations);
-              isInvalid = !failedValidators.isEmpty();
-            }
-            if (isInvalid) {
-              MetronError error = new MetronError()
-                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
-                  .withSensorType(Collections.singleton(sensor))
-                  .addRawMessage(message);
-              Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
-                  .flatMap(fieldValidator -> fieldValidator.getInput().stream())
-                  .collect(Collectors.toSet());
-              if (errorFields != null && !errorFields.isEmpty()) {
-                error.withErrorFields(errorFields);
-              }
-              ErrorUtils.handleError(collector, error);
-            } else {
-              numWritten++;
-              writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
-            }
-          }
-        }
-
-        // Handle the error results
-        Map<Object, Throwable> messageErrors = results.isPresent()
-                ? results.get().getMessageThrowables() : Collections.EMPTY_MAP;
-
-        for (Entry<Object,Throwable> entry : messageErrors.entrySet()) {
-          MetronError error = new MetronError()
-                  .withErrorType(Constants.ErrorType.PARSER_ERROR)
-                  .withThrowable(entry.getValue())
-                  .withSensorType(sensorToComponentMap.keySet())
-                  .addRawMessage(originalMessage);
-          ErrorUtils.handleError(collector, error);
-        }
-      }
       //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
       //(meaning that none of the messages are valid either globally or locally)
       //then we want to handle the ack ourselves.
-      if (ackTuple || numWritten == 0) {
+      if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) {
         collector.ack(tuple);
       }
 
     } catch (Throwable ex) {
-      handleError(originalMessage, tuple, ex, collector);
+      handleError(sensorType, originalMessage, tuple, ex, collector);
+      collector.ack(tuple);
     }
   }
 
-  protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+  protected Context initializeStellar() {
+    Map<String, Object> cacheConfig = new HashMap<>();
+    for (String sensorType: this.parserRunner.getSensorTypes()) {
+      SensorParserConfig config = getSensorParserConfig(sensorType);
+
+      if (config != null) {
+        cacheConfig.putAll(config.getCacheConfig());
+      }
+    }
+    Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig);
+
+    Context.Builder builder = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
+            ;
+    if(cache != null) {
+      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+    }
+    Context stellarContext = builder.build();
+    StellarFunctions.initialize(stellarContext);
+    return stellarContext;
+  }
+
+  protected void handleTickTuple(Tuple tuple) {
+    try {
+      for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
+        entry.getValue().flush(getConfigurations(), messageGetStrategy);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+              "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
+    } finally {
+      collector.ack(tuple);
+    }
+  }
+
+  protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) {
+    WriterHandler writer = sensorToWriterMap.get(sensorType);
+    try {
+      writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy);
+      return true;
+    } catch (Exception ex) {
+      handleError(sensorType, originalMessage, tuple, ex, collector);
+      return false;
+    }
+  }
+
+  protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(ex)
-            .withSensorType(sensorToComponentMap.keySet())
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(originalMessage);
     ErrorUtils.handleError(collector, error);
-    collector.ack(tuple);
-  }
-
-  private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
-    List<FieldValidator> failedValidators = new ArrayList<>();
-    for(FieldValidator validator : validators) {
-      if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) {
-        failedValidators.add(validator);
-      }
-    }
-    return failedValidators;
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
     declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index 9cdafa3..1fa1feb 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{
    */
 
   @Override
-  public boolean emitTuple(JSONObject message, Context context) {
+  public boolean emit(JSONObject message, Context context) {
     String protocol = (String) message.get(_key);
     return _known_protocols.contains(protocol);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
index 15a035a..8300ff4 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
@@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> {
   }
 
   @Override
-  public boolean emitTuple(JSONObject message, Context context) {
+  public boolean emit(JSONObject message, Context context) {
     VariableResolver resolver = new MapVariableResolver(message);
     return processor.parse(query, resolver, functionResolver, context);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index b7b91c0..207c070 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context;
 
 public interface MessageFilter<T> extends Configurable{
 
-	boolean emitTuple(T message, Context context);
+	boolean emit(T message, Context context);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 665076b..c9f8351 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.interfaces;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.metron.parsers.DefaultMessageParserResult;
 
 import java.io.Serializable;
@@ -38,18 +39,42 @@ public interface MessageParser<T> extends Configurable {
    * @param rawMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
-  List<T> parse(byte[] rawMessage);
+  @Deprecated
+  default List<T> parse(byte[] rawMessage) {
+    throw new NotImplementedException("parse is not implemented");
+  }
 
   /**
    * Take raw data and convert it to an optional list of messages.
    * @param parseMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
+  @Deprecated
   default Optional<List<T>> parseOptional(byte[] parseMessage) {
     return Optional.ofNullable(parse(parseMessage));
   }
 
   /**
+   * Take raw data and convert it to messages.  Each raw message may produce multiple messages and therefore
+   * multiple errors.  A {@link MessageParserResult} is returned, which will have both the messages produced
+   * and the errors.
+   * @param parseMessage the raw bytes of the message
+   * @return Optional of {@link MessageParserResult}
+   */
+  default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+    Optional<MessageParserResult<T>> result = Optional.empty();
+    try {
+      Optional<List<T>> optionalMessages = parseOptional(parseMessage);
+      if (optionalMessages.isPresent()) {
+        result = Optional.of(new DefaultMessageParserResult<>(optionalMessages.get()));
+      }
+    } catch (Throwable t) {
+      return Optional.of(new DefaultMessageParserResult<>(t));
+    }
+    return result;
+  }
+
+  /**
    * Validate the message to ensure that it's correct.
    * @param message the message to validate
    * @return true if the message is valid, false if not

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
deleted file mode 100644
index 7818f9a..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.interfaces;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.metron.parsers.DefaultMessageParserResult;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public interface MultilineMessageParser<T> extends MessageParser<T> {
-
-  default List<T> parse(byte[] rawMessage) {
-    throw new NotImplementedException("parse is not implemented");
-  }
-
-  /**
-   * Take raw data and convert it to messages.  Each raw message may produce multiple messages and therefore
-   * multiple errors.  A {@link MessageParserResult} is returned, which will have both the messages produced
-   * and the errors.
-   * @param parseMessage the raw bytes of the message
-   * @return Optional of {@link MessageParserResult}
-   */
-  default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
-    List<T> list = new ArrayList<>();
-    try {
-      Optional<List<T>> optionalMessages = parseOptional(parseMessage);
-      optionalMessages.ifPresent(list::addAll);
-    } catch (Throwable t) {
-      return Optional.of(new DefaultMessageParserResult<>(t));
-    }
-    return Optional.of(new DefaultMessageParserResult<T>(list));
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
index 79a082a..5b62e85 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
@@ -26,8 +26,8 @@ import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.parsers.BasicParser;
 import org.apache.metron.parsers.DefaultMessageParserResult;
+import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.interfaces.MessageParserResult;
-import org.apache.metron.parsers.interfaces.MultilineMessageParser;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import java.util.Optional;
 /**
  * Parser for well structured RFC 5424 messages.
  */
-public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable {
+public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable {
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String NIL_POLICY_CONFIG = "nilPolicy";
   private transient SyslogParser syslogParser;

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
new file mode 100644
index 0000000..eb5ff9f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
+public class ParserComponent implements Serializable {
+  private static final long serialVersionUID = 7880346740026374665L;
+
+  private MessageParser<JSONObject> messageParser;
+  private MessageFilter<JSONObject> filter;
+
+  public ParserComponent(
+      MessageParser<JSONObject> messageParser,
+      MessageFilter<JSONObject> filter) {
+    this.messageParser = messageParser;
+    this.filter = filter;
+  }
+
+  public MessageParser<JSONObject> getMessageParser() {
+    return messageParser;
+  }
+
+  public MessageFilter<JSONObject> getFilter() {
+    return filter;
+  }
+
+  public void setMessageParser(
+      MessageParser<JSONObject> messageParser) {
+    this.messageParser = messageParser;
+  }
+
+  public void setFilter(
+      MessageFilter<JSONObject> filter) {
+    this.filter = filter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
deleted file mode 100644
index 32d56b9..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.topology;
-
-import java.io.Serializable;
-import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-
-public class ParserComponents implements Serializable {
-  private static final long serialVersionUID = 7880346740026374665L;
-
-  private MessageParser<JSONObject> messageParser;
-  private MessageFilter<JSONObject> filter;
-  private WriterHandler writer;
-
-  public ParserComponents(
-      MessageParser<JSONObject> messageParser,
-      MessageFilter<JSONObject> filter,
-      WriterHandler writer) {
-    this.messageParser = messageParser;
-    this.filter = filter;
-    this.writer = writer;
-  }
-
-  public MessageParser<JSONObject> getMessageParser() {
-    return messageParser;
-  }
-
-  public MessageFilter<JSONObject> getFilter() {
-    return filter;
-  }
-
-  public WriterHandler getWriter() {
-    return writer;
-  }
-
-  public void setMessageParser(
-      MessageParser<JSONObject> messageParser) {
-    this.messageParser = messageParser;
-  }
-
-  public void setFilter(
-      MessageFilter<JSONObject> filter) {
-    this.filter = filter;
-  }
-
-  public void setWriter(WriterHandler writer) {
-    this.writer = writer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d20e1a5..9dc7b88 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,11 +21,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.Constants;
@@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
@@ -268,29 +266,14 @@ public class ParserTopologyBuilder {
                                               Optional<String> securityProtocol,
                                               ParserConfigurations configs,
                                               Optional<String> outputTopic) {
-
-    Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+    Map<String, WriterHandler> writerConfigs = new HashMap<>();
     for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
       String sensorType = entry.getKey();
       SensorParserConfig parserConfig = entry.getValue();
-      // create message parser
-      MessageParser<JSONObject> parser = ReflectionUtils
-          .createInstance(parserConfig.getParserClassName());
-      parser.configure(parserConfig.getParserConfig());
-
-      // create message filter
-      MessageFilter<JSONObject> filter = null;
-      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
-        filter = Filters.get(
-            parserConfig.getFilterClassName(),
-            parserConfig.getParserConfig()
-        );
-      }
 
       // create a writer
       AbstractWriter writer;
       if (parserConfig.getWriterClassName() == null) {
-
         // if not configured, use a sensible default
         writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
             .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
@@ -304,16 +287,10 @@ public class ParserTopologyBuilder {
 
       // create a writer handler
       WriterHandler writerHandler = createWriterHandler(writer);
-
-      ParserComponents components = new ParserComponents(
-         parser,
-         filter,
-         writerHandler
-      );
-      parserBoltConfigs.put(sensorType, components);
+      writerConfigs.put(sensorType, writerHandler);
     }
 
-    return new ParserBolt(zookeeperUrl, parserBoltConfigs);
+    return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
index 8441409..2f3784a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -42,8 +42,8 @@ public class FiltersTest {
         put("filter.query", "exists(foo)");
       }};
       MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config);
-      Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
-      Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
+      Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
+      Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
index 9769baa..4842a1f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
@@ -19,68 +19,118 @@
 package org.apache.metron.parsers;
 
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 public class MessageParserTest {
-  @Test
-  public void testNullable() throws Exception {
-    MessageParser parser = new MessageParser() {
-      @Override
-      public void init() {
 
-      }
+  abstract class TestMessageParser implements MessageParser<JSONObject> {
+    @Override
+    public void init() {
+    }
 
-      @Override
-      public List parse(byte[] rawMessage) {
-        return null;
-      }
+    @Override
+    public boolean validate(JSONObject message) {
+      return false;
+    }
 
-      @Override
-      public boolean validate(Object message) {
-        return false;
-      }
+    @Override
+    public void configure(Map<String, Object> config) {
 
-      @Override
-      public void configure(Map<String, Object> config) {
+    }
+  }
 
+  @Test
+  public void testNullable() throws Exception {
+    MessageParser parser = new TestMessageParser() {
+      @Override
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return null;
       }
     };
-    Assert.assertNotNull(parser.parseOptional(null));
-    Assert.assertFalse(parser.parseOptional(null).isPresent());
+    Assert.assertNotNull(parser.parseOptionalResult(null));
+    Assert.assertFalse(parser.parseOptionalResult(null).isPresent());
   }
 
   @Test
   public void testNotNullable() throws Exception {
-    MessageParser parser = new MessageParser() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public void init() {
-
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return new ArrayList<>();
       }
+    };
+    Assert.assertNotNull(parser.parseOptionalResult(null));
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null);
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(0, ret.get().getMessages().size());
+  }
 
+  @Test
+  public void testParse() {
+    JSONObject message = new JSONObject();
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public List parse(byte[] rawMessage) {
-        return new ArrayList<>();
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return Collections.singletonList(message);
       }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(1, ret.get().getMessages().size());
+    Assert.assertEquals(message, ret.get().getMessages().get(0));
+  }
 
+  @Test
+  public void testParseOptional() {
+    JSONObject message = new JSONObject();
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public boolean validate(Object message) {
-        return false;
+      public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+        return Optional.of(Collections.singletonList(message));
       }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(1, ret.get().getMessages().size());
+    Assert.assertEquals(message, ret.get().getMessages().get(0));
+  }
 
+  @Test
+  public void testParseException() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public void configure(Map<String, Object> config) {
+      public List<JSONObject> parse(byte[] rawMessage) {
+        throw new RuntimeException("parse exception");
+      }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+    Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
+  }
 
+  @Test
+  public void testParseOptionalException() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
+      @Override
+      public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+        throw new RuntimeException("parse exception");
       }
     };
-    Assert.assertNotNull(parser.parseOptional(null));
-    Optional<List> ret = parser.parseOptional(null);
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
     Assert.assertTrue(ret.isPresent());
-    Assert.assertEquals(0, ret.get().size());
+    Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+    Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
   }
+
 }