You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/27 13:59:24 UTC

incubator-metron git commit: METRON-186: Create a fieldMapping functionality which allows for parsed fields to be transformed closes apache/incubator-metron#136

Repository: incubator-metron
Updated Branches:
  refs/heads/master bf5726e89 -> 64b0f18a9


METRON-186: Create a fieldMapping functionality which allows for parsed fields to be transformed closes apache/incubator-metron#136


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/64b0f18a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/64b0f18a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/64b0f18a

Branch: refs/heads/master
Commit: 64b0f18a9a03a5cba3062c1fb82072b93eb19dce
Parents: bf5726e
Author: cstella <ce...@gmail.com>
Authored: Fri May 27 09:59:13 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Fri May 27 09:59:13 2016 -0400

----------------------------------------------------------------------
 .../common/configuration/FieldTransformer.java  | 157 ++++++++++++++++
 .../configuration/SensorParserConfig.java       |  51 ++++--
 .../transformation/FieldTransformation.java     |  30 ++++
 .../transformation/FieldTransformations.java    |  39 ++++
 .../IPProtocolTransformation.java               | 178 +++++++++++++++++++
 .../transformation/RemoveTransformation.java    |  85 +++++++++
 .../SimpleFieldTransformation.java              |  42 +++++
 .../transformation/FieldTransformationTest.java | 148 +++++++++++++++
 .../RemoveTransformationTest.java               | 106 +++++++++++
 .../sample/data/yaf/parsed/YafExampleParsed     |  20 +--
 metron-platform/metron-parsers/README.md        | 114 +++++++++++-
 .../src/main/config/zookeeper/parsers/yaf.json  |   6 +
 .../apache/metron/parsers/bolt/ParserBolt.java  |  30 +++-
 .../metron/parsers/bolt/ParserBoltTest.java     |  14 +-
 14 files changed, 989 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
new file mode 100644
index 0000000..cc0c8b4
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.common.field.transformation.FieldTransformation;
+import org.apache.metron.common.field.transformation.FieldTransformations;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FieldTransformer implements Serializable {
+  private List<String> input = new ArrayList<>();
+  private List<String> output;
+  private FieldTransformation transformation;
+  private Map<String, Object> config = new HashMap<>();
+
+  public FieldTransformer() {
+  }
+
+  public List<String> getInput() {
+    return input;
+  }
+
+  public void setInput(Object inputFields) {
+    if(inputFields instanceof String) {
+      this.input= ImmutableList.of(inputFields.toString());
+    }
+    else if(inputFields instanceof List) {
+      this.input= (List<String>)inputFields;
+    }
+  }
+
+  public List<String> getOutput() {
+    return output;
+  }
+
+  public void setOutput(Object outputField) {
+    if(outputField instanceof String) {
+      this.output = ImmutableList.of(outputField.toString());
+    }
+    else if(outputField instanceof List) {
+      this.output = (List<String>)outputField;
+    }
+  }
+
+  public Map<String, Object> getConfig() {
+    return config;
+  }
+
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
+
+  public FieldTransformation getTransformation() {
+    return transformation;
+  }
+
+  public void setTransformation(String transformation) {
+    this.transformation = FieldTransformations.get(transformation);
+  }
+
+  public void initAndValidate() {
+    if(getTransformation() == null) {
+      throw new IllegalStateException("Mapping cannot be null.");
+    }
+
+    if(output== null || output.isEmpty()) {
+      if(input == null || input.isEmpty()) {
+        throw new IllegalStateException("You must specify an input field if you want to leave the output fields empty");
+      }
+      else {
+        output = input;
+      }
+    }
+  }
+
+  public Map<String, Object> transform(JSONObject input, Map<String, Object> sensorConfig) {
+    if(getInput() == null || getInput().isEmpty()) {
+      return transformation.map(input, getOutput(), config, sensorConfig);
+    }
+    else {
+      Map<String, Object> in = new HashMap<>();
+      for(String inputField : getInput()) {
+        in.put(inputField, input.get(inputField));
+      }
+      return transformation.map(in, getOutput(), config, sensorConfig);
+    }
+  }
+
+  public void transformAndUpdate(JSONObject message, Map<String, Object> sensorConfig) {
+    Map<String, Object> currentValue = transform(message, sensorConfig);
+    if(currentValue != null) {
+      for(Map.Entry<String, Object> kv : currentValue.entrySet()) {
+        if(kv.getValue() == null) {
+          message.remove(kv.getKey());
+        }
+        else {
+          message.put(kv.getKey(), kv.getValue());
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MappingHandler{" +
+            "input=" + input +
+            ", output='" + output + '\'' +
+            ", transformation=" + transformation +
+            ", config=" + config +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    FieldTransformer that = (FieldTransformer) o;
+
+    if (getInput() != null ? !getInput().equals(that.getInput()) : that.getInput() != null) return false;
+    if (getOutput() != null ? !getOutput().equals(that.getOutput()) : that.getOutput() != null) return false;
+    if (getTransformation() != null ? !getTransformation().equals(that.getTransformation()) : that.getTransformation() != null) return false;
+    return getConfig() != null ? getConfig().equals(that.getConfig()) : that.getConfig() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getInput() != null ? getInput().hashCode() : 0;
+    result = 31 * result + (getOutput() != null ? getOutput().hashCode() : 0);
+    result = 31 * result + (getTransformation() != null ? getTransformation().hashCode() : 0);
+    result = 31 * result + (getConfig() != null ? getConfig().hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 8cf1901..106c55a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -21,13 +21,25 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class SensorParserConfig {
 
   private String parserClassName;
   private String sensorTopic;
-  private Map<String, Object> parserConfig;
+  private Map<String, Object> parserConfig = new HashMap<>();
+  private List<FieldTransformer> fieldTransformations = new ArrayList<>();
+
+  public List<FieldTransformer> getFieldTransformations() {
+    return fieldTransformations;
+  }
+
+  public void setFieldTransformations(List<FieldTransformer> fieldTransformations) {
+    this.fieldTransformations = fieldTransformations;
+  }
 
   public String getParserClassName() {
     return parserClassName;
@@ -54,29 +66,47 @@ public class SensorParserConfig {
   }
 
   public static SensorParserConfig fromBytes(byte[] config) throws IOException {
-    return JSONUtils.INSTANCE.load(new String(config), SensorParserConfig.class);
+    SensorParserConfig ret = JSONUtils.INSTANCE.load(new String(config), SensorParserConfig.class);
+    ret.init();
+    return ret;
   }
 
+  public void init() {
+    for(FieldTransformer h : getFieldTransformations()) {
+      h.initAndValidate();
+    }
+  }
+
+
   public String toJSON() throws JsonProcessingException {
     return JSONUtils.INSTANCE.toJSON(this, true);
   }
 
   @Override
+  public String toString() {
+    return "SensorParserConfig{" +
+            "parserClassName='" + parserClassName + '\'' +
+            ", sensorTopic='" + sensorTopic + '\'' +
+            ", parserConfig=" + parserConfig +
+            ", fieldTransformations=" + fieldTransformations +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
 
     SensorParserConfig that = (SensorParserConfig) o;
 
-    if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null) return false;
-    if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null) return false;
-    return getParserConfig() != null ? getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() == null;
-  }
+    if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null)
+      return false;
+    if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
+      return false;
+    if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
+      return false;
+    return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null;
 
-  @Override
-  public String toString() {
-    return "{parserClassName=" + parserClassName + ", sensorTopic=" + sensorTopic +
-            ", parserConfig=" + parserConfig + "}";
   }
 
   @Override
@@ -84,6 +114,7 @@ public class SensorParserConfig {
     int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
     result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
     result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
+    result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
new file mode 100644
index 0000000..b983815
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import java.util.List;
+import java.util.Map;
+
+public interface FieldTransformation {
+  Map<String, Object> map( Map<String, Object> input
+                         , List<String> outputField
+                         , Map<String, Object> fieldMappingConfig
+                         , Map<String, Object> sensorConfig
+                         );
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
new file mode 100644
index 0000000..f4fa488
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import org.apache.metron.common.utils.ReflectionUtils;
+
+public enum FieldTransformations {
+  IP_PROTOCOL(new IPProtocolTransformation())
+  ,REMOVE(new RemoveTransformation())
+  ;
+  FieldTransformation mapping;
+  FieldTransformations(FieldTransformation mapping) {
+    this.mapping = mapping;
+  }
+  public static FieldTransformation get(String mapping) {
+    try {
+      return FieldTransformations.valueOf(mapping).mapping;
+    }
+    catch(Exception ex) {
+      return ReflectionUtils.createInstance(mapping);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/IPProtocolTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/IPProtocolTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/IPProtocolTransformation.java
new file mode 100644
index 0000000..b0e9a9c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/IPProtocolTransformation.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IPProtocolTransformation extends SimpleFieldTransformation {
+
+  private final static Map<Integer, String> PROTOCOLS = new HashMap<>();
+
+  static {
+    PROTOCOLS.put(1, "ICMP");
+    PROTOCOLS.put(2, "IGMP");
+    PROTOCOLS.put(3, "GGP");
+    PROTOCOLS.put(4, "IP-in-IP");
+    PROTOCOLS.put(5, "ST");
+    PROTOCOLS.put(6, "TCP");
+    PROTOCOLS.put(7, "CBT");
+    PROTOCOLS.put(8, "EGP");
+    PROTOCOLS.put(9, "IGP");
+    PROTOCOLS.put(10, "BBN-RCC-MON");
+    PROTOCOLS.put(11, "NVP-II");
+    PROTOCOLS.put(12, "PUP");
+    PROTOCOLS.put(13, "ARGUS");
+    PROTOCOLS.put(14, "EMCON");
+    PROTOCOLS.put(15, "XNET");
+    PROTOCOLS.put(16, "CHAOS");
+    PROTOCOLS.put(17, "UDP");
+    PROTOCOLS.put(18, "MUX");
+    PROTOCOLS.put(19, "DCN-MEAS");
+    PROTOCOLS.put(20, "HMP");
+    PROTOCOLS.put(21, "PRM");
+    PROTOCOLS.put(22, "XNS-IDP");
+    PROTOCOLS.put(23, "TRUNK-1");
+    PROTOCOLS.put(24, "TRUNK-2");
+    PROTOCOLS.put(25, "LEAF-1");
+    PROTOCOLS.put(26, "LEAF-2");
+    PROTOCOLS.put(27, "RDP");
+    PROTOCOLS.put(28, "IRTP");
+    PROTOCOLS.put(29, "ISO-TP4");
+    PROTOCOLS.put(30, "NETBLT");
+    PROTOCOLS.put(31, "MFE-NSP");
+    PROTOCOLS.put(32, "MERIT-INP");
+    PROTOCOLS.put(33, "DCCP");
+    PROTOCOLS.put(34, "3PC");
+    PROTOCOLS.put(35, "IDPR");
+    PROTOCOLS.put(36, "XTP");
+    PROTOCOLS.put(37, "DDP");
+    PROTOCOLS.put(38, "IDPR-CMTP");
+    PROTOCOLS.put(39, "TP++");
+    PROTOCOLS.put(40, "IL");
+    PROTOCOLS.put(41, "IPv6");
+    PROTOCOLS.put(42, "SDRP");
+    PROTOCOLS.put(43, "IPv6-Route");
+    PROTOCOLS.put(44, "IPv6-Frag");
+    PROTOCOLS.put(45, "IDRP");
+    PROTOCOLS.put(46, "RSVP");
+    PROTOCOLS.put(47, "GRE");
+    PROTOCOLS.put(48, "MHRP");
+    PROTOCOLS.put(49, "BNA");
+    PROTOCOLS.put(50, "ESP");
+    PROTOCOLS.put(51, "AH");
+    PROTOCOLS.put(52, "I-NLSP");
+    PROTOCOLS.put(53, "SWIPE");
+    PROTOCOLS.put(54, "NARP");
+    PROTOCOLS.put(55, "MOBILE");
+    PROTOCOLS.put(56, "TLSP");
+    PROTOCOLS.put(57, "SKIP");
+    PROTOCOLS.put(58, "IPv6-ICMP");
+    PROTOCOLS.put(59, "IPv6-NoNxt");
+    PROTOCOLS.put(60, "IPv6-Opts");
+    PROTOCOLS.put(62, "CFTP");
+    PROTOCOLS.put(64, "SAT-EXPAK");
+    PROTOCOLS.put(65, "KRYPTOLAN");
+    PROTOCOLS.put(66, "RVD");
+    PROTOCOLS.put(67, "IPPC");
+    PROTOCOLS.put(68, "SAT-MON");
+    PROTOCOLS.put(70, "VISA");
+    PROTOCOLS.put(71, "IPCU");
+    PROTOCOLS.put(72, "CPNX");
+    PROTOCOLS.put(73, "CPHB");
+    PROTOCOLS.put(74, "WSN");
+    PROTOCOLS.put(75, "PVP");
+    PROTOCOLS.put(76, "BR-SAT-MON");
+    PROTOCOLS.put(77, "SUN-ND");
+    PROTOCOLS.put(78, "WB-MON");
+    PROTOCOLS.put(79, "WB-EXPAK");
+    PROTOCOLS.put(80, "ISO-IP");
+    PROTOCOLS.put(81, "VMTP");
+    PROTOCOLS.put(82, "SECURE-VMTP");
+    PROTOCOLS.put(83, "VINES");
+    PROTOCOLS.put(84, "TTP");
+    PROTOCOLS.put(85, "NSFNET-IGP");
+    PROTOCOLS.put(86, "DGP");
+    PROTOCOLS.put(87, "TCF");
+    PROTOCOLS.put(88, "");
+    PROTOCOLS.put(89, "OSPF");
+    PROTOCOLS.put(90, "Sprite-RP");
+    PROTOCOLS.put(91, "LARP");
+    PROTOCOLS.put(92, "MTP");
+    PROTOCOLS.put(93, "AX.25");
+    PROTOCOLS.put(94, "IPIP");
+    PROTOCOLS.put(95, "MICP");
+    PROTOCOLS.put(96, "SCC-SP");
+    PROTOCOLS.put(97, "ETHERIP");
+    PROTOCOLS.put(98, "ENCAP");
+    PROTOCOLS.put(100, "GMTP");
+    PROTOCOLS.put(101, "IFMP");
+    PROTOCOLS.put(102, "PNNI");
+    PROTOCOLS.put(103, "PIM");
+    PROTOCOLS.put(104, "ARIS");
+    PROTOCOLS.put(105, "SCPS");
+    PROTOCOLS.put(106, "QNX");
+    PROTOCOLS.put(107, "A/N");
+    PROTOCOLS.put(108, "IPComp");
+    PROTOCOLS.put(109, "SNP");
+    PROTOCOLS.put(110, "Compaq-Peer");
+    PROTOCOLS.put(111, "IPX-in-IP");
+    PROTOCOLS.put(112, "VRRP");
+    PROTOCOLS.put(113, "PGM");
+    PROTOCOLS.put(115, "L2TP");
+    PROTOCOLS.put(116, "DDX");
+    PROTOCOLS.put(117, "IATP");
+    PROTOCOLS.put(118, "STP");
+    PROTOCOLS.put(119, "SRP");
+    PROTOCOLS.put(120, "UTI");
+    PROTOCOLS.put(121, "SMP");
+    PROTOCOLS.put(122, "SM");
+    PROTOCOLS.put(123, "PTP");
+    PROTOCOLS.put(124, "IS-IS");
+    PROTOCOLS.put(125, "FIRE");
+    PROTOCOLS.put(126, "CRTP");
+    PROTOCOLS.put(127, "CRUDP");
+    PROTOCOLS.put(128, "SSCOPMCE");
+    PROTOCOLS.put(129, "IPLT");
+    PROTOCOLS.put(130, "SPS");
+    PROTOCOLS.put(131, "PIPE");
+    PROTOCOLS.put(132, "SCTP");
+    PROTOCOLS.put(133, "FC");
+    PROTOCOLS.put(134, "RSVP-E2E-IGNORE");
+    PROTOCOLS.put(135, "Mobility Header");
+    PROTOCOLS.put(136, "UDPLite");
+    PROTOCOLS.put(137, "MPLS-in-IP");
+    PROTOCOLS.put(138, "manet");
+    PROTOCOLS.put(139, "HIP");
+    PROTOCOLS.put(140, "Shim6");
+    PROTOCOLS.put(141, "WESP");
+    PROTOCOLS.put(142, "ROHC");
+  }
+
+  @Override
+  public Map<String, Object> map(Object value, String outputField) {
+    Map<String, Object> ret = new HashMap<>();
+    if(value != null && value instanceof Number) {
+      int protocolNum = ((Number)value).intValue();
+      ret.put(outputField, PROTOCOLS.get(protocolNum));
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
new file mode 100644
index 0000000..c370cd3
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.common.query.MapVariableResolver;
+import org.apache.metron.common.query.ParseException;
+import org.apache.metron.common.query.PredicateProcessor;
+import org.apache.metron.common.query.VariableResolver;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RemoveTransformation implements FieldTransformation {
+  public static final String CONDITION_CONF = "condition";
+  public static final PredicateProcessor PASSTHROUGH_PROCESSOR = new PredicateProcessor() {
+    @Override
+    public boolean parse(String rule, VariableResolver resolver) {
+      return true;
+    }
+
+    @Override
+    public boolean validate(String rule) throws ParseException {
+      return true;
+    }
+
+    @Override
+    public boolean validate(String rule, boolean throwException) throws ParseException {
+      return true;
+    }
+  };
+  private String getCondition(Map<String, Object> fieldMappingConfig) {
+    Object conditionObj = fieldMappingConfig.get(CONDITION_CONF);
+
+    if(conditionObj == null || !(conditionObj instanceof String)) {
+      return null;
+    }
+    return conditionObj.toString();
+  }
+
+  private PredicateProcessor getPredicateProcessor(String condition)
+  {
+    if(condition == null) {
+      return PASSTHROUGH_PROCESSOR;
+    }
+    else {
+      return new PredicateProcessor();
+    }
+  }
+
+  @Override
+  public Map<String, Object> map( Map<String, Object> input
+                                , final List<String> outputFields
+                                , Map<String, Object> fieldMappingConfig
+                                , Map<String, Object> sensorConfig
+                                ) {
+    String condition = getCondition(fieldMappingConfig);
+    PredicateProcessor processor = getPredicateProcessor(condition);
+    if(processor.parse(condition, new MapVariableResolver(input))) {
+      return new HashMap<String, Object>() {{
+        for(String outputField : outputFields) {
+          put(outputField, null);
+        }
+      }};
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
new file mode 100644
index 0000000..3f6215e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import com.google.common.collect.Iterables;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class SimpleFieldTransformation implements FieldTransformation {
+  @Override
+  public Map<String, Object> map (Map<String, Object> input
+                                , List<String> outputField
+                                , Map<String, Object> fieldMappingConfig
+                                , Map<String, Object> sensorConfig
+                                )
+  {
+    Object value = (input == null || input.values() == null && input.values().isEmpty())
+                 ? null
+                 : Iterables.getFirst(input.values(), null)
+                 ;
+    return map(value, outputField.get(0));
+  }
+
+  public abstract Map<String, Object> map(Object input, String outputField);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
new file mode 100644
index 0000000..4f0319a
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class FieldTransformationTest {
+  public static class TestTransformation implements FieldTransformation {
+
+    @Override
+    public Map<String, Object> map( Map<String, Object> input
+                                  , List<String> outputField
+                                  , Map<String, Object> fieldMappingConfig
+                                  , Map<String, Object> sensorConfig
+                                  )
+    {
+      return ImmutableMap.of(outputField.get(0), Joiner.on(fieldMappingConfig.get("delim").toString()).join(input.entrySet()));
+    }
+  }
+
+ /**
+   {
+    "fieldTransformations" : [
+          {
+            "input" : [ "field1", "field2" ]
+          , "output" : "output"
+          , "transformation" : "org.apache.metron.common.field.transformation.FieldTransformationTest$TestTransformation"
+          , "config" : {
+                "delim" : ","
+                      }
+          }
+                      ]
+   }
+   */
+  @Multiline
+  public static String complexConfig;
+
+  /**
+   {
+    "fieldTransformations" : [
+          {
+            "input" : "protocol"
+          , "transformation" : "IP_PROTOCOL"
+          }
+                      ]
+   }
+   */
+  @Multiline
+  public static String config;
+
+  /**
+   {
+    "fieldTransformations" : [
+          {
+           "transformation" : "IP_PROTOCOL"
+          }
+                      ]
+   }
+   */
+  @Multiline
+  public static String badConfigMissingInput;
+
+  /**
+   {
+    "fieldTransformations" : [
+          {
+            "input" : "protocol"
+          }
+                      ]
+   }
+   */
+  @Multiline
+  public static String badConfigMissingMapping;
+
+  @Test
+  public void testValidSerde_simple() throws IOException {
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(config));
+    Assert.assertEquals(1, c.getFieldTransformations().size());
+    Assert.assertEquals(IPProtocolTransformation.class, c.getFieldTransformations().get(0).getTransformation().getClass());
+    Assert.assertEquals(ImmutableList.of("protocol"), c.getFieldTransformations().get(0).getInput());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testInValidSerde_missingInput() throws IOException {
+    SensorParserConfig.fromBytes(Bytes.toBytes(badConfigMissingInput));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testInValidSerde_missingMapping() throws IOException {
+    SensorParserConfig.fromBytes(Bytes.toBytes(badConfigMissingMapping));
+  }
+
+  @Test
+  public void testComplexMapping() throws IOException {
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(complexConfig));
+    FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null);
+
+    Assert.assertNotNull(handler);
+    Assert.assertEquals(ImmutableMap.of("output", "field1=value1,field2=value2")
+                       ,handler.transform(new JSONObject(ImmutableMap.of("field1", "value1"
+                                                                  ,"field2", "value2"
+                                                                  )
+                                                  )
+                                   , c.getParserConfig()
+                                   )
+                       );
+  }
+  @Test
+  public void testSimpleMapping() throws IOException {
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(config));
+    FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null);
+
+    Assert.assertNotNull(handler);
+    Assert.assertEquals(ImmutableMap.of("protocol", "TCP")
+                       ,handler.transform(new JSONObject(ImmutableMap.of("protocol", 6)), c.getParserConfig())
+                       );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
new file mode 100644
index 0000000..fc43406
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class RemoveTransformationTest {
+  /**
+   {
+    "fieldTransformations" : [
+          {
+            "input" : "field1"
+          , "transformation" : "REMOVE"
+          }
+                      ]
+   }
+   */
+  @Multiline
+  public static String removeUnconditionalConfig;
+
+  @Test
+  public void testUnconditionalRemove() throws Exception{
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(removeUnconditionalConfig));
+    FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null);
+    JSONObject input = new JSONObject(new HashMap<String, Object>() {{
+      put("field1", "foo");
+    }});
+    handler.transformAndUpdate(input, new HashMap<>());
+    Assert.assertFalse(input.containsKey("field1"));
+  }
+
+  /**
+   {
+    "fieldTransformations" : [
+          {
+            "output" : "field1"
+          , "transformation" : "REMOVE"
+          , "config" : {
+              "condition" : "exists(field2) and field2 == 'foo'"
+                       }
+          }
+                      ]
+   }
+   */
+  @Multiline
+  public static String removeConditionalConfig;
+  @Test
+  public void testConditionalRemove() throws Exception {
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(removeConditionalConfig));
+    FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null);
+    {
+      JSONObject input = new JSONObject(new HashMap<String, Object>() {{
+        put("field1", "foo");
+      }});
+      handler.transformAndUpdate(input, new HashMap<>());
+      //no removal happened because field2 does not exist
+      Assert.assertTrue(input.containsKey("field1"));
+      Assert.assertFalse(input.containsKey("field2"));
+    }
+    {
+      JSONObject input = new JSONObject(new HashMap<String, Object>() {{
+        put("field1", "foo");
+        put("field2", "bar");
+      }});
+      handler.transformAndUpdate(input, new HashMap<>());
+      //no removal happened because field2 != bar
+      Assert.assertTrue(input.containsKey("field1"));
+      Assert.assertTrue(input.containsKey("field2"));
+    }
+    {
+      JSONObject input = new JSONObject(new HashMap<String, Object>() {{
+        put("field1", "bar");
+        put("field2", "foo");
+      }});
+      //removal of field1 happens because field2 exists and is 'bar'
+      handler.transformAndUpdate(input, new HashMap<>());
+      Assert.assertFalse(input.containsKey("field1"));
+      Assert.assertTrue(input.containsKey("field2"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-integration-test/src/main/sample/data/yaf/parsed/YafExampleParsed
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/sample/data/yaf/parsed/YafExampleParsed b/metron-platform/metron-integration-test/src/main/sample/data/yaf/parsed/YafExampleParsed
index 6155e98..9d51813 100644
--- a/metron-platform/metron-integration-test/src/main/sample/data/yaf/parsed/YafExampleParsed
+++ b/metron-platform/metron-integration-test/src/main/sample/data/yaf/parsed/YafExampleParsed
@@ -1,10 +1,10 @@
-{"iflags":"AS","uflags":0,"isn":"22efa001","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988512,"app":0,"oct":44,"end_reason":"idle","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
-{"iflags":"A","uflags":0,"isn":10000000,"ip_dst_addr":"10.0.2.3","ip_dst_port":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":37299,"timestamp":1453994988502,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988502,"source.type":"yaf","start_time":1453994988502,"riflags":0,"rtt":"0.000","protocol":17}
-{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.15","ip_dst_port":37299,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.3","tag":0,"rtag":0,"ip_src_port":53,"timestamp":1453994988504,"app":0,"oct":312,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","protocol":17}
-{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.3","ip_dst_port":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":56303,"timestamp":1453994988504,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","protocol":17}
-{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.15","ip_dst_port":56303,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.3","tag":0,"rtag":0,"ip_src_port":53,"timestamp":1453994988506,"app":0,"oct":84,"end_reason":"idle","risn":0,"end_time":1453994988506,"source.type":"yaf","start_time":1453994988506,"riflags":0,"rtt":"0.000","protocol":17}
-{"iflags":"S","uflags":0,"isn":"58c52fca","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988508,"app":0,"oct":60,"end_reason":"idle","risn":0,"end_time":1453994988508,"source.type":"yaf","start_time":1453994988508,"riflags":0,"rtt":"0.000","protocol":6}
-{"iflags":"A","uflags":0,"isn":"58c52fcb","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
-{"iflags":"AP","uflags":0,"isn":"58c52fcb","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988512,"app":0,"oct":148,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
-{"iflags":"A","uflags":0,"isn":"22efa002","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
-{"iflags":"AP","uflags":0,"isn":"22efa002","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988562,"app":0,"oct":604,"end_reason":"idle","risn":0,"end_time":1453994988562,"source.type":"yaf","start_time":1453994988562,"riflags":0,"rtt":"0.000","protocol":6}
+{"iflags":"AS","uflags":0,"isn":"22efa001","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988512,"app":0,"oct":44,"end_reason":"idle","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":"TCP"}
+{"iflags":"A","uflags":0,"isn":10000000,"ip_dst_addr":"10.0.2.3","ip_dst_port":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":37299,"timestamp":1453994988502,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988502,"source.type":"yaf","start_time":1453994988502,"riflags":0,"rtt":"0.000","protocol":"UDP"}
+{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.15","ip_dst_port":37299,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.3","tag":0,"rtag":0,"ip_src_port":53,"timestamp":1453994988504,"app":0,"oct":312,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","protocol":"UDP"}
+{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.3","ip_dst_port":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":56303,"timestamp":1453994988504,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","protocol":"UDP"}
+{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.15","ip_dst_port":56303,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.3","tag":0,"rtag":0,"ip_src_port":53,"timestamp":1453994988506,"app":0,"oct":84,"end_reason":"idle","risn":0,"end_time":1453994988506,"source.type":"yaf","start_time":1453994988506,"riflags":0,"rtt":"0.000","protocol":"UDP"}
+{"iflags":"S","uflags":0,"isn":"58c52fca","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988508,"app":0,"oct":60,"end_reason":"idle","risn":0,"end_time":1453994988508,"source.type":"yaf","start_time":1453994988508,"riflags":0,"rtt":"0.000","protocol":"TCP"}
+{"iflags":"A","uflags":0,"isn":"58c52fcb","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":"TCP"}
+{"iflags":"AP","uflags":0,"isn":"58c52fcb","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988512,"app":0,"oct":148,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":"TCP"}
+{"iflags":"A","uflags":0,"isn":"22efa002","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":"TCP"}
+{"iflags":"AP","uflags":0,"isn":"22efa002","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988562,"app":0,"oct":604,"end_reason":"idle","risn":0,"end_time":1453994988562,"source.type":"yaf","start_time":1453994988562,"riflags":0,"rtt":"0.000","protocol":"TCP"}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 9fcb431..cb3e74b 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -1,8 +1,13 @@
-#metron-parsers
+#Parsers
 
-##Module Description
+Parsers are pluggable components which are used to transform raw data
+(textual or raw bytes) into JSON messages suitable for downstream
+enrichment and indexing.  
 
-This module provides a list of parsers that can be used with the Metron framework.  There are two types of parsers.  First type is a Java parser.  This kind of parser is optimized for speed and performance and is built for use with higher velicity topologies.  These parsers are not easily modifiable and in order to make changes to them the entire topology need to be recompiled.  The second type of parser provided with the system is a Grok parser.  This type of parser is primarily designed for lower-velocity topologies or for quickly standing up a parser for a new telemetry before a permanent Java parser can be written for it.
+There are two types of parsers:
+*  A parser written in Java which conforms to the `MessageParser` interface.  This kind of parser is optimized for speed and performance and
+is built for use with higher velocity topologies.  These parsers are not easily modifiable and in order to make changes to them the entire topology need to be recompiled.  
+* A Grok parser.  This type of parser is primarily designed for lower-velocity topologies or for quickly standing up a parser for a new telemetry before a permanent Java parser can be written for it.
 
 ##Message Format
 
@@ -42,6 +47,109 @@ So putting it all together a typical Metron message with all 5-tuple fields pres
 }
 ```
 
+##Parser Configuration
+
+The configuration for the various parser topologies is defined by JSON
+documents stored in zookeeper.
+
+The document is structured in the following way
+
+* `parserClassName` : The fully qualified classname for the parser to be used.
+* `sensorTopic` : The kafka topic to send the parsed messages to.
+* `parserConfig` : A JSON Map representing the parser implementation specific configuration.
+* `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic.
+
+The `fieldTransformations` is a complex object which defines a
+transformation which can be done to a message.  This transformation can 
+* Modify existing fields to a message
+* Add new fields given the values of existing fields of a message
+* Remove existing fields of a message
+
+###`fieldTransformation` configuration
+
+The format of a `fieldTransformation` is as follows:
+* `input` : An array of fields or a single field representing the input.  This is optional; if unspecified, then the whole message is passed as input.
+* `output` : The outputs to produce from the transformation.  If unspecified, it is assumed to be the same as inputs.
+* `transformation` : The fully qualified classname of the transformation to be used.  This is either a class which implements `FieldTransformation` or a member of the `FieldTransformations` enum.
+* `config` : A String to Object map of transformation specific configuration.
+ 
+The currently implemented fieldTransformations are:
+* `REMOVE` : This transformation removes the specified input fields.  If you want a conditional removal, you can pass a Metron Query Language statement to define the conditions under which you want to remove the fields. 
+
+Consider the following simple configuration which will remove `field1`
+unconditionally:
+```
+{
+...
+    "fieldTransformations" : [
+          {
+            "input" : "field1"
+          , "mapping" : "REMOVE"
+          }
+                      ]
+}
+```
+
+Consider the following simple sensor parser configuration which will remove `field1`
+whenever `field2` exists and whose corresponding equal to 'foo':
+```
+{
+...
+  "fieldTransformations" : [
+          {
+            "input" : "field1"
+          , "mapping" : "REMOVE"
+          , "config" : {
+              "condition" : "exists(field2) and field2 == 'foo'"
+                       }
+          }
+                      ]
+}
+```
+
+* `IP_PROTOCOL` : This transformation maps IANA protocol numbers to consistent string representations.
+
+Consider the following sensor parser config to map the `protocol` field
+to a textual representation of the protocol:
+```
+{
+...
+    "fieldTransformations" : [
+          {
+            "input" : "protocol"
+          , "transformation" : "IP_PROTOCOL"
+          }
+                      ]
+}
+```
+
+This transformation would transform `{ "protocol" : 6, "source.type" : "bro", ... }` 
+into `{ "protocol" : "TCP", "source.type" : "bro", ...}`
+
+###An Example Configuration for a Sensor
+Consider the following example configuration for the `yaf` sensor:
+
+```
+{
+  "parserClassName":"org.apache.metron.parsers.GrokParser",
+  "sensorTopic":"yaf",
+  "fieldTransformations" : [
+                    {
+                      "input" : "protocol"
+                     ,"transformation": "IP_PROTOCOL"
+                    }
+                    ],
+  "parserConfig":
+  {
+    "grokPath":"/patterns/yaf",
+    "patternLabel":"YAF_DELIMITED",
+    "timestampField":"start_time",
+    "timeFields": ["start_time", "end_time"],
+    "dateFormat":"yyyy-MM-dd HH:mm:ss.S"
+  }
+}
+```
+
 ##Parser Bolt
 
 The Metron parser bolt is a standard bolt, which can be extended with multiple Java and Grok parser adapter for parsing different topology messages.  The bolt signature for declaration in a storm topology is as follows:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
index 6290e9f..1267554 100644
--- a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
+++ b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
@@ -1,6 +1,12 @@
 {
   "parserClassName":"org.apache.metron.parsers.GrokParser",
   "sensorTopic":"yaf",
+  "fieldTransformations" : [
+                    {
+                      "input" : "protocol"
+                     ,"transformation": "IP_PROTOCOL"
+                    }
+                    ],
   "parserConfig":
   {
     "grokPath":"/patterns/yaf",

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 39f2641..2a64666 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -22,8 +22,9 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredBolt;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.parsers.filters.GenericMessageFilter;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.parsers.interfaces.MessageFilter;
@@ -61,19 +62,34 @@ public class ParserBolt extends ConfiguredParserBolt {
     this.collector = collector;
     parser.init();
     writer.init();
+    SensorParserConfig config = getConfigurations().getSensorParserConfig(sensorType);
+    if(config != null) {
+      config.init();
+    }
+    else {
+      throw new IllegalStateException("Unable to retrieve a parser config for " + sensorType);
+    }
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
     byte[] originalMessage = tuple.getBinary(0);
+    SensorParserConfig sensorParserConfig = getConfigurations().getSensorParserConfig(sensorType);
     try {
-      List<JSONObject> messages = parser.parse(originalMessage);
-      for(JSONObject message: messages) {
-        if (parser.validate(message)) {
-          if (filter != null && filter.emitTuple(message)) {
-            message.put(Constants.SENSOR_TYPE, sensorType);
-            writer.write(sensorType, configurations, tuple, message);
+      if(sensorParserConfig != null) {
+        List<JSONObject> messages = parser.parse(originalMessage);
+        for (JSONObject message : messages) {
+          if (parser.validate(message)) {
+            if (filter != null && filter.emitTuple(message)) {
+              message.put(Constants.SENSOR_TYPE, sensorType);
+              for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
+                if (handler != null) {
+                  handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
+                }
+              }
+              writer.write(sensorType, configurations, tuple, message);
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/64b0f18a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index a6f2ee1..b90e521 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.parsers.interfaces.MessageFilter;
@@ -52,7 +54,17 @@ public class ParserBoltTest extends BaseBoltTest {
   @Test
   public void test() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer);
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+      @Override
+      public ParserConfigurations getConfigurations() {
+        return new ParserConfigurations() {
+          @Override
+          public SensorParserConfig getSensorParserConfig(String sensorType) {
+            return new SensorParserConfig();
+          }
+        };
+      }
+    };
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);