You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:25 UTC

[36/51] [abbrv] metron git commit: METRON-1761 Allow a grok statement to be applied to each line in a file. (ottobackwards) closes apache/metron#1184

METRON-1761 Allow a grok statement to be applied to each line in a file. (ottobackwards) closes apache/metron#1184


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3d923cde
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3d923cde
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3d923cde

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 3d923cde8711dcc463c2d27cb2b67275b9172112
Parents: 060d17e
Author: ottobackwards <ot...@gmail.com>
Authored: Wed Oct 10 11:31:31 2018 -0400
Committer: otto <ot...@apache.org>
Committed: Wed Oct 10 11:31:31 2018 -0400

----------------------------------------------------------------------
 .../src/main/sample/patterns/test               |   1 +
 metron-platform/metron-parsers/README.md        |  10 +-
 .../message-parser-implementation-notes.md      |  57 +++++
 .../parsers/DefaultMessageParserResult.java     |  76 +++++++
 .../org/apache/metron/parsers/GrokParser.java   | 156 ++++++++++---
 .../apache/metron/parsers/bolt/ParserBolt.java  |  57 +++--
 .../parsers/interfaces/MessageParser.java       |  28 ++-
 .../parsers/interfaces/MessageParserResult.java |  48 ++++
 .../parsers/websphere/GrokWebSphereParser.java  | 217 +++++++++----------
 .../apache/metron/parsers/GrokParserTest.java   |   1 +
 .../metron/parsers/MultiLineGrokParserTest.java | 146 +++++++++++++
 .../MultiLineWithErrorsGrokParserTest.java      | 146 +++++++++++++
 .../metron/parsers/SampleGrokParserTest.java    |   3 +
 .../apache/metron/parsers/SquidParserTest.java  |   2 +
 .../apache/metron/parsers/YafParserTest.java    |   3 +
 .../metron/parsers/bolt/ParserBoltTest.java     |  15 +-
 .../parsers/integration/ParserDriver.java       |   2 +-
 .../websphere/GrokWebSphereParserTest.java      |  13 --
 .../test/resources/logData/multi_elb_log.txt    |  10 +
 .../logData/multi_elb_with_errors_log.txt       |  13 ++
 20 files changed, 824 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-integration-test/src/main/sample/patterns/test
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/sample/patterns/test b/metron-platform/metron-integration-test/src/main/sample/patterns/test
index a88a255..ebbf9c4 100644
--- a/metron-platform/metron-integration-test/src/main/sample/patterns/test
+++ b/metron-platform/metron-integration-test/src/main/sample/patterns/test
@@ -1,2 +1,3 @@
 YAF_TIME_FORMAT %{YEAR:UNWANTED}-%{MONTHNUM:UNWANTED}-%{MONTHDAY:UNWANTED}[T ]%{HOUR:UNWANTED}:%{MINUTE:UNWANTED}:%{SECOND:UNWANTED}
 YAF_DELIMITED %{NUMBER:start_time}\|%{YAF_TIME_FORMAT:end_time}\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\|%{SPACE:UNWANTED}%{INT:protocol}\|%{SPACE:UNWANTED}%{IP:ip_src_addr}\|%{SPACE:UNWANTED}%{INT:ip_src_port}\|%{SPACE:UNWANTED}%{IP:ip_dst_addr}\|%{SPACE:UNWANTED}%{INT:ip_dst_port}\|%{SPACE:UNWANTED}%{DATA:iflags}\|%{SPACE:UNWANTED}%{DATA:uflags}\|%{SPACE:UNWANTED}%{DATA:riflags}\|%{SPACE:UNWANTED}%{DATA:ruflags}\|%{SPACE:UNWANTED}%{WORD:isn}\|%{SPACE:UNWANTED}%{DATA:risn}\|%{SPACE:UNWANTED}%{DATA:tag}\|%{GREEDYDATA:rtag}\|%{SPACE:UNWANTED}%{INT:pkt}\|%{SPACE:UNWANTED}%{INT:oct}\|%{SPACE:UNWANTED}%{INT:rpkt}\|%{SPACE:UNWANTED}%{INT:roct}\|%{SPACE:UNWANTED}%{INT:app}\|%{GREEDYDATA:end_reason}
+ELBACCESSLOGS %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport} (?:(%{IP:backendip}:?:%{INT:backendport})|-) %{NUMBER:request_processing_time} %{NUMBER:backend_processing_time} %{NUMBER:response_processing_time} (?:-|%{INT:elb_status_code}) (?:-|%{INT:backend_status_code}) %{INT:received_bytes} %{INT:sent_bytes} \"(?:-|(?:%{WORD:verb} %{URIPROTO:proto}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST:urihost})?(?:%{URIPATH:path}(?:%{URIPARAM:params})?)?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest}))\" \"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) (?:-|%{NOTSPACE:ssl_protocol})

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 381b0ee..cfcf6ed 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -29,10 +29,12 @@ There are two general types types of parsers:
   * Grok parser: `org.apache.metron.parsers.GrokParser` with possible `parserConfig` entries of 
     * `grokPath` : The path in HDFS (or in the Jar) to the grok statement
     * `patternLabel` : The pattern label to use from the grok statement
+    * `multiLine` : The raw data passed in should be handled as a log with multiple lines, with each line to be parsed separately. This setting's valid values are 'true' or 'false'.  The default if unset is 'false'. When set to 'true', the parser will handle multiple lines, with successfully processed lines emitted normally and lines with errors sent to the error topic.
     * `timestampField` : The field to use for timestamp
     * `timeFields` : A list of fields to be treated as time
     * `dateFormat` : The date format to use to parse the time fields
     * `timezone` : The timezone to use. `UTC` is default.
+    * The Grok parser supports either 1 line to parse per incoming message, or incoming messages with multiple log lines, and will produce a json message per line
   * CSV Parser: `org.apache.metron.parsers.csv.CSVParser` with possible `parserConfig` entries of
     * `timestampFormat` : The date format of the timestamp to use.  If unspecified, the parser assumes the timestamp is ms since unix epoch.
     * `columns` : A map of column names you wish to extract from the CSV to their offsets (e.g. `{ 'name' : 1, 'profession' : 3}`  would be a column map for extracting the 2nd and 4th columns from a CSV)
@@ -513,9 +515,13 @@ Java parser adapters are intended for higher-velocity topologies and are not eas
 * org.apache.metron.parsers.syslog.Syslog5424Parser : Parse Syslog RFC 5424 messages
 
 ### Grok Parser Adapters
-Grok parser adapters are designed primarly for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies.  Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible.  Grok parsers are defined via a config file and the topplogy does not need to be recombiled in order to make changes to them.  An example of a Grok perser is:
+Grok parser adapters are designed primarily for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies.  Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible.  Grok parsers are defined via a config file and the topology does not need to be recompiled in order to make changes to them.  Examples of Grok parsers are:
 
-* org.apache.metron.parsers.GrokParser
+* org.apache.metron.parsers.GrokParser and org.apache.metron.parsers.websphere.GrokWebSphereParser
+
+Parsers that derive from GrokParser typically allow the GrokParser to parse the messages, and then override the methods for postParse to do further parsing.
+When this is the case, and the Parser has not overridden `parse(byte[])` or `parseOptionalResult(byte[])`, these parsers will gain support for treating byte[] input as multiple lines, with each line parsed as a separate message (and returned as such).
+This is enabled by using the `"multiLine":"true"` Parser configuration option.
 
 For more information on the Grok project please refer to the following link:
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/message-parser-implementation-notes.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/message-parser-implementation-notes.md b/metron-platform/metron-parsers/message-parser-implementation-notes.md
new file mode 100644
index 0000000..b8afe04
--- /dev/null
+++ b/metron-platform/metron-parsers/message-parser-implementation-notes.md
@@ -0,0 +1,57 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# `MessageParser` implementation notes 
+
+
+1. Supporting multiple JSONObject returns from a single byte[]
+The original `MessageParser` interface supported parsing a message and returning a `List<JSONObject>`, and therefore explicitly supported multiple messages from one input.
+While this is fine, it only allows for the complete failure of a message for any reason.  There can only be one exception thrown.  This means that if there _are_ multiple messages in the buffer, any one failure will necessarily fail all of them.
+To improve on this situation, a new method was added to the `MessageParser` interface (with a default implementation), that introduces a return type to provide not only the JSONObjects produced, but also a `Map` of messages -> throwable.
+
+To support this in your parser, you should:
+
+- Implement the new method
+
+```java
+ @Override
+  public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage)
+```
+
+- Implement the original `List<JSONObject> parse(byte[] message)` to delegate to that method such as below:
+
+```java
+ @Override
+  public List<JSONObject> parse(byte[] rawMessage) {
+    Optional<MessageParserResult<JSONObject>> resultOptional = parseOptionalResult(rawMessage);
+    if (!resultOptional.isPresent()) {
+      return Collections.EMPTY_LIST;
+    }
+    Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables();
+    if (!errors.isEmpty()) {
+      throw new RuntimeException(errors.entrySet().iterator().next().getValue());
+    }
+
+    return resultOptional.get().getMessages();
+  }
+```
+
+- You *may* want to govern treating the incoming buffer as multiline or not by adding a configuration option for your parser, such as `"multiLine":"true"|"false"`
+
+- See the org.apache.metron.parsers.GrokParser for an example of this implementation.
+
+The Metron system itself will call the new `parseOptionalResult` method during processing.  The default implementation in the interface handles backwards compatibility with previous implementations.

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
new file mode 100644
index 0000000..11d15eb
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers;
+
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class DefaultMessageParserResult<T> implements MessageParserResult<T> {
+  private List<T> messages = new ArrayList<>();
+  private Map<Object, Throwable> errors = new HashMap<>();
+  private Throwable masterThrowable;
+
+  public DefaultMessageParserResult() {
+  }
+
+  public DefaultMessageParserResult(Throwable masterThrowable) {
+    this.masterThrowable = masterThrowable;
+  }
+
+  public DefaultMessageParserResult(List<T> list) {
+    messages.addAll(list);
+  }
+
+  public DefaultMessageParserResult(Map<Object, Throwable> map) {
+    errors.putAll(map);
+  }
+
+  public DefaultMessageParserResult(List<T> list, Map<Object, Throwable> map) {
+    messages.addAll(list);
+    errors.putAll(map);
+  }
+
+  public void addMessage(T message) {
+    messages.add(message);
+  }
+
+  public void addError(Object message, Throwable throwable) {
+    errors.put(message, throwable);
+  }
+
+  @Override
+  public List<T> getMessages() {
+    return messages;
+  }
+
+  @Override
+  public Map<Object, Throwable> getMessageThrowables() {
+    return errors;
+  }
+
+  @Override
+  public Optional<Throwable> getMasterThrowable() {
+    return Optional.ofNullable(masterThrowable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index 99ac390..bead477 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,32 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.metron.parsers;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.Constants;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+import java.io.StringReader;
 import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TimeZone;
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.Constants;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 
 public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
@@ -48,6 +59,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
 
   protected transient Grok grok;
   protected String grokPath;
+  protected boolean multiLine = false;
   protected String patternLabel;
   protected List<String> timeFields = new ArrayList<>();
   protected String timestampField;
@@ -55,8 +67,13 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   protected String patternsCommonDir = "/patterns/common";
 
   @Override
+  @SuppressWarnings("unchecked")
   public void configure(Map<String, Object> parserConfig) {
     this.grokPath = (String) parserConfig.get("grokPath");
+    String multiLineString = (String) parserConfig.get("multiLine");
+    if (!StringUtils.isBlank(multiLineString)) {
+      multiLine = Boolean.parseBoolean(multiLineString);
+    }
     this.patternLabel = (String) parserConfig.get("patternLabel");
     this.timestampField = (String) parserConfig.get("timestampField");
     List<String> timeFieldsParam = (List<String>) parserConfig.get("timeFields");
@@ -127,10 +144,87 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   @SuppressWarnings("unchecked")
   @Override
   public List<JSONObject> parse(byte[] rawMessage) {
+    Optional<MessageParserResult<JSONObject>> resultOptional = parseOptionalResult(rawMessage);
+    if (!resultOptional.isPresent()) {
+      return Collections.EMPTY_LIST;
+    }
+    Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables();
+    if (!errors.isEmpty()) {
+      throw new RuntimeException(errors.entrySet().iterator().next().getValue());
+    }
+
+    return resultOptional.get().getMessages();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
     if (grok == null) {
       init();
     }
+    if (multiLine) {
+      return parseMultiLine(rawMessage);
+    }
+    return parseSingleLine(rawMessage);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Optional<MessageParserResult<JSONObject>> parseMultiLine(byte[] rawMessage) {
+    List<JSONObject> messages = new ArrayList<>();
+    Map<Object,Throwable> errors = new HashMap<>();
+    String originalMessage = null;
+    // read the incoming raw data as if it may have multiple lines of logs
+    // if there is only one line, it will just get processed.
+    try (BufferedReader reader = new BufferedReader(new StringReader(new String(rawMessage, StandardCharsets.UTF_8)))) {
+      while ((originalMessage = reader.readLine()) != null) {
+        LOG.debug("Grok parser parsing message: {}", originalMessage);
+        try {
+          Match gm = grok.match(originalMessage);
+          gm.captures();
+          JSONObject message = new JSONObject();
+          message.putAll(gm.toMap());
+
+          if (message.size() == 0) {
+            Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: "
+                    + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: "
+                    + grokPath);
+            errors.put(originalMessage, rte);
+            continue;
+          }
+          message.put("original_string", originalMessage);
+          for (String timeField : timeFields) {
+            String fieldValue = (String) message.get(timeField);
+            if (fieldValue != null) {
+              message.put(timeField, toEpoch(fieldValue));
+            }
+          }
+          if (timestampField != null) {
+            message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField)));
+          }
+          message.remove(patternLabel);
+          postParse(message);
+          messages.add(message);
+          LOG.debug("Grok parser parsed message: {}", message);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          errors.put(originalMessage, e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      Exception innerException = new IllegalStateException("Grok parser Error: "
+              + e.getMessage()
+              + " on "
+              + originalMessage, e);
+      return Optional.of(new DefaultMessageParserResult<>(innerException));
+    }
+    return Optional.of(new DefaultMessageParserResult<>(messages, errors));
+  }
+
+  @SuppressWarnings("unchecked")
+  private Optional<MessageParserResult<JSONObject>> parseSingleLine(byte[] rawMessage) {
     List<JSONObject> messages = new ArrayList<>();
+    Map<Object,Throwable> errors = new HashMap<>();
     String originalMessage = null;
     try {
       originalMessage = new String(rawMessage, "UTF-8");
@@ -140,30 +234,36 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
       JSONObject message = new JSONObject();
       message.putAll(gm.toMap());
 
-      if (message.size() == 0)
-        throw new RuntimeException("Grok statement produced a null message. Original message was: "
+      if (message.size() == 0) {
+        Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: "
                 + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: "
                 + grokPath);
-
-      message.put("original_string", originalMessage);
-      for (String timeField : timeFields) {
-        String fieldValue = (String) message.get(timeField);
-        if (fieldValue != null) {
-          message.put(timeField, toEpoch(fieldValue));
+        errors.put(originalMessage, rte);
+      } else {
+        message.put("original_string", originalMessage);
+        for (String timeField : timeFields) {
+          String fieldValue = (String) message.get(timeField);
+          if (fieldValue != null) {
+            message.put(timeField, toEpoch(fieldValue));
+          }
         }
+        if (timestampField != null) {
+          message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField)));
+        }
+        message.remove(patternLabel);
+        postParse(message);
+        messages.add(message);
+        LOG.debug("Grok parser parsed message: {}", message);
       }
-      if (timestampField != null) {
-        message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField)));
-      }
-      message.remove(patternLabel);
-      postParse(message);
-      messages.add(message);
-      LOG.debug("Grok parser parsed message: {}", message);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
-      throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
+      Exception innerException = new IllegalStateException("Grok parser Error: "
+              + e.getMessage()
+              + " on "
+              + originalMessage, e);
+      return Optional.of(new DefaultMessageParserResult<>(innerException));
     }
-    return messages;
+    return Optional.of(new DefaultMessageParserResult<JSONObject>(messages, errors));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 213d02c..ff5c1d4 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,23 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.metron.parsers.bolt;
 
 
 import com.github.benmanes.caffeine.cache.Cache;
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
@@ -48,6 +36,7 @@ import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
 import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
@@ -66,6 +55,20 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
 
@@ -306,8 +309,17 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         );
         metadata = rawMessage.getMetadata();
 
-        Optional<List<JSONObject>> messages = parser.parseOptional(rawMessage.getMessage());
-        for (JSONObject message : messages.orElse(Collections.emptyList())) {
+        Optional<MessageParserResult<JSONObject>> results = parser.parseOptionalResult(rawMessage.getMessage());
+
+        // check if there is a master error
+        if (results.isPresent() && results.get().getMasterThrowable().isPresent()) {
+          handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector);
+          return;
+        }
+
+        // Handle the message results
+        List<JSONObject> messages = results.isPresent() ? results.get().getMessages() : Collections.EMPTY_LIST;
+        for (JSONObject message : messages) {
           //we want to ack the tuple in the situation where we have are not doing a bulk write
           //otherwise we want to defer to the writerComponent who will ack on bulk commit.
           WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
@@ -370,6 +382,19 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
             }
           }
         }
+
+        // Handle the error results
+        Map<Object, Throwable> messageErrors = results.isPresent()
+                ? results.get().getMessageThrowables() : Collections.EMPTY_MAP;
+
+        for (Entry<Object,Throwable> entry : messageErrors.entrySet()) {
+          MetronError error = new MetronError()
+                  .withErrorType(Constants.ErrorType.PARSER_ERROR)
+                  .withThrowable(entry.getValue())
+                  .withSensorType(sensorToComponentMap.keySet())
+                  .addRawMessage(originalMessage);
+          ErrorUtils.handleError(collector, error);
+        }
       }
       //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
       //(meaning that none of the messages are valid either globally or locally)

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index e3b903e..f8243b9 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -17,7 +17,11 @@
  */
 package org.apache.metron.parsers.interfaces;
 
+import org.apache.metron.parsers.DefaultMessageParserResult;
+
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -31,14 +35,14 @@ public interface MessageParser<T> extends Configurable {
   /**
    * Take raw data and convert it to a list of messages.
    *
-   * @param rawMessage
+   * @param rawMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
   List<T> parse(byte[] rawMessage);
 
   /**
    * Take raw data and convert it to an optional list of messages.
-   * @param parseMessage
+   * @param parseMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
   default Optional<List<T>> parseOptional(byte[] parseMessage) {
@@ -46,8 +50,26 @@ public interface MessageParser<T> extends Configurable {
   }
 
   /**
+   * Take raw data and convert it to messages.  Each raw message may produce multiple messages and therefore
+   * multiple errors.  A {@link MessageParserResult} is returned, which will have both the messages produced
+   * and the errors.
+   * @param parseMessage the raw bytes of the message
+   * @return Optional of {@link MessageParserResult}
+   */
+  default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+    List<T> list = new ArrayList<>();
+    try {
+      Optional<List<T>> optionalMessages = parseOptional(parseMessage);
+      optionalMessages.ifPresent(list::addAll);
+    } catch (Throwable t) {
+      return Optional.of(new DefaultMessageParserResult<>(t));
+    }
+    return Optional.of(new DefaultMessageParserResult<T>(list));
+  }
+
+  /**
    * Validate the message to ensure that it's correct.
-   * @param message
+   * @param message the message to validate
    * @return true if the message is valid, false if not
    */
   boolean validate(T message);

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
new file mode 100644
index 0000000..891e94f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.interfaces;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Result object returned by MessageParser calls.
+ * @param <T> the type of the messages produced by the parser
+ */
+public interface MessageParserResult<T> {
+  /**
+   * Returns the Message objects of {@code T}
+   * @return {@code List}
+   */
+  List<T> getMessages();
+
+  /**
+   * Returns a map of raw message objects to the {@code Throwable} they triggered.
+   * @return {@code Map}
+   */
+  Map<Object,Throwable> getMessageThrowables();
+
+  /**
+   * Returns a master {@code Throwable} for a parse call.  This represents a complete
+   * call failure, as opposed to one associated with a message.
+   * @return {@code Optional<Throwable>}
+   */
+  Optional<Throwable> getMasterThrowable();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
index 178719b..a58e0c9 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,117 +27,114 @@ import java.util.Iterator;
 
 public class GrokWebSphereParser extends GrokParser {
 
-	private static final long serialVersionUID = 4860439408055777358L;
+  private static final long serialVersionUID = 4860439408055777358L;
 
-	@Override
-	protected long formatTimestamp(Object value) {
-		long epochTimestamp = System.currentTimeMillis();
-		if (value != null) {
-			try {
-				epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR)  + " " + value);
-			} catch (ParseException e) {
-				//default to current time
-			}
-		}
-		return epochTimestamp;
-	}
+  @Override
+  protected long formatTimestamp(Object value) {
+    long epochTimestamp = System.currentTimeMillis();
+    if (value != null) {
+      try {
+        epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR) + " " + value);
+      } catch (ParseException e) {
+        //default to current time
+      }
+    }
+    return epochTimestamp;
+  }
 
-	@Override
-	protected void postParse(JSONObject message) {
-		removeEmptyFields(message);
-		message.remove("timestamp_string");
-		if (message.containsKey("message")) {
-			String messageValue = (String) message.get("message");
-			if (messageValue.contains("logged into")) {
-				parseLoginMessage(message);
-			}
-			else if (messageValue.contains("logged out")) {
-				parseLogoutMessage(message);
-			}
-			else if (messageValue.contains("rbm(")) {
-				parseRBMMessage(message);
-			}
-			else {
-				parseOtherMessage(message);
-			}
-		}
-	}
+  @Override
+  protected void postParse(JSONObject message) {
+    removeEmptyFields(message);
+    message.remove("timestamp_string");
+    if (message.containsKey("message")) {
+      String messageValue = (String) message.get("message");
+      if (messageValue.contains("logged into")) {
+        parseLoginMessage(message);
+      } else if (messageValue.contains("logged out")) {
+        parseLogoutMessage(message);
+      } else if (messageValue.contains("rbm(")) {
+        parseRBMMessage(message);
+      } else {
+        parseOtherMessage(message);
+      }
+    }
+  }
 
-	@SuppressWarnings("unchecked")
-	private void removeEmptyFields(JSONObject json) {
-		Iterator<Object> keyIter = json.keySet().iterator();
-		while (keyIter.hasNext()) {
-			Object key = keyIter.next();
-			Object value = json.get(key);
-			if (null == value || "".equals(value.toString())) {
-				keyIter.remove();
-			}
-		}
-	}
+  @SuppressWarnings("unchecked")
+  private void removeEmptyFields(JSONObject json) {
+    Iterator<Object> keyIter = json.keySet().iterator();
+    while (keyIter.hasNext()) {
+      Object key = keyIter.next();
+      Object value = json.get(key);
+      if (null == value || "".equals(value.toString())) {
+        keyIter.remove();
+      }
+    }
+  }
 
-	//Extracts the appropriate fields from login messages
-	@SuppressWarnings("unchecked")
-	private void parseLoginMessage(JSONObject json) {
-		json.put("event_subtype", "login");
-		String message = (String) json.get("message");
-		if (message.contains(":")){
-			String parts[] = message.split(":");
-			String user = parts[0];
-			String ip_src_addr = parts[1];
-			if (user.contains("user(") && user.contains(")")) {	
-				user = user.substring(user.indexOf("user(") + "user(".length());
-				user = user.substring(0, user.indexOf(")"));
-				json.put("username", user);
-			}
-			if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
-				ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
-				ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
-				json.put("ip_src_addr", ip_src_addr);
-			}
-			json.remove("message");
-		}
-	}
+  //Extracts the appropriate fields from login messages
+  @SuppressWarnings("unchecked")
+  private void parseLoginMessage(JSONObject json) {
+    json.put("event_subtype", "login");
+    String message = (String) json.get("message");
+    if (message.contains(":")) {
+      String[] parts = message.split(":");
+      String user = parts[0];
+      String ip_src_addr = parts[1];
+      if (user.contains("user(") && user.contains(")")) {
+        user = user.substring(user.indexOf("user(") + "user(".length());
+        user = user.substring(0, user.indexOf(")"));
+        json.put("username", user);
+      }
+      if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
+        ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
+        ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
+        json.put("ip_src_addr", ip_src_addr);
+      }
+      json.remove("message");
+    }
+  }
 
-	//Extracts the appropriate fields from logout messages
-	@SuppressWarnings("unchecked")
-	private void parseLogoutMessage(JSONObject json) {
-		json.put("event_subtype", "logout");
-		String message = (String) json.get("message");
-		if (message.matches(".*'.*'.*'.*'.*")) {
-			String parts[] = message.split("'");
-			String ip_src_addr = parts[0];
-			if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
-				ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
-				ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
-				json.put("ip_src_addr", ip_src_addr);
-			}
-			json.put("username", parts[1]);
-			json.put("security_domain", parts[3]);
-			json.remove("message");
-		}
-	}
+  //Extracts the appropriate fields from logout messages
+  @SuppressWarnings("unchecked")
+  private void parseLogoutMessage(JSONObject json) {
+    json.put("event_subtype", "logout");
+    String message = (String) json.get("message");
+    if (message.matches(".*'.*'.*'.*'.*")) {
+      String parts[] = message.split("'");
+      String ip_src_addr = parts[0];
+      if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
+        ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
+        ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
+        json.put("ip_src_addr", ip_src_addr);
+      }
+      json.put("username", parts[1]);
+      json.put("security_domain", parts[3]);
+      json.remove("message");
+    }
+  }
 
-	//Extracts the appropriate fields from RBM messages
-	@SuppressWarnings("unchecked")
-	private void parseRBMMessage(JSONObject json) {
-		String message = (String) json.get("message");
-		if (message.contains("(")) {
-			json.put("process", message.substring(0, message.indexOf("(")));
-			if (message.contains(":")) {
-				json.put("message", message.substring(message.indexOf(":") + 2));	
-			}
-		}
-	}
+  //Extracts the appropriate fields from RBM messages
+  @SuppressWarnings("unchecked")
+  private void parseRBMMessage(JSONObject json) {
+    String message = (String) json.get("message");
+    if (message.contains("(")) {
+      json.put("process", message.substring(0, message.indexOf("(")));
+      if (message.contains(":")) {
+        json.put("message", message.substring(message.indexOf(":") + 2));
+      }
+    }
+  }
 
-	//Extracts the appropriate fields from other messages
-	@SuppressWarnings("unchecked")
-	private void parseOtherMessage(JSONObject json) {
-		String message = (String) json.get("message");
-		if (message.contains("(")) {
-			json.put("process", message.substring(0, message.indexOf("(")));
-			if (message.contains(":")) {
-				json.put("message", message.substring(message.indexOf(":") + 2));	
-			}
-		}
-	}
+  //Extracts the appropriate fields from other messages
+  @SuppressWarnings("unchecked")
+  private void parseOtherMessage(JSONObject json) {
+    String message = (String) json.get("message");
+    if (message.contains("(")) {
+      json.put("process", message.substring(0, message.indexOf("(")));
+      if (message.contains(":")) {
+        json.put("message", message.substring(message.indexOf(":") + 2));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
index 1a50dea..7fa6a31 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
@@ -92,4 +92,5 @@ public abstract class GrokParserTest {
   public abstract List<String> getTimeFields();
   public abstract String getDateFormat();
   public abstract String getTimestampField();
+  public abstract String getMultiLine();
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
new file mode 100644
index 0000000..cc4d20f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class MultiLineGrokParserTest {
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in
+   * it will be parsed into the correct number of messages.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLegacyInterfaceReturnsMultiline() throws IOException, ParseException {
+
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine", getMultiLine());
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+
+    JSONParser jsonParser = new JSONParser();
+    Map<String, String> testData = getTestData();
+    for (Map.Entry<String, String> e : testData.entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes();
+      List<JSONObject> parsedList = grokParser.parse(rawMessage);
+      Assert.assertEquals(10, parsedList.size());
+    }
+  }
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in
+   * it will be parsed into the correct number of messages using the
+   * parseOptionalResult call.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testOptionalResultReturnsMultiline() throws IOException, ParseException {
+
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine", getMultiLine());
+
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+
+    JSONParser jsonParser = new JSONParser();
+    Map<String, String> testData = getTestData();
+    for (Map.Entry<String, String> e : testData.entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes();
+      Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage);
+      Assert.assertTrue(resultOptional.isPresent());
+      Optional<Throwable> throwableOptional = resultOptional.get().getMasterThrowable();
+      List<JSONObject>  resultList = resultOptional.get().getMessages();
+      Map<Object,Throwable> errorMap = resultOptional.get().getMessageThrowables();
+      Assert.assertFalse(throwableOptional.isPresent());
+      Assert.assertEquals(0, errorMap.size());
+      Assert.assertEquals(10, resultList.size());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map getTestData() {
+
+    Map testData = new HashMap<String, String>();
+    String input;
+    try (FileInputStream stream = new FileInputStream(new File("src/test/resources/logData/multi_elb_log.txt"))) {
+      input = IOUtils.toString(stream);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("failed to open file", ioe);
+    }
+    // not checking values, just that we get the right number of messages
+    testData.put(input, "");
+    return testData;
+
+  }
+
+  public String getMultiLine() { return "true";}
+
+  public String getGrokPath() {
+    return "../metron-integration-test/src/main/sample/patterns/test";
+  }
+
+  public String getGrokPatternLabel() {
+    return "ELBACCESSLOGS";
+  }
+
+  public List<String> getTimeFields() {
+    return new ArrayList<String>() {{
+      add("timestamp");
+    }};
+  }
+
+  public String getDateFormat() {
+    return "yyyy-MM-dd'T'HH:mm:ss.S'Z'";
+  }
+
+  public String getTimestampField() {
+    return "timestamp";
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
new file mode 100644
index 0000000..8ab8246
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class MultiLineWithErrorsGrokParserTest {
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in and some lines
+   * fail to parse, the legacy parse call throws a single RuntimeException.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test(expected = RuntimeException.class)
+  @SuppressWarnings("unchecked")
+  public void testLegacyInterfaceThrowsOneExceptionWithMultiline() throws IOException, ParseException {
+
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine",getMultiLine());
+
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+
+    JSONParser jsonParser = new JSONParser();
+    Map<String, String> testData = getTestData();
+    for (Map.Entry<String, String> e : testData.entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes();
+      List<JSONObject> parsedList = grokParser.parse(rawMessage);
+    }
+  }
+
+  /**
+   * Test that if a byte[] with multiple lines of log is passed in and some lines
+   * fail to parse, the parseOptionalResult call returns both the successfully
+   * parsed messages and the per-message errors, with no master throwable.
+   * @throws IOException if we can't read from disk
+   * @throws ParseException if we can't parse
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testResultInterfaceReturnsErrorsAndMessagesWithMultiline() throws IOException, ParseException {
+
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    parserConfig.put("multiLine",getMultiLine());
+
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
+    grokParser.init();
+
+    JSONParser jsonParser = new JSONParser();
+    Map<String, String> testData = getTestData();
+    for (Map.Entry<String, String> e : testData.entrySet()) {
+      byte[] rawMessage = e.getKey().getBytes();
+      Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage);
+      Assert.assertTrue(resultOptional.isPresent());
+      Optional<Throwable> throwableOptional = resultOptional.get().getMasterThrowable();
+      List<JSONObject>  resultList = resultOptional.get().getMessages();
+      Map<Object,Throwable> errorMap = resultOptional.get().getMessageThrowables();
+      Assert.assertFalse(throwableOptional.isPresent());
+      Assert.assertEquals(3, errorMap.size());
+      Assert.assertEquals(10, resultList.size());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map getTestData() {
+
+    Map testData = new HashMap<String, String>();
+    String input;
+    try (FileInputStream stream = new FileInputStream(new File("src/test/resources/logData/multi_elb_with_errors_log.txt"))) {
+      input = IOUtils.toString(stream);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("failed to open file", ioe);
+    }
+    // not checking values, just that we get the right number of messages
+    testData.put(input, "");
+    return testData;
+
+  }
+
+  public String getGrokPath() {
+    return "../metron-integration-test/src/main/sample/patterns/test";
+  }
+
+  public String getGrokPatternLabel() {
+    return "ELBACCESSLOGS";
+  }
+
+  public List<String> getTimeFields() {
+    return new ArrayList<String>() {{
+      add("timestamp");
+    }};
+  }
+
+  public String getMultiLine() { return "true"; }
+
+  public String getDateFormat() {
+    return "yyyy-MM-dd'T'HH:mm:ss.S'Z'";
+  }
+
+  public String getTimestampField() {
+    return "timestamp";
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
index e060559..35e07f8 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
@@ -69,6 +69,9 @@ public class SampleGrokParserTest extends GrokParserTest {
   }
 
   @Override
+  public String getMultiLine() { return "false"; }
+
+  @Override
   public String getGrokPath() {
     return "../metron-integration-test/src/main/sample/patterns/test";
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
index 93c8276..cb424fb 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
@@ -73,6 +73,8 @@ public class SquidParserTest extends GrokParserTest {
 
   }
 
+  @Override
+  public String getMultiLine() { return "false"; }
 
   @Override
   public String getGrokPath() {

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
index 8dd75a0..15ce19f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
@@ -69,6 +69,9 @@ public class YafParserTest extends GrokParserTest {
   }
 
   @Override
+  public String getMultiLine() { return "false"; }
+
+  @Override
   public String getGrokPath() {
     return "../metron-parsers/src/main/resources/patterns/yaf";
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 06f4cec..0ae2817 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -56,6 +56,7 @@ import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
 import org.apache.metron.parsers.BasicParser;
+import org.apache.metron.parsers.DefaultMessageParserResult;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.topology.ParserComponents;
@@ -254,7 +255,7 @@ public class ParserBoltTest extends BaseBoltTest {
     parsedMessage.put("guid", "this-is-unique-identifier-for-tuple");
     List<JSONObject> messageList = new ArrayList<>();
     messageList.add(parsedMessage);
-    when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messageList));
+    when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messageList)));
     when(parser.validate(parsedMessage)).thenReturn(true);
     parserBolt.execute(tuple);
 
@@ -303,7 +304,7 @@ public class ParserBoltTest extends BaseBoltTest {
     final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
     final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }");
     when(tuple.getBinary(0)).thenReturn(sampleBinary);
-    when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messages));
+    when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messages)));
     when(parser.validate(eq(messages.get(0)))).thenReturn(true);
     when(parser.validate(eq(messages.get(1)))).thenReturn(false);
     parserBolt.execute(tuple);
@@ -358,9 +359,9 @@ public class ParserBoltTest extends BaseBoltTest {
     when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1));
     when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse);
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
+    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
       put("field1", "blah");
-    }}))));
+    }})))));
     parserBolt.execute(t1);
     verify(batchWriter, times(1)).write(any(), any(), any(), any());
     verify(outputCollector, times(1)).ack(t1);
@@ -543,7 +544,7 @@ public class ParserBoltTest extends BaseBoltTest {
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
     int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1;
     BulkWriterResponse response = new BulkWriterResponse();
@@ -624,7 +625,7 @@ public class ParserBoltTest extends BaseBoltTest {
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
     Set<Tuple> tuples = Stream.of(t1, t2, t3, t4, t5).collect(Collectors.toSet());
     BulkWriterResponse response = new BulkWriterResponse();
@@ -671,7 +672,7 @@ public class ParserBoltTest extends BaseBoltTest {
 
     doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
+    when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject()))));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
     parserBolt.execute(t1);
     parserBolt.execute(t2);

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index 2cba40a..0d6eef8 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -130,11 +130,11 @@ public class ParserDriver implements Serializable {
       LOG.error("Error parsing message: " + ex.getMessage(), ex);
     }
 
+    @SuppressWarnings("unchecked")
     public ProcessorResult<List<byte[]>> getResults() {
       return new ProcessorResult.Builder<List<byte[]>>().withProcessErrors(errors)
                                                         .withResult(output)
                                                         .build();
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
index 230c147..2923a4f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
@@ -244,17 +244,4 @@ public class GrokWebSphereParserTest {
 		assertEquals("trans 191)  admindefaultsystem*): ntp-service 'NTP Service' - Operational state down:", parsedJSON.get("message"));
 	}
 	
-	
-	@Test(expected=RuntimeException.class)
-	public void testParseEmptyLine() throws Exception {
-		
-		//Set up parser, attempt to parse malformed message
-		GrokWebSphereParser parser = new GrokWebSphereParser();
-		parser.configure(parserConfig);
-		String testString = "";
-		UnitTestHelper.setLog4jLevel(GrokParser.class, Level.FATAL);
-		List<JSONObject> result = parser.parse(testString.getBytes());
-		UnitTestHelper.setLog4jLevel(GrokParser.class, Level.ERROR);
-	}
-		
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt
new file mode 100644
index 0000000..95d3fec
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt
@@ -0,0 +1,10 @@
+2018-08-30T21:02:32.047266Z vault-development 192.168.113.53:60002 192.168.82.130:8200 0.000695 0.000017 0.000015 - - 607 3078 "- - - " "-" - -
+2018-08-30T21:02:22.119595Z vault-development 192.168.113.29:57322 192.168.72.75:8200 0.00051 0.00001 0.000013 - - 607 3079 "- - - " "-" - -
+2018-08-30T21:05:58.275961Z vault-production 192.168.205.159:58390 192.168.68.196:8200 0.000767 0.000009 0.000012 - - 673 3210 "- - - " "-" - -
+2018-08-30T21:05:59.222277Z vault-production 192.168.228.182:26358 192.168.81.224:8200 0.000882 0.000014 0.000024 - - 519 3920 "- - - " "-" - -
+2018-08-30T21:05:59.234471Z vault-production 192.168.228.182:35506 192.168.79.6:8200 0.000377 0.000011 0.000009 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:05:59.237375Z vault-production 192.168.228.182:52516 192.168.68.196:8200 0.000628 0.000007 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.235460Z vault-production 192.168.228.182:41783 192.168.79.6:8200 0.000309 0.000006 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.226698Z vault-production 192.168.228.182:40008 192.168.81.224:8200 0.000955 0.000014 0.000013 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:06:05.237946Z vault-production 192.168.228.182:19261 192.168.68.196:8200 0.000661 0.000006 0.000009 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:11.229542Z vault-production 192.168.228.182:44082 192.168.81.224:8200 0.000912 0.000009 0.000014 - - 519 3919 "- - - " "-" - -

http://git-wip-us.apache.org/repos/asf/metron/blob/3d923cde/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
new file mode 100644
index 0000000..3525fc4
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
@@ -0,0 +1,13 @@
+2018-08-30T21:02:32.047266Z vault-development 192.168.113.53:60002 192.168.82.130:8200 0.000695 0.000017 0.000015 - - 607 3078 "- - - " "-" - -
+2018-08-30T21:02:22.119595Z vault-development 192.168.113.29:57322 192.168.72.75:8200 0.00051 0.00001 0.000013 - - 607 3079 "- - - " "-" - -
+2018-08-30T21:05:58.275961Z vault-production 192.168.205.159:58390 192.168.68.196:8200 0.000767 0.000009 0.000012 - - 673 3210 "- - - " "-" - -
+2018-08-30T21:05:59.222277Z vault-production 192.168.228.182:26358 192.168.81.224:8200 0.000882 0.000014 0.000024 - - 519 3920 "- - - " "-" - -
+BOOM
+BLAM
+BOP
+2018-08-30T21:05:59.234471Z vault-production 192.168.228.182:35506 192.168.79.6:8200 0.000377 0.000011 0.000009 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:05:59.237375Z vault-production 192.168.228.182:52516 192.168.68.196:8200 0.000628 0.000007 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.235460Z vault-production 192.168.228.182:41783 192.168.79.6:8200 0.000309 0.000006 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.226698Z vault-production 192.168.228.182:40008 192.168.81.224:8200 0.000955 0.000014 0.000013 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:06:05.237946Z vault-production 192.168.228.182:19261 192.168.68.196:8200 0.000661 0.000006 0.000009 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:11.229542Z vault-production 192.168.228.182:44082 192.168.81.224:8200 0.000912 0.000009 0.000014 - - 519 3919 "- - - " "-" - -