You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2018/07/10 20:25:29 UTC

[2/2] metron git commit: METRON-1644: Support parser chaining closes apache/incubator-metron#1084

METRON-1644: Support parser chaining closes apache/incubator-metron#1084


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cbdaee17
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cbdaee17
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cbdaee17

Branch: refs/heads/master
Commit: cbdaee174dc81280efb0f2aaa8e0028f0c930281
Parents: b2a22b8
Author: cstella <ce...@gmail.com>
Authored: Tue Jul 10 16:24:32 2018 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Jul 10 16:24:32 2018 -0400

----------------------------------------------------------------------
 .../org/apache/metron/common/Constants.java     |   1 -
 .../configuration/SensorParserConfig.java       |  69 ++++-
 .../metadata/DefaultRawMessageStrategy.java     |  86 ++++++
 .../metadata/EnvelopedRawMessageStrategy.java   | 146 ++++++++++
 .../common/message/metadata/MetadataUtil.java   | 124 ++++++++
 .../common/message/metadata/RawMessage.java     |  85 ++++++
 .../message/metadata/RawMessageStrategies.java  |  81 ++++++
 .../message/metadata/RawMessageStrategy.java    |  77 +++++
 .../common/message/metadata/RawMessageUtil.java |  59 ++++
 .../common/bolt/ConfiguredParserBoltTest.java   |  31 +-
 .../message/metadata/RawMessageUtilTest.java    | 282 +++++++++++++++++++
 .../components/FluxTopologyComponent.java       |   9 +
 .../metron-parsers/ParserChaining.md            | 179 ++++++++++++
 metron-platform/metron-parsers/README.md        |  37 ++-
 .../apache/metron/parsers/bolt/ParserBolt.java  |  60 ++--
 .../EnvelopedParserIntegrationTest.java         | 209 ++++++++++++++
 .../parsers/integration/ParserDriver.java       |   2 +-
 .../src/test/resources/patterns/cisco_patterns  |   6 +
 pom.xml                                         |   1 +
 use-cases/parser_chaining/README.md             | 235 ++++++++++++++++
 .../message_routing_high_level.svg              |  14 +
 .../message_routing_high_level.xml              |  14 +
 22 files changed, 1741 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 4a8bea2..5054508 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 public class Constants {
 
-  public static final String METADATA_PREFIX = "metron.metadata.";
   public static final String ZOOKEEPER_ROOT = "/metron";
   public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology";
   public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 1dfb045..01630c1 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.metron.common.message.metadata.RawMessageStrategy;
+import org.apache.metron.common.message.metadata.RawMessageStrategies;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
@@ -29,6 +31,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * The configuration object that defines a parser for a given sensor.  Each
@@ -86,18 +89,25 @@ public class SensorParserConfig implements Serializable {
    * transformations. If true, the parser field transformations can access
    * parser metadata values.
    *
-   * <p>By default, this is false and parser metadata is not available
-   * to the field transformations.
+   * <p>The default is dependent upon the raw message strategy used:
+   * <ul>
+   * <li>The default strategy sets this to false and metadata is not read by default.</li>
+   * <li>The envelope strategy sets this to true and metadata is read by default.</li>
+   * </ul>
    */
-  private Boolean readMetadata = false;
+  private Boolean readMetadata = null;
 
   /**
    * Determines if parser metadata is automatically merged into the message.  If
    * true, parser metadata values will appear as fields within the message.
    *
-   * <p>By default, this is false and metadata is not merged.
+   * <p>The default is dependent upon the raw message strategy used:
+   * <ul>
+   * <li>The default strategy sets this to false and metadata is not merged by default.</li>
+   * <li>The envelope strategy sets this to true and metadata is merged by default.</li>
+   * </ul>
    */
-  private Boolean mergeMetadata = false;
+  private Boolean mergeMetadata = null;
 
   /**
    * The number of workers for the topology.
@@ -195,6 +205,33 @@ public class SensorParserConfig implements Serializable {
    */
   private Map<String, Object> cacheConfig = new HashMap<>();
 
+  /**
+   * Return the raw message supplier.  This is the strategy to use to extract the raw message and metadata from
+   * the tuple.
+   */
+  private RawMessageStrategy rawMessageStrategy = RawMessageStrategies.DEFAULT;
+
+  /**
+   * The config for the raw message supplier.
+   */
+  private Map<String, Object> rawMessageStrategyConfig = new HashMap<>();
+
+  public RawMessageStrategy getRawMessageStrategy() {
+    return rawMessageStrategy;
+  }
+
+  public void setRawMessageStrategy(String rawMessageSupplierName) {
+    this.rawMessageStrategy = RawMessageStrategies.valueOf(rawMessageSupplierName);
+  }
+
+  public Map<String, Object> getRawMessageStrategyConfig() {
+    return rawMessageStrategyConfig;
+  }
+
+  public void setRawMessageStrategyConfig(Map<String, Object> rawMessageStrategyConfig) {
+    this.rawMessageStrategyConfig = rawMessageStrategyConfig;
+  }
+
   public Map<String, Object> getCacheConfig() {
     return cacheConfig;
   }
@@ -292,7 +329,7 @@ public class SensorParserConfig implements Serializable {
   }
 
   public Boolean getMergeMetadata() {
-    return mergeMetadata;
+    return Optional.ofNullable(mergeMetadata).orElse(getRawMessageStrategy().mergeMetadataDefault());
   }
 
   public void setMergeMetadata(Boolean mergeMetadata) {
@@ -300,7 +337,7 @@ public class SensorParserConfig implements Serializable {
   }
 
   public Boolean getReadMetadata() {
-    return readMetadata;
+    return Optional.ofNullable(readMetadata).orElse(getRawMessageStrategy().readMetadataDefault());
   }
 
   public void setReadMetadata(Boolean readMetadata) {
@@ -414,8 +451,8 @@ public class SensorParserConfig implements Serializable {
             .append(errorTopic, that.errorTopic)
             .append(writerClassName, that.writerClassName)
             .append(errorWriterClassName, that.errorWriterClassName)
-            .append(readMetadata, that.readMetadata)
-            .append(mergeMetadata, that.mergeMetadata)
+            .append(getReadMetadata(), that.getReadMetadata())
+            .append(getMergeMetadata(), that.getMergeMetadata())
             .append(numWorkers, that.numWorkers)
             .append(numAckers, that.numAckers)
             .append(spoutParallelism, that.spoutParallelism)
@@ -430,6 +467,8 @@ public class SensorParserConfig implements Serializable {
             .append(cacheConfig, that.cacheConfig)
             .append(parserConfig, that.parserConfig)
             .append(fieldTransformations, that.fieldTransformations)
+            .append(rawMessageStrategy, that.rawMessageStrategy)
+            .append(rawMessageStrategyConfig, that.rawMessageStrategyConfig)
             .isEquals();
   }
 
@@ -443,8 +482,8 @@ public class SensorParserConfig implements Serializable {
             .append(errorTopic)
             .append(writerClassName)
             .append(errorWriterClassName)
-            .append(readMetadata)
-            .append(mergeMetadata)
+            .append(getReadMetadata())
+            .append(getMergeMetadata())
             .append(numWorkers)
             .append(numAckers)
             .append(spoutParallelism)
@@ -459,6 +498,8 @@ public class SensorParserConfig implements Serializable {
             .append(cacheConfig)
             .append(parserConfig)
             .append(fieldTransformations)
+            .append(rawMessageStrategy)
+            .append(rawMessageStrategyConfig)
             .toHashCode();
   }
 
@@ -472,8 +513,8 @@ public class SensorParserConfig implements Serializable {
             .append("errorTopic", errorTopic)
             .append("writerClassName", writerClassName)
             .append("errorWriterClassName", errorWriterClassName)
-            .append("readMetadata", readMetadata)
-            .append("mergeMetadata", mergeMetadata)
+            .append("readMetadata", getReadMetadata())
+            .append("mergeMetadata", getMergeMetadata())
             .append("numWorkers", numWorkers)
             .append("numAckers", numAckers)
             .append("spoutParallelism", spoutParallelism)
@@ -488,6 +529,8 @@ public class SensorParserConfig implements Serializable {
             .append("cacheConfig", cacheConfig)
             .append("parserConfig", parserConfig)
             .append("fieldTransformations", fieldTransformations)
+            .append("rawMessageStrategy", rawMessageStrategy)
+            .append("rawMessageStrategyConfig", rawMessageStrategyConfig)
             .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/DefaultRawMessageStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/DefaultRawMessageStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/DefaultRawMessageStrategy.java
new file mode 100644
index 0000000..77ce340
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/DefaultRawMessageStrategy.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
+/**
+ * The default implementation, which defines:
+ * <ul>
+ *   <li>Metadata: The data which comes in via the
+ *   kafka key and the other bits of the tuple from the storm spout (e.g. the topic, etc).
+ *   </li>
+ *   <li>Data: The byte[] that comes across as the kafka value</li>
+ * </ul>
+ */
+public class DefaultRawMessageStrategy implements RawMessageStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+  /**
+   * The default behavior is to use the raw data from kafka value as the message and the raw metadata as the metadata.
+   *
+   * @param rawMetadata The metadata read from kafka Key (e.g. the topic, index, etc.)
+   * @param rawMessage The raw message from the kafka value
+   * @param readMetadata True if we want to read read the metadata
+   * @param config The config for the RawMessageStrategy (See the rawMessageStrategyConfig in the SensorParserConfig)
+   * @return
+   */
+  @Override
+  public RawMessage get(Map<String, Object> rawMetadata, byte[] rawMessage, boolean readMetadata, Map<String, Object> config) {
+    return new RawMessage(rawMessage, rawMetadata);
+  }
+
+  /**
+   * Simple merging of metadata by adding the metadata into the message (if mergeMetadata is set to true).
+   *
+   * @param message The parsed message (note: prior to the field transformations)
+   * @param metadata The metadata passed along
+   * @param mergeMetadata Whether to merge the metadata or not
+   * @param config The config for the message strategy.
+   */
+  @Override
+  public void mergeMetadata(JSONObject message, Map<String, Object> metadata, boolean mergeMetadata, Map<String, Object> config) {
+    if(mergeMetadata) {
+      message.putAll(metadata);
+    }
+  }
+
+  /**
+   * The default mergeMetadata is false.
+   * @return
+   */
+  @Override
+  public boolean mergeMetadataDefault() {
+    return false;
+  }
+
+  /**
+   * The default readMetadata is false.
+   * @return
+   */
+  @Override
+  public boolean readMetadataDefault() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/EnvelopedRawMessageStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/EnvelopedRawMessageStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/EnvelopedRawMessageStrategy.java
new file mode 100644
index 0000000..9ea2e9a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/EnvelopedRawMessageStrategy.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An alternative strategy whereby
+ * <ul>
+ *  <li>The raw data is presumed to be a JSON Map</li>
+ *  <li>The data to be parsed is the contents of one of the fields.</li>
+ *  <li>The non-data fields are considered metadata</li>
+ * </ul>
+ *
+ * Additionally, the defaults around merging and reading metadata are adjusted to be on by default.
+ * Note, this strategy allows for parser chaining and for a fully worked example, check the parser chaining use-case.
+ */
+public class EnvelopedRawMessageStrategy implements RawMessageStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  /**
+   * The field from the rawMessageStrategyConfig in the SensorParserConfig that defines the field to use to
+   * define the data to be parsed.
+   */
+  public static final String MESSAGE_FIELD_CONFIG = "messageField";
+
+  /**
+   * Retrieve the raw message by parsing the JSON Map in the kafka value and pulling the appropriate field.
+   * Also, augment the default metadata with the non-data fields in the JSON Map.
+   *
+   * Note: The data field in the JSON Map is not considered metadata.
+   *
+   * @param rawMetadata The metadata read from kafka Key (e.g. the topic, index, etc.)
+   * @param rawMessage The raw message from the kafka value
+   * @param readMetadata True if we want to read read the metadata
+   * @param config The config for the RawMessageStrategy (See the rawMessageStrategyConfig in the SensorParserConfig)
+   * @return
+   */
+  @Override
+  public RawMessage get(Map<String, Object> rawMetadata, byte[] rawMessage, boolean readMetadata, Map<String, Object> config) {
+    String messageField = (String)config.get(MESSAGE_FIELD_CONFIG);
+    if(messageField == null) {
+      throw new IllegalStateException("You must specify a message field in the message supplier config.  " +
+              "\"messageField\" field was expected but wasn't in the config.");
+    }
+    byte[] envelope = rawMessage;
+
+    try {
+      String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
+      Map<String, Object> extraMetadata = JSONUtils.INSTANCE.load(new String(envelope), JSONUtils.MAP_SUPPLIER);
+      String message = null;
+      if(extraMetadata != null) {
+        for(Map.Entry<String, Object> kv : extraMetadata.entrySet()) {
+          if(kv.getKey().equals(messageField)) {
+            message = (String)kv.getValue();
+          }
+          rawMetadata.put(MetadataUtil.INSTANCE.prefixKey(prefix, kv.getKey()), kv.getValue());
+        }
+      }
+      if(message != null) {
+        if(!readMetadata) {
+          LOG.debug("Ignoring metadata; Message: " + message + " rawMetadata: " + rawMetadata + " and field = " + messageField);
+          return new RawMessage(message.getBytes(), new HashMap<>());
+        }
+        else {
+          //remove the message field from the metadata since it's data, not metadata.
+          rawMetadata.remove(MetadataUtil.INSTANCE.prefixKey(prefix, messageField));
+          LOG.debug("Attaching metadata; Message: " + message + " rawMetadata: " + rawMetadata + " and field = " + messageField);
+          return new RawMessage(message.getBytes(), rawMetadata);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Expected a JSON Map as the envelope.", e);
+    }
+    return null;
+  }
+
+  /**
+   * Merge the metadata into the original message.  The strategy around duplicate keys is as follows:
+   * <ul>
+   *   <li>If the string is the "original_string" field, then we choose the oldest original string</li>
+   *   <li>For all other fields, the fields from the message hold precidence against metadata fields on collision.</li>
+   * </ul>
+   * @param message The parsed message (note: prior to the field transformations)
+   * @param metadata The metadata passed along
+   * @param mergeMetadata Whether to merge the metadata or not
+   * @param config The config for the message strategy.
+   */
+  @Override
+  public void mergeMetadata(JSONObject message, Map<String, Object> metadata, boolean mergeMetadata, Map<String, Object> config) {
+    //we want to ensure the original string from the metadata, if provided is used
+    String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
+    String originalStringFromMetadata = (String)metadata.get(MetadataUtil.INSTANCE.prefixKey(prefix, Constants.Fields.ORIGINAL.getName()));
+    if(mergeMetadata) {
+      for (Map.Entry<String, Object> kv : metadata.entrySet()) {
+        //and that otherwise we prefer fields from the current message, not the metadata
+        message.putIfAbsent(kv.getKey(), kv.getValue());
+      }
+    }
+    if(originalStringFromMetadata != null) {
+      message.put(Constants.Fields.ORIGINAL.getName(), originalStringFromMetadata);
+    }
+  }
+
+  /**
+   * By default merge metadata.
+   *
+   * @return
+   */
+  @Override
+  public boolean mergeMetadataDefault() {
+    return true;
+  }
+
+  /**
+   * By default read metadata.
+   * @return
+   */
+  @Override
+  public boolean readMetadataDefault() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
new file mode 100644
index 0000000..bfd1622
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Captures some common utility methods around metadata manipulation.
+ */
+public enum MetadataUtil {
+  INSTANCE;
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  /**
+   * The default metadata prefix.
+   */
+  public static final String METADATA_PREFIX = "metron.metadata";
+  /**
+   * The config key for defining the prefix.
+   */
+  public static final String METADATA_PREFIX_CONFIG = "metadataPrefix";
+  static final int KEY_INDEX = 1;
+
+  /**
+   * Return the prefix that we want to use for metadata keys.  This comes from the config and is defaulted to
+   * 'metron.metadata'.
+   *
+   * @param config The rawMessageStrategyConfig
+   * @return
+   */
+  public String getMetadataPrefix(Map<String, Object> config) {
+    String prefix = (String) config.getOrDefault(METADATA_PREFIX_CONFIG, METADATA_PREFIX);
+    if(StringUtils.isEmpty(prefix)) {
+      return null;
+    }
+    return prefix;
+  }
+
+  /**
+   * Take a field and prefix it with the metadata key.
+   *
+   * @param prefix The metadata prefix to use (e.g. 'foo')
+   * @param key The key name (e.g. my_field)
+   * @return The prefixed key separated by a . (e.g. foo.my_field)
+   */
+  public String prefixKey(String prefix, String key) {
+    if(StringUtils.isEmpty(prefix)) {
+      return key;
+    }
+    else {
+      return prefix + "." + key;
+    }
+  }
+
+  /**
+   * Default extraction of metadata.  This handles looking in the normal places for metadata
+   * <ul>
+   *   <li>The kafka key</li>
+   *   <li>The tuple fields outside of the value (e.g. the topic)</li>
+   * </ul>
+   * In addition to extracting the metadata into a map, it applies the appropriate prefix (as configured in the rawMessageStrategyConfig).
+   * @param prefix
+   * @param t
+   * @return
+   */
+  public Map<String, Object> extractMetadata(String prefix, Tuple t) {
+    Map<String, Object> metadata = new HashMap<>();
+    if(t == null) {
+      return metadata;
+    }
+    Fields tupleFields = t.getFields();
+    if(tupleFields == null) {
+      return metadata;
+    }
+    for (int i = 2; i < tupleFields.size(); ++i) {
+      String envMetadataFieldName = tupleFields.get(i);
+      Object envMetadataFieldValue = t.getValue(i);
+      if (!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue != null) {
+        metadata.put(prefixKey(prefix, envMetadataFieldName), envMetadataFieldValue);
+      }
+    }
+    byte[] keyObj = t.getBinary(KEY_INDEX);
+    String keyStr = null;
+    try {
+      keyStr = keyObj == null ? null : new String(keyObj);
+      if (!StringUtils.isEmpty(keyStr)) {
+        Map<String, Object> rawMetadata = JSONUtils.INSTANCE.load(keyStr, JSONUtils.MAP_SUPPLIER);
+        for (Map.Entry<String, Object> kv : rawMetadata.entrySet()) {
+          metadata.put(prefixKey(prefix, kv.getKey()), kv.getValue());
+        }
+
+      }
+    } catch (IOException e) {
+      String reason = "Unable to parse metadata; expected JSON Map: " + (keyStr == null ? "NON-STRING!" : keyStr);
+      LOG.error(reason, e);
+      throw new IllegalStateException(reason, e);
+    }
+    return metadata;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessage.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessage.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessage.java
new file mode 100644
index 0000000..5fc9971
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessage.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * A holder class for the message and metadata
+ */
+public class RawMessage {
+  byte[] message;
+  Map<String, Object> metadata;
+
+  public RawMessage(byte[] message, Map<String, Object> metadata) {
+    this.message = message;
+    this.metadata = metadata;
+  }
+
+  /**
+   * Get the data to be parsed.
+   * @return
+   */
+  public byte[] getMessage() {
+    return message;
+  }
+
+  public void setMessage(byte[] message) {
+    this.message = message;
+  }
+
+  /**
+   * Get the metadata to use based on the RawMessageStrategy.
+   * @return
+   */
+  public Map<String, Object> getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Map<String, Object> metadata) {
+    this.metadata = metadata;
+  }
+
+  @Override
+  public String toString() {
+    return "RawMessage{" +
+            "message=" + Arrays.toString(message) +
+            ", metadata=" + metadata +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RawMessage that = (RawMessage) o;
+
+    if (!Arrays.equals(getMessage(), that.getMessage())) return false;
+    return getMetadata() != null ? getMetadata().equals(that.getMetadata()) : that.getMetadata() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Arrays.hashCode(getMessage());
+    result = 31 * result + (getMetadata() != null ? getMetadata().hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategies.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategies.java
new file mode 100644
index 0000000..1b2081e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategies.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import org.json.simple.JSONObject;
+
+import java.util.Map;
+
+/**
+ * The strategies which we can use to interpret data and metadata.  This fits the normal enum pattern
+ * that we use elsewhere for strategy pattern.
+ */
+public enum RawMessageStrategies implements RawMessageStrategy {
+  /**
+   * The default strategy
+   */
+  DEFAULT(new DefaultRawMessageStrategy()),
+  /**
+   * Enveloping strategy, used for parser chaining.
+   */
+  ENVELOPE(new EnvelopedRawMessageStrategy())
+  ;
+  RawMessageStrategy supplier;
+  RawMessageStrategies(RawMessageStrategy supplier) {
+    this.supplier = supplier;
+  }
+
+  /**
+   * Retrieve the raw message given the strategy specified. Note the javadocs for the individual strategy for more info.
+   *
+   * @param rawMetadata The metadata read from kafka Key (e.g. the topic, index, etc.)
+   * @param originalMessage
+   * @param readMetadata True if we want to read read the metadata
+   * @param config The config for the RawMessageStrategy (See the rawMessageStrategyConfig in the SensorParserConfig)
+   * @return
+   */
+  @Override
+  public RawMessage get(Map<String, Object> rawMetadata, byte[] originalMessage, boolean readMetadata, Map<String, Object> config) {
+    return this.supplier.get(rawMetadata, originalMessage, readMetadata, config);
+  }
+
+  /**
+   * Merge metadata given the strategy specified. Note the javadocs for the individual strategy for more info.
+   *
+   * @param message The parsed message (note: prior to the field transformations)
+   * @param metadata The metadata passed along
+   * @param mergeMetadata Whether to merge the metadata or not
+   * @param config The config for the message strategy.
+   */
+  @Override
+  public void mergeMetadata(JSONObject message, Map<String, Object> metadata, boolean mergeMetadata, Map<String, Object> config) {
+    this.supplier.mergeMetadata(message, metadata, mergeMetadata, config);
+  }
+
+  @Override
+  public boolean mergeMetadataDefault() {
+    return this.supplier.mergeMetadataDefault();
+  }
+
+  @Override
+  public boolean readMetadataDefault() {
+    return this.supplier.readMetadataDefault();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategy.java
new file mode 100644
index 0000000..c93389d
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageStrategy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * This is a strategy which defines how parsers:
+ * <ul>
+ *   <li>Define what data constitutes the parseable message</li>
+ *   <li>Define what data constitutes the metadata</li>
+ * </ul>
+ * Also, each strategy has the ability to define its own defaults around whether
+ * metadata is read or merged.
+ */
+public interface RawMessageStrategy extends Serializable {
+  /**
+   * Retrieve the RawMessage (e.g. the data and metadata) given raw data and raw metadata read from kafka.
+   * Note that the base metadata from kafka key and tuples, etc. along with prefixing is handled in the MetadataUtil.
+   * This is intended for individual strategies to append OTHER metadata.
+   *
+   * @param rawMetadata The metadata read from kafka Key (e.g. the topic, index, etc.)
+   * @param rawMessage The raw message from the kafka value
+   * @param readMetadata True if we want to read read the metadata
+   * @param config The config for the RawMessageStrategy (See the rawMessageStrategyConfig in the SensorParserConfig)
+   * @return The RawMessage, which defines the data and metadata
+   */
+  RawMessage get( Map<String, Object> rawMetadata
+                , byte[] rawMessage
+                , boolean readMetadata
+                , Map<String, Object> config
+                );
+
+  /**
+   * Merge the metadata into the message. Note: Each strategy may merge differently based on their own config.
+   *
+   * @param message The parsed message (note: prior to the field transformations)
+   * @param metadata The metadata passed along
+   * @param mergeMetadata Whether to merge the metadata or not
+   * @param config The config for the message strategy.
+   */
+  void mergeMetadata( JSONObject message
+                    , Map<String, Object> metadata
+                    , boolean mergeMetadata
+                    , Map<String, Object> config
+                    );
+
+  /**
+   * The default value for merging metadata.
+   * @return
+   */
+  boolean mergeMetadataDefault();
+
+  /**
+   * The default value for reading metadata.
+   * @return
+   */
+  boolean readMetadataDefault();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
new file mode 100644
index 0000000..91a80f1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum RawMessageUtil {
+
+  INSTANCE;
+
+
+  /**
+   * Extract the raw message given the strategy, the tuple and the metadata configs.
+   * @param strategy
+   * @param t
+   * @param rawMessage
+   * @param readMetadata
+   * @param config
+   * @return
+   */
+  public RawMessage getRawMessage(RawMessageStrategy strategy, Tuple t, byte[] rawMessage, boolean readMetadata, Map<String, Object> config) {
+    Map<String, Object> metadata = new HashMap<>();
+    if(readMetadata) {
+      String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
+      metadata = MetadataUtil.INSTANCE.extractMetadata(prefix, t);
+    }
+    return strategy.get(metadata, rawMessage, readMetadata, config);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index 06603a3..27b0469 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -18,6 +18,8 @@
 package org.apache.metron.common.bolt;
 
 import org.apache.log4j.Level;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.field.validation.FieldValidation;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
@@ -126,7 +128,34 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
     sampleConfigurations.updateSensorParserConfig(sensorType, testSensorConfig);
     ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
     waitForConfigUpdate(sensorType);
-    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.getConfigurations());
+    ParserConfigurations configuredBoltConfigs = configuredBolt.getConfigurations();
+    if(!sampleConfigurations.equals(configuredBoltConfigs)) {
+      //before we fail, let's try to dump out some info.
+      if(sampleConfigurations.getFieldValidations().size() != configuredBoltConfigs.getFieldValidations().size()) {
+        System.out.println("Field validations don't line up");
+      }
+      for(int i = 0;i < sampleConfigurations.getFieldValidations().size();++i) {
+        FieldValidator l = sampleConfigurations.getFieldValidations().get(i);
+        FieldValidator r = configuredBoltConfigs.getFieldValidations().get(i);
+        if(!l.equals(r)) {
+          System.out.println(l + " != " + r);
+        }
+      }
+      if(sampleConfigurations.getConfigurations().size() != configuredBoltConfigs.getConfigurations().size()) {
+        System.out.println("Configs don't line up");
+      }
+      for(Map.Entry<String, Object> kv : sampleConfigurations.getConfigurations().entrySet() ) {
+        Object l = kv.getValue();
+        Object r = configuredBoltConfigs.getConfigurations().get(kv.getKey());
+        if(!l.equals(r)) {
+          System.out.println(kv.getKey() + " config does not line up: " );
+          System.out.println(l);
+          System.out.println(r);
+        }
+      }
+      Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBoltConfigs);
+    }
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBoltConfigs);
     configuredBolt.cleanup();
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
new file mode 100644
index 0000000..e5fd80f
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message.metadata;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.eq;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class RawMessageUtilTest {
+
+  private static Tuple createTuple(Map<String, Object> kafkaFields, String metadata) throws Exception {
+    List<Map.Entry<String, Object>> fields = new ArrayList<>();
+    for(Map.Entry<String, Object> kv : kafkaFields.entrySet()) {
+      fields.add(kv);
+    }
+
+    Tuple t = mock(Tuple.class);
+    Fields f = mock(Fields.class);
+    when(f.size()).thenReturn(fields.size()+2);
+
+    for(int i = 0;i < fields.size();++i) {
+      when(f.get(eq(i + 2))).thenReturn(fields.get(i).getKey());
+      when(t.getValue(eq(i + 2))).thenReturn(fields.get(i).getValue());
+    }
+
+    when(t.getFields()).thenReturn(f);
+    when(t.getBinary(eq(MetadataUtil.KEY_INDEX))).thenReturn(metadata.getBytes());
+    return t;
+  }
+
+  private void checkKafkaMetadata(RawMessage m, boolean isEmpty) {
+    if(!isEmpty) {
+      Assert.assertEquals("kafka_meta_1_val", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + ".kafka_meta_1"));
+      Assert.assertEquals("kafka_meta_2_val", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + ".kafka_meta_2"));
+    }
+    else {
+      Assert.assertFalse(m.getMetadata().containsKey(MetadataUtil.METADATA_PREFIX + ".kafka_meta_1"));
+      Assert.assertFalse(m.getMetadata().containsKey(MetadataUtil.METADATA_PREFIX + ".kafka_meta_2"));
+    }
+  }
+
+  private void checkAppMetadata(RawMessage m, boolean isEmpty) {
+    if(!isEmpty) {
+      Assert.assertEquals("app_meta_1_val", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + ".app_meta_1"));
+      Assert.assertEquals("app_meta_2_val", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + ".app_meta_2"));
+    }
+    else {
+      Assert.assertFalse(m.getMetadata().containsKey(MetadataUtil.METADATA_PREFIX + ".app_meta_1"));
+      Assert.assertFalse(m.getMetadata().containsKey(MetadataUtil.METADATA_PREFIX + ".app_meta_2"));
+    }
+  }
+
+  public static Map<String, Object> kafkaMetadata = ImmutableMap.of("kafka_meta_1", "kafka_meta_1_val", "kafka_meta_2", "kafka_meta_2_val");
+
+  /**
+   * {
+   *   "app_meta_1" : "app_meta_1_val",
+   *   "app_meta_2" : "app_meta_2_val"
+   * }
+   */
+  @Multiline
+  public static String appMetadata;
+
+  @Test
+  public void testDefaultStrategy_withKafkaMetadata_withAppMetadata() throws Exception {
+    Tuple t = createTuple( kafkaMetadata
+                         , appMetadata);
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), true, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      checkKafkaMetadata(m, false);
+      checkAppMetadata(m, false);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), false, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  @Test
+  public void testDefaultStrategy_withKafkaMetadata_withoutAppMetadata() throws Exception {
+    Tuple t = createTuple(kafkaMetadata
+                         ,"{}");
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), true, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      checkKafkaMetadata(m, false);
+      checkAppMetadata(m, true);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), false, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  @Test
+  public void testDefaultStrategy_withoutKafkaMetadata_withAppMetadata() throws Exception {
+    Tuple t = createTuple(new HashMap<>() ,appMetadata);
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), true, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      checkKafkaMetadata(m, true);
+      checkAppMetadata(m, false);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), false, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  @Test
+  public void testDefaultStrategy_withoutKafkaMetadata_withoutAppMetadata() throws Exception {
+    Tuple t = createTuple(new HashMap<>() , "{}");
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), true, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      checkKafkaMetadata(m, true);
+      checkAppMetadata(m, true);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage(RawMessageStrategies.DEFAULT, t, "raw_message".getBytes(), false, new HashMap<>());
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  /**
+   * {
+   *   "data" : "raw_message",
+   *   "original_string" : "real_original_string",
+   *   "enveloped_metadata_field_1" : "enveloped_metadata_val_1",
+   *   "enveloped_metadata_field_2" : "enveloped_metadata_val_2"
+   * }
+   */
+  @Multiline
+  public static String envelopedData;
+
+  public static JSONObject envelopedMessage = new JSONObject() {{
+    put("message_field1", "message_val1");
+    put(Constants.Fields.ORIGINAL.getName(), "envelope_message_val");
+  }};
+
+  private void checkEnvelopeMetadata(RawMessage m) {
+    Assert.assertEquals("real_original_string", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + "." + Constants.Fields.ORIGINAL.getName()));
+    Assert.assertEquals("enveloped_metadata_val_1", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + ".enveloped_metadata_field_1"));
+    Assert.assertEquals("enveloped_metadata_val_2", m.getMetadata().get(MetadataUtil.METADATA_PREFIX + ".enveloped_metadata_field_2"));
+  }
+
+  private void checkMergedData(RawMessage m) {
+    JSONObject message = new JSONObject(envelopedMessage);
+    RawMessageStrategies.ENVELOPE.mergeMetadata(message, m.getMetadata(), true, new HashMap<String, Object>() {});
+    if(m.getMetadata().containsKey(MetadataUtil.METADATA_PREFIX + "." +Constants.Fields.ORIGINAL.getName())) {
+      Assert.assertEquals(m.getMetadata().get(MetadataUtil.METADATA_PREFIX + "." + Constants.Fields.ORIGINAL.getName()), message.get(Constants.Fields.ORIGINAL.getName()));
+    }
+    Assert.assertEquals("message_val1", message.get("message_field1"));
+  }
+
+  @Test
+  public void testEnvelopeStrategy_withKafkaMetadata_withAppMetadata() throws Exception {
+    Tuple t = createTuple( kafkaMetadata
+                         , appMetadata);
+    Map<String, Object> config = ImmutableMap.of(EnvelopedRawMessageStrategy.MESSAGE_FIELD_CONFIG, "data");
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), true, config);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkEnvelopeMetadata(m);
+      checkMergedData(m);
+      checkKafkaMetadata(m, false);
+      checkAppMetadata(m, false);
+
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), false, config);
+      checkMergedData(m);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  @Test
+  public void testEnvelopeStrategy_withKafkaMetadata_withoutAppMetadata() throws Exception {
+    Tuple t = createTuple(kafkaMetadata
+                         ,"{}");
+    Map<String, Object> config = ImmutableMap.of(EnvelopedRawMessageStrategy.MESSAGE_FIELD_CONFIG, "data");
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), true, config);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkMergedData(m);
+      checkEnvelopeMetadata(m);
+      checkKafkaMetadata(m, false);
+      checkAppMetadata(m, true);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), false, config);
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkMergedData(m);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  @Test
+  public void testEnvelopeStrategy_withoutKafkaMetadata_withAppMetadata() throws Exception {
+    Tuple t = createTuple(new HashMap<>() ,appMetadata);
+    Map<String, Object> config = ImmutableMap.of(EnvelopedRawMessageStrategy.MESSAGE_FIELD_CONFIG, "data");
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), true, config);
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkMergedData(m);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      checkEnvelopeMetadata(m);
+      checkKafkaMetadata(m, true);
+      checkAppMetadata(m, false);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), false, config);
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkMergedData(m);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+  @Test
+  public void testEnvelopeStrategy_withoutKafkaMetadata_withoutAppMetadata() throws Exception {
+    Tuple t = createTuple(new HashMap<>() , "{}");
+    Map<String, Object> config = ImmutableMap.of(EnvelopedRawMessageStrategy.MESSAGE_FIELD_CONFIG, "data");
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), true, config);
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkMergedData(m);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      checkEnvelopeMetadata(m);
+      checkKafkaMetadata(m, true);
+      checkAppMetadata(m, true);
+    }
+    {
+      RawMessage m = RawMessageUtil.INSTANCE.getRawMessage( RawMessageStrategies.ENVELOPE, t, envelopedData.getBytes(), false, config);
+      Assert.assertFalse(m.getMetadata().containsKey("data"));
+      checkMergedData(m);
+      Assert.assertEquals("raw_message", new String(m.getMessage()));
+      Assert.assertTrue(m.getMetadata().isEmpty());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
index 5ebc416..6babb37 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -45,6 +45,7 @@ import org.apache.storm.flux.parser.FluxParser;
 import org.apache.storm.generated.KillOptions;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.protocol.TProtocolException;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -184,6 +185,14 @@ public class FluxTopologyComponent implements InMemoryComponent {
                       "If tests fail, we'll have to find a better way of killing them.", ise);
             }
         }
+        catch(RuntimeException re) {
+          if(re.getCause() instanceof TProtocolException) {
+            //let this go, it's some intermittent weirdness.
+          }
+          else {
+            throw re;
+          }
+        }
       }
       catch(Throwable t) {
         LOG.error(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-parsers/ParserChaining.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/ParserChaining.md b/metron-platform/metron-parsers/ParserChaining.md
new file mode 100644
index 0000000..a28f11d
--- /dev/null
+++ b/metron-platform/metron-parsers/ParserChaining.md
@@ -0,0 +1,179 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Parser Chaining
+
+Aggregating many different types sensors into a single data source (e.g.
+syslog) and ingesting that aggregate sensor into Metron is a common pattern.  It 
+is not obvious precisely how to manage these types of aggregate sensors 
+as they require two-pass parsing.  This document will walk through an
+example of supporting this kind of multi-pass ingest.
+
+Multi-pass parser involves the following requirements:
+* The enveloping parser (e.g. the aggregation format such as syslog or
+  plain CSV) may contain metadata which should be ingested along with the data.
+* The enveloping sensor contains many different sensor types
+
+# High Level Solution
+
+![High Level Approach](../../use-cases/parser_chaining/message_routing_high_level.svg)
+
+At a high level, we continue to maintain the architectural invariant of
+a 1-1 relationship between logical sensors and storm topologies.
+Eventually this relationship may become more complex, but at the moment
+the approach is to construct a routing parser which will have two
+responsibilities:
+* Parse the envelope (e.g. syslog data) and extract any metadata fields
+  from the envelope to pass along
+* Route the unfolded data to the appropriate kafka topic associated with
+  the enveloped sensor data
+
+Because the data emitted from the routing parser is just like any data
+emitted from any other parser, in that it is a JSON blob like any
+data emitted from any parser, we will need to adjust the downstream
+parsers to extract the enveloped data from the JSON blob and treat it as
+the data to parse.
+
+# Architecting a Parser Chaining Solution in Metron
+
+Currently the approach to fulfill this requirement involves a couple
+knobs in the Parser infrastructure for Metron.
+
+Consider the case, for instance,
+where we have many different TYPES of messages wrapped inside of syslog.
+As an architectural abstraction, we would want to have the following
+properties:
+* separate the concerns of parsing the individual types of messages from
+  each other
+* separate the concerns of parsing the individual types of messages from
+  parsing the envelope
+
+## Data Dependent Parser Writing
+
+Parsers allow users to configure the topic which the kafka producer uses
+in a couple of ways (from the parser config in an individual parser):
+* `kafka.topic` - Specify the topic in the config.  This can be updated by updating the config, but it is data independent (e.g. not dependent on the data in a message).  
+* `kafka.topicField` - Specify the topic as the value of a particular field.  If unpopulated, then the message is dropped.  This is inherrently data dependent.
+
+The `kafka.topicField` parameter allows for data dependent topic
+selection and this inherrently enables the routing capabilities
+necessary for handling enveloped data. 
+
+## Flexibly Interpreting Data
+
+### Aside: The Role of Metadata in Metron
+
+Before we continue, let's briefly talk about metadata.  We have exposed
+the ability to pass along metadata and interact with metadata in a
+decoupled way from the actual parser logic (i.e. the GrokParser should
+not have to consider how to interpret metadata).
+
+There are three choices about manipulating metadata in Metron:
+* Should you merge metadata into the downstream message?
+* If you do, should you use a key prefix to set it off from the message
+  by default?
+
+This enables users to specify metadata independent of the data that is
+persisted downstream and can inform the operations of enrichment and the
+profiler.
+
+### Interpretation
+
+Now that we have an approach which enables the routing of the data, the
+remaining question is how to decouple _parsing_ data from _interpreting_
+data and metadata.  By default, Metron operates like so:
+* The kafka record key (as a JSON Map) is considered metadata
+* The kafka record value is considered data
+
+Beyond that, we presume defaults for this default strategy around
+handling metadata.  In particular, by default we do not merge metadata
+and use a `metron.metadata` prefix for all metadata.
+
+In order to enable chained parser WITH metadata, we allow the following
+to be specified via strategy in the parser config:
+* How to extract the data from the kafka record
+* How to extract the metadata from the kafka record
+* The default operations for merging
+* The prefix for the metadata key
+
+The available strategies, specified by the `rawMessageStrategy`
+configuration is either`ENVELOPE` or `DEFAULT`.
+
+Specifically, to enable parsing enveloped data (i.e. data in a field of a JSON
+blob with the other fields being metadata), one can specify the strategy
+and configuration of that strategy in the parser config.  One must
+specify the `rawMessageStrategy` as `ENVELOPE` in the parser and the
+`rawMessageStrategyConfig` to indicate the field which contains the
+data.
+
+Together with routing, we have the complete solution to chain parsers which can:
+* parse the envelope
+* route the parsed data to specific parsers
+* have the specific parsers interpret the data via the `rawMessageStrategy` whereby they pull the data out from JSON Map that they receive
+
+Together this enables a directed acyclic graph of parsers to handle single or multi-layer parsing.
+
+### Example
+For a complete example, look at the [parser chaining use-case](../../use-cases/parser_chaining), however for a simple example the following should suffice.
+
+If I want to configure a CSV parser to parse data which has 3 columns `f1`, `f2` and `f3` and is
+held in a field called `payload` inside of a JSON Map, I can do so like
+this:
+```
+{
+  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+  ,"sensorTopic" : "my_topic"
+  ,"rawMessageStrategy" : "ENVELOPE"
+  ,"rawMessageStrategyConfig" : {
+      "messageField" : "payload",
+      "metadataPrefix" : ""
+  }
+  , "parserConfig": {
+     "columns" : { "f1": 0,
+                 , "f2": 1,
+                 , "f3": 2
+                 } 
+   }
+}
+```
+
+This would parse the following message:
+```
+{
+  "meta_f1" : "val1",
+  "payload" : "foo,bar,grok",
+  "original_string" : "2019 Jul, 01: val1 foo,bar,grok",
+  "timestamp" : 10000
+}
+```
+into
+```
+{
+  "meta_f1" : "val1",
+  "f1" : "foo",
+  "f2" : "bar",
+  "f3" : "grok",
+  "original_string" : "2019 Jul, 01: val1 foo,bar,grok",
+  "timestamp" : 10002
+}
+```
+
+Note a couple of things here:
+* The metadata field `meta_f1` is not prefixed here because we configured the strategy with `metadataPrefix` as empty string.
+* The `timestamp` is not inherited from the metadata
+* The `original_string` is inherited from the metadata

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index c453758..43bcc4a 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -159,8 +159,10 @@ Example Stellar Filter which includes messages which contain a the `field1` fiel
 ```
 * `sensorTopic` : The kafka topic to send the parsed messages to.  If the topic is prefixed and suffixed by `/` 
 then it is assumed to be a regex and will match any topic matching the pattern (e.g. `/bro.*/` would match `bro_cust0`, `bro_cust1` and `bro_cust2`)
-* `readMetadata` : Boolean indicating whether to read metadata or not (`false` by default).  See below for a discussion about metadata.
-* `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (`false` by default).  See below for a discussion about metadata.
+* `readMetadata` : Boolean indicating whether to read metadata or not (The default is raw message strategy dependent).  See below for a discussion about metadata.
+* `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (The default is raw message strategy dependent).  See below for a discussion about metadata.
+* `rawMessageStrategy` : The strategy to use when reading the raw data and metadata.  See below for a discussion about message reading strategies.
+* `rawMessageStrategyConfig` : The raw message strategy configuration map.  See below for a discussion about message reading strategies.
 * `parserConfig` : A JSON Map representing the parser implementation specific configuration. Also include batch sizing and timeout for writer configuration here.
   * `batchSize` : Integer indicating number of records to batch together before sending to the writer. (default to `15`)
   * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met.  Optional.
@@ -213,16 +215,33 @@ As such, there are two types of metadata that we seek to support in Metron:
    * At the moment, only the kafka topic is kept as the field name.
 * Custom metadata: Custom metadata from an individual telemetry source that one might want to use within Metron. 
 
-Metadata is controlled by two fields in the parser:
+Metadata is controlled by the following parser configs:
+* `rawMessageStrategy` : This is a strategy which indicates how to read
+  data and metadata.  The strategies supported are:
+  * `DEFAULT` : Data is read directly from the kafka record value and metadata, if any, is read from the kafka record key.  This strategy defaults to not reading metadata and not merging metadata.  This is the default strategy.
+  * `ENVELOPE` : Data from kafka record value is presumed to be a JSON blob. One of
+    these fields must contain the raw data to pass to the parser.  All other fields should be considered metadata.  The field containing the raw data is specified in the `rawMessageStrategyConfig`.  Data held in the kafka key as well as the non-data fields in the JSON blob passed into the kafka value are considered metadata. Note that the exception to this is that any `original_string` field is inherited from the envelope data so that the original string contains the envelope data.  If you do not prefer this behavior, remove this field from the envelope data.
+* `rawMessageStrategyConfig` : The configuration (a map) for the `rawMessageStrategy`.  Available configurations are strategy dependent:
+  * `DEFAULT` 
+    * `metadataPrefix` defines the key prefix for metadata (default is `metron.metadata`).
+  * `ENVELOPE` 
+    * `metadataPrefix` defines the key prefix for metadata (default is `metron.metadata`) 
+    * `messageField` defines the field from the envelope to use as the data.  All other fields are considered metadata.
 * `readMetadata` : This is a boolean indicating whether metadata will be read and made available to Field 
-transformations (i.e. Stellar field transformations).  The default is `false`.
-* `mergeMetadata` : This is a boolean indicating whether metadata fields will be merged with the message automatically.  
-That is to say, if this property is set to `true` then every metadata field will become part of the messages and, 
-consequently, also available for use in field transformations.
+transformations (i.e. Stellar field transformations).  The default is
+dependent upon the `rawMessageStrategy`:
+  * `DEFAULT` : default to `false`.
+  * `ENVELOPE` : default to `true`.
+* `mergeMetadata` : This is a boolean indicating whether metadata fields will be merged with the message automatically.  That is to say, if this property is set to `true` then every metadata field will become part of the messages and, consequently, also available for use in field transformations.  The default is dependent upon the `rawMessageStrategy`:
+  * `DEFAULT` : default to `false`.
+  * `ENVELOPE` : default to `true`.
+
+
 #### Field Naming
 
-In order to avoid collisions from metadata fields, metadata fields will be prefixed with `metron.metadata.`.  
-So, for instance the kafka topic would be in the field `metron.metadata.topic`.
+In order to avoid collisions from metadata fields, metadata fields will
+be prefixed (the default is `metron.metadata.`, but this is configurable
+in the `rawMessageStrategyConfig`).  So, for instance the kafka topic would be in the field `metron.metadata.topic`.
 
 #### Specifying Custom Metadata
 Custom metadata is specified by sending a JSON Map in the key.  If no key is sent, then, obviously, no metadata will be parsed.

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 0e9b48f..f68c670 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.parsers.bolt;
 
-import static org.apache.metron.common.Constants.METADATA_PREFIX;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import java.io.IOException;
@@ -43,11 +42,12 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
 
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
-  private static final int KEY_INDEX = 1;
+
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputCollector collector;
   private MessageParser<JSONObject> parser;
@@ -169,7 +169,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
     Map<String, Object> conf = super.getComponentConfiguration();
     if (conf == null) {
-      conf = new HashMap<String, Object>();
+      conf = new HashMap<>();
     }
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, requestedTickFreqSecs);
     LOG.info("Requesting " + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + " set to " + Integer.toString(requestedTickFreqSecs));
@@ -231,37 +231,6 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     StellarFunctions.initialize(stellarContext);
   }
 
-  private Map<String, Object> getMetadata(Tuple t, boolean readMetadata) {
-    Map<String, Object> ret = new HashMap<>();
-    if(!readMetadata) {
-      return ret;
-    }
-    Fields tupleFields = t.getFields();
-    for(int i = 2;i < tupleFields.size();++i) {
-      String envMetadataFieldName = tupleFields.get(i);
-      Object envMetadataFieldValue = t.getValue(i);
-      if(!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue != null) {
-        ret.put(METADATA_PREFIX + envMetadataFieldName, envMetadataFieldValue);
-      }
-    }
-    byte[] keyObj = t.getBinary(KEY_INDEX);
-    String keyStr = null;
-    try {
-      keyStr = keyObj == null?null:new String(keyObj);
-      if(!StringUtils.isEmpty(keyStr)) {
-        Map<String, Object> metadata = JSONUtils.INSTANCE.load(keyStr,JSONUtils.MAP_SUPPLIER);
-        for(Map.Entry<String, Object> kv : metadata.entrySet()) {
-          ret.put(METADATA_PREFIX + kv.getKey(), kv.getValue());
-        }
-
-      }
-    } catch (IOException e) {
-        String reason = "Unable to parse metadata; expected JSON Map: " + (keyStr == null?"NON-STRING!":keyStr);
-        LOG.error(reason, e);
-        throw new IllegalStateException(reason, e);
-      }
-    return ret;
-  }
 
   @SuppressWarnings("unchecked")
   @Override
@@ -277,22 +246,31 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
       }
       return;
     }
-    byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
     SensorParserConfig sensorParserConfig = getSensorParserConfig();
+    byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
     try {
       //we want to ack the tuple in the situation where we have are not doing a bulk write
       //otherwise we want to defer to the writerComponent who will ack on bulk commit.
       boolean ackTuple = !writer.handleAck();
       int numWritten = 0;
       if(sensorParserConfig != null) {
-        Map<String, Object> metadata = getMetadata(tuple, sensorParserConfig.getReadMetadata());
+        RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
+                                                                   , tuple
+                                                                   , originalMessage
+                                                                   , sensorParserConfig.getReadMetadata()
+                                                                   , sensorParserConfig.getRawMessageStrategyConfig()
+                                                                   );
+        Map<String, Object> metadata = rawMessage.getMetadata();
         List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
-        Optional<List<JSONObject>> messages = parser.parseOptional(originalMessage);
+
+        Optional<List<JSONObject>> messages = parser.parseOptional(rawMessage.getMessage());
         for (JSONObject message : messages.orElse(Collections.emptyList())) {
+          sensorParserConfig.getRawMessageStrategy().mergeMetadata( message
+                                                                  , metadata
+                                                                  , sensorParserConfig.getMergeMetadata()
+                                                                  , sensorParserConfig.getRawMessageStrategyConfig()
+                                                                  );
           message.put(Constants.SENSOR_TYPE, getSensorType());
-          if(sensorParserConfig.getMergeMetadata()) {
-            message.putAll(metadata);
-          }
           for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
             if (handler != null) {
               if(!sensorParserConfig.getMergeMetadata()) {

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/EnvelopedParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/EnvelopedParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/EnvelopedParserIntegrationTest.java
new file mode 100644
index 0000000..c017415
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/EnvelopedParserIntegrationTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.integration;
+
+import com.google.common.collect.ImmutableList;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.message.metadata.MetadataUtil;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.integration.ProcessorResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EnvelopedParserIntegrationTest {
+
+  /**
+   *  {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+   *   ,"sensorTopic":"test"
+   *   ,"rawMessageStrategy" : "ENVELOPE"
+   *   ,"rawMessageStrategyConfig" : {
+   *       "messageField" : "data"
+   *   }
+   *   ,"parserConfig": {
+   *     "columns" : {
+   *      "field1" : 0,
+   *      "timestamp" : 1
+   *     }
+   *   }
+   * }
+   */
+  @Multiline
+  public static String parserConfig_default;
+
+  @Test
+  public void testEnvelopedData() throws IOException {
+    ParserDriver driver = new ParserDriver("test", parserConfig_default, "{}");
+    Map<String, Object> inputRecord = new HashMap<String, Object>() {{
+      put(Constants.Fields.ORIGINAL.getName(), "real_original_string");
+      put("data", "field1_val,100");
+      put("metadata_field", "metadata_val");
+    }};
+    ProcessorResult<List<byte[]>> results = driver.run(ImmutableList.of(JSONUtils.INSTANCE.toJSONPretty(inputRecord)));
+    Assert.assertFalse(results.failed());
+    List<byte[]> resultList = results.getResult();
+    Assert.assertEquals(1, resultList.size());
+    Map<String, Object> outputRecord = JSONUtils.INSTANCE.load(new String(resultList.get(0)), JSONUtils.MAP_SUPPLIER);
+    Assert.assertEquals("field1_val", outputRecord.get("field1"));
+    Assert.assertEquals(inputRecord.get(Constants.Fields.ORIGINAL.getName()), outputRecord.get(Constants.Fields.ORIGINAL.getName()));
+    Assert.assertEquals(inputRecord.get(MetadataUtil.METADATA_PREFIX + ".metadata_field"), outputRecord.get("metadata_field"));
+
+  }
+
+  /**
+   *  {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+   *   ,"sensorTopic":"test"
+   *   ,"rawMessageStrategy" : "ENVELOPE"
+   *   ,"rawMessageStrategyConfig" : {
+   *       "messageField" : "data",
+   *       "metadataPrefix" : ""
+   *   }
+   *   ,"parserConfig": {
+   *     "columns" : {
+   *      "field1" : 0,
+   *      "timestamp" : 1
+   *     }
+   *   }
+   * }
+   */
+  @Multiline
+  public static String parserConfig_withPrefix;
+
+  @Test
+  public void testEnvelopedData_withMetadataPrefix() throws IOException {
+    ParserDriver driver = new ParserDriver("test", parserConfig_withPrefix, "{}");
+    Map<String, Object> inputRecord = new HashMap<String, Object>() {{
+      put(Constants.Fields.ORIGINAL.getName(), "real_original_string");
+      put("data", "field1_val,100");
+      put("metadata_field", "metadata_val");
+    }};
+    ProcessorResult<List<byte[]>> results = driver.run(ImmutableList.of(JSONUtils.INSTANCE.toJSONPretty(inputRecord)));
+    Assert.assertFalse(results.failed());
+    List<byte[]> resultList = results.getResult();
+    Assert.assertEquals(1, resultList.size());
+    Map<String, Object> outputRecord = JSONUtils.INSTANCE.load(new String(resultList.get(0)), JSONUtils.MAP_SUPPLIER);
+    Assert.assertEquals("field1_val", outputRecord.get("field1"));
+    Assert.assertEquals(inputRecord.get(Constants.Fields.ORIGINAL.getName()), outputRecord.get(Constants.Fields.ORIGINAL.getName()));
+    Assert.assertEquals(inputRecord.get("metadata_field"), outputRecord.get("metadata_field"));
+
+  }
+/**
+   *  {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+   *   ,"sensorTopic":"test"
+   *   ,"rawMessageStrategy" : "ENVELOPE"
+   *   ,"rawMessageStrategyConfig" : {
+   *       "messageField" : "data"
+   *   }
+   *   ,"mergeMetadata" : false
+   *   ,"parserConfig": {
+   *     "columns" : {
+   *      "field1" : 0,
+   *      "timestamp" : 1
+   *     }
+   *   }
+   * }
+   */
+  @Multiline
+  public static String parserConfig_nomerge;
+
+  @Test
+  public void testEnvelopedData_noMergeMetadata() throws IOException {
+    ParserDriver driver = new ParserDriver("test", parserConfig_nomerge, "{}");
+    Map<String, Object> inputRecord = new HashMap<String, Object>() {{
+      put(Constants.Fields.ORIGINAL.getName(), "real_original_string");
+      put("data", "field1_val,100");
+      put("metadata_field", "metadata_val");
+    }};
+    ProcessorResult<List<byte[]>> results = driver.run(ImmutableList.of(JSONUtils.INSTANCE.toJSONPretty(inputRecord)));
+    Assert.assertFalse(results.failed());
+    List<byte[]> resultList = results.getResult();
+    Assert.assertEquals(1, resultList.size());
+    Map<String, Object> outputRecord = JSONUtils.INSTANCE.load(new String(resultList.get(0)), JSONUtils.MAP_SUPPLIER);
+    Assert.assertEquals("field1_val", outputRecord.get("field1"));
+    Assert.assertEquals(inputRecord.get(Constants.Fields.ORIGINAL.getName()), outputRecord.get(Constants.Fields.ORIGINAL.getName()));
+    Assert.assertFalse(outputRecord.containsKey(MetadataUtil.METADATA_PREFIX + ".metadata_field"));
+  }
+
+  /**
+   * {
+   *    "parserClassName" : "org.apache.metron.parsers.GrokParser"
+   *   ,"sensorTopic" : "ciscoPix"
+
+   *   , "parserConfig": {
+   *      "grokPath": "/patterns/cisco_patterns",
+   *      "patternLabel": "CISCO_PIX",
+   *      "timestampField": "timestamp",
+   *      "timeFields" : [ "timestamp" ],
+   *      "dateFormat" : "MMM dd yyyy HH:mm:ss"
+   *    }
+   * }
+   */
+  @Multiline
+  public static String ciscoPixSyslogConfig;
+
+  /**
+   * {
+   *    "parserClassName" : "org.apache.metron.parsers.GrokParser"
+   *   ,"sensorTopic" : "cisco302020"
+   *   ,"rawMessageStrategy" : "ENVELOPE"
+   *   ,"rawMessageStrategyConfig" : {
+   *       "messageField" : "data",
+   *       "metadataPrefix" : ""
+   *   }
+   *   , "parserConfig": {
+   *      "grokPath": "/patterns/cisco_patterns",
+   *      "patternLabel": "CISCOFW302020_302021"
+   *    }
+   * }
+   */
+  @Multiline
+  public static String cisco302020Config;
+
+  @Test
+  public void testCiscoPixEnvelopingCisco302020() throws Exception {
+    byte[] envelopedData = null;
+    {
+      ParserDriver driver = new ParserDriver("ciscoPix", ciscoPixSyslogConfig, "{}");
+      String inputRecord = "Mar 29 2004 09:54:18: %PIX-6-302005: Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53";
+      ProcessorResult<List<byte[]>> results = driver.run(ImmutableList.of(inputRecord.getBytes()));
+      Assert.assertFalse(results.failed());
+      List<byte[]> resultList = results.getResult();
+      envelopedData = resultList.get(0);
+    }
+    {
+      ParserDriver driver = new ParserDriver("cisco302020", cisco302020Config, "{}");
+      ProcessorResult<List<byte[]>> results = driver.run(ImmutableList.of(envelopedData));
+      Assert.assertFalse(results.failed());
+      List<byte[]> resultList = results.getResult();
+      Assert.assertEquals(1, resultList.size());
+      Map<String, Object> result = JSONUtils.INSTANCE.load(new String(resultList.get(0)), JSONUtils.MAP_SUPPLIER);
+      Assert.assertEquals("UDP", result.get("protocol"));
+      Assert.assertTrue((long)result.get("timestamp") > 1000 );
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index b03ea80..ec7c3ab 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -151,7 +151,7 @@ public class ParserDriver implements Serializable {
     this.globalConfig = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER);
   }
 
-  public ProcessorResult<List<byte[]>> run(List<byte[]> in) {
+  public ProcessorResult<List<byte[]>> run(Iterable<byte[]> in) {
     ShimParserBolt bolt = new ShimParserBolt(new ArrayList<>());
     byte[] b = SerializationUtils.serialize(bolt);
     ShimParserBolt b2 = (ShimParserBolt) SerializationUtils.deserialize(b);

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/metron-platform/metron-parsers/src/test/resources/patterns/cisco_patterns
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/resources/patterns/cisco_patterns b/metron-platform/metron-parsers/src/test/resources/patterns/cisco_patterns
new file mode 100644
index 0000000..bebab22
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/resources/patterns/cisco_patterns
@@ -0,0 +1,6 @@
+CISCO_ACTION Built|Teardown|Deny|Denied|denied|requested|permitted|denied by ACL|discarded|est-allowed|Dropping|created|deleted
+CISCO_REASON Duplicate TCP SYN|Failed to locate egress interface|Invalid transport field|No matching connection|DNS Response|DNS Query|(?:%{WORD}\s*)*
+CISCO_DIRECTION Inbound|inbound|Outbound|outbound
+CISCOFW302020_302021 %{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} connection for faddr %{IP:ip_dst_addr}/%{INT:icmp_seq_num}(?:\(%{DATA:fwuser}\))? gaddr %{IP:ip_src_xlated}/%{INT:icmp_code_xlated} laddr %{IP:ip_src_addr}/%{INT:icmp_code}( \(%{DATA:user}\))?
+ACCESSED %{URIHOST:ip_src_addr} Accessed URL %{IP:ip_dst_addr}:%{URIPATHPARAM:uri_path}
+CISCO_PIX %{GREEDYDATA:timestamp}: %PIX-%{NOTSPACE:pix_type}: %{GREEDYDATA:data}

http://git-wip-us.apache.org/repos/asf/metron/blob/cbdaee17/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1947b37..eb79e52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -321,6 +321,7 @@
                         <exclude>**/*.rpm</exclude>
                         <exclude>site/**</exclude>
                         <exclude>**/src/main/resources/patterns/**</exclude>
+                        <exclude>**/src/test/resources/patterns/**</exclude>
                         <exclude>**/src/main/sample/patterns/**</exclude>
                         <exclude>**/src/test/resources/**</exclude>
                         <exclude>**/src/main/sample/data/**</exclude>