You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/04/27 16:20:55 UTC

[2/2] incubator-metron git commit: METRON-870: Add filtering by packet payload to the pcap query closes apache/incubator-metron#541

METRON-870: Add filtering by packet payload to the pcap query closes apache/incubator-metron#541


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/bf2528fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/bf2528fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/bf2528fd

Branch: refs/heads/master
Commit: bf2528fd3f4d474164ddf497459cd2421df3e4bb
Parents: 5dd8788
Author: cstella <ce...@gmail.com>
Authored: Thu Apr 27 12:20:45 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Apr 27 12:20:45 2017 -0400

----------------------------------------------------------------------
 dependencies_with_url.csv                       |   1 +
 .../pcapservice/PcapReceiverImplRestEasy.java   |  24 ++-
 .../PcapReceiverImplRestEasyTest.java           |  85 ++++----
 .../org/apache/metron/common/Constants.java     |   7 +-
 .../stellar/GeoEnrichmentFunctions.java         |   2 +-
 metron-platform/metron-pcap-backend/README.md   |  29 ++-
 .../metron/pcap/query/FixedCliConfig.java       |  12 +-
 .../metron/pcap/query/FixedCliParser.java       |  15 +-
 .../org/apache/metron/utils/PcapInspector.java  |  11 +-
 .../apache/metron/pcap/FixedPcapFilterTest.java | 211 ++++++++++---------
 .../apache/metron/pcap/QueryPcapFilterTest.java | 121 +++++------
 .../PcapTopologyIntegrationTest.java            |  42 +++-
 .../apache/metron/pcap/query/PcapCliTest.java   |  47 +++--
 metron-platform/metron-pcap/pom.xml             |  19 ++
 .../java/org/apache/metron/pcap/PacketInfo.java |  25 ++-
 .../java/org/apache/metron/pcap/PcapHelper.java |  38 +++-
 .../metron/pcap/filter/PcapFieldResolver.java   |   8 +-
 .../apache/metron/pcap/filter/PcapFilter.java   |   2 +-
 .../pcap/filter/fixed/FixedPcapFilter.java      |  71 ++++++-
 .../pcap/filter/query/QueryPcapFilter.java      |   7 +-
 .../pcap/pattern/ByteArrayMatcherFunction.java  |  63 ++++++
 .../pcap/pattern/ByteArrayMatchingUtil.java     |  64 ++++++
 .../pcap/filter/fixed/FixedPcapFilterTest.java  |  22 +-
 .../pcap/pattern/ByteArrayMatchingUtilTest.java | 133 ++++++++++++
 24 files changed, 754 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 053f11a..93a19b7 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -293,5 +293,6 @@ org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/c
 com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp
 com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp
 org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/
+net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek
 org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
 org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 1f3c03e..9c58813 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -27,6 +27,7 @@ import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
@@ -37,10 +38,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.List;
+import java.util.*;
 
 @Path("/")
 public class PcapReceiverImplRestEasy {
@@ -197,6 +195,7 @@ public class PcapReceiverImplRestEasy {
           @DefaultValue("-1") @QueryParam ("endTime")long endTime,
           @DefaultValue("10") @QueryParam ("numReducers")int numReducers,
           @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
+          @DefaultValue("") @QueryParam ("packetFilter") String packetFilter,
           @Context HttpServletResponse servlet_response)
 
           throws IOException {
@@ -225,23 +224,26 @@ public class PcapReceiverImplRestEasy {
       //convert to nanoseconds since the epoch
       startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime);
       endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime);
-      EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      Map<String, String> query = new HashMap<String, String>() {{
                                       if(srcIp != null) {
-                                        put(Constants.Fields.SRC_ADDR, srcIp);
+                                        put(Constants.Fields.SRC_ADDR.getName(), srcIp);
                                       }
                                       if(dstIp != null) {
-                                        put(Constants.Fields.DST_ADDR, dstIp);
+                                        put(Constants.Fields.DST_ADDR.getName(), dstIp);
                                       }
                                       if(srcPort != null) {
-                                        put(Constants.Fields.SRC_PORT, srcPort);
+                                        put(Constants.Fields.SRC_PORT.getName(), srcPort);
                                       }
                                       if(dstPort != null) {
-                                        put(Constants.Fields.DST_PORT, dstPort);
+                                        put(Constants.Fields.DST_PORT.getName(), dstPort);
                                       }
                                       if(protocol != null) {
-                                        put(Constants.Fields.PROTOCOL, protocol);
+                                        put(Constants.Fields.PROTOCOL.getName(), protocol);
+                                      }
+                                      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "" + includeReverseTrafficF);
+                                      if(!org.apache.commons.lang3.StringUtils.isEmpty(packetFilter)) {
+                                        put(PcapHelper.PacketFields.PACKET_FILTER.getName(), packetFilter);
                                       }
-                                      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "" + includeReverseTrafficF);
                                     }};
       if(LOGGER.isDebugEnabled()) {
         LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet()));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index dba87cf..ea6db70 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.junit.Assert;
@@ -32,6 +33,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.EnumMap;
+import java.util.Map;
 
 public class PcapReceiverImplRestEasyTest {
 
@@ -65,7 +67,7 @@ public class PcapReceiverImplRestEasyTest {
     }
   }
 
-  final MockQueryHandler<EnumMap<Constants.Fields, String>> fixedQueryHandler = new MockQueryHandler<EnumMap<Constants.Fields, String>>();
+  final MockQueryHandler<Map<String, String>> fixedQueryHandler = new MockQueryHandler<>();
   final MockQueryHandler<String> queryQueryHandler = new MockQueryHandler<String>();
   PcapReceiverImplRestEasy fixedRestEndpoint = new PcapReceiverImplRestEasy() {{
     this.queryUtil = fixedQueryHandler;
@@ -89,32 +91,35 @@ public class PcapReceiverImplRestEasyTest {
     String dstPort = "100";
     long startTime = 100;
     long endTime = 1000;
+    String query = "`blah`";
 
     {
       boolean includeReverseTraffic = false;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
-      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
-      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
-      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
-      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName()));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName()));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName()));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName()));
       Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
       Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
-      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())));
+      Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
     }
     {
       boolean includeReverseTraffic = true;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
-      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
-      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
-      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
-      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName()));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName()));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName()));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName()));
       Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
       Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
-      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())));
+      Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
     }
   }
 
@@ -141,16 +146,18 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = 100;
     long endTime = 1000;
     boolean includeReverseTraffic = false;
-    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
+    String query = "`metron`";
+    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null);
     Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
     Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
-    Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
-    Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
-    Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
-    Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+    Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName()));
+    Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName()));
+    Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName()));
+    Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName()));
     Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
     Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
-    Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())));
+    Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
   }
 
   @Test
@@ -162,17 +169,19 @@ public class PcapReceiverImplRestEasyTest {
     String dstPort = "100";
     long startTime = 100;
     long endTime = 1000;
+    String query = null;
     boolean includeReverseTraffic = false;
-    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
+    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null);
     Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
     Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
-    Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
-    Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
-    Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
-    Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+    Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName()));
+    Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName()));
+    Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName()));
+    Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName()));
     Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
     Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
-    Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())));
+    Assert.assertEquals(query, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
   }
 
   @Test
@@ -185,17 +194,19 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = -1;
     long endTime = 1000;
     {
+      String query = "";
       boolean includeReverseTraffic = false;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, query, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
-      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
-      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
-      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
-      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName()));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName()));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName() ));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName()));
       Assert.assertEquals(0, fixedQueryHandler.beginNS);
       Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
-      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())));
+      Assert.assertEquals(null, fixedQueryHandler.fields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
     }
     {
       String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
@@ -219,16 +230,16 @@ public class PcapReceiverImplRestEasyTest {
     long endTime = -1;
     {
       boolean includeReverseTraffic = false;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
-      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
-      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
-      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
-      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR.getName()));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR.getName()));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT.getName()));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT.getName()));
       Assert.assertEquals(0, fixedQueryHandler.beginNS);
       Assert.assertTrue(fixedQueryHandler.endNS > 0);
-      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())));
     }
     {
       String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 1dc73da..c2ede49 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -17,7 +17,9 @@
  */
 package org.apache.metron.common;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class Constants {
@@ -33,7 +35,10 @@ public class Constants {
   public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
   public static final String GUID = "guid";
 
-  public static enum Fields {
+  public interface Field {
+    String getName();
+  }
+  public enum Fields implements Field {
      SRC_ADDR("ip_src_addr")
     ,SRC_PORT("ip_src_port")
     ,DST_ADDR("ip_dst_addr")

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
index 11e024e..42913b2 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
@@ -36,7 +36,7 @@ public class GeoEnrichmentFunctions {
           ,namespace="GEO"
           ,description="Look up an IPV4 address and returns geographic information about it"
           ,params = {
-                      "ip - The IPV4 address to lookup" +
+                      "ip - The IPV4 address to lookup",
                       "fields - Optional list of GeoIP fields to grab. Options are locID, country, city, postalCode, dmaCode, latitude, longitude, location_point"
                     }
           ,returns = "If a Single field is requested a string of the field, If multiple fields a map of string of the fields, and null otherwise"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index e1a2683..5b554b5 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -44,11 +44,13 @@ sequence files.
 ## Configuration
 
 The configuration file for the Flux topology is located at
-`$METRON_HOME/config/etc/env/pcap.properties` and the possible options
+`$METRON_HOME/config/pcap.properties` and the possible options
 are as follows:
 * `spout.kafka.topic.pcap` : The kafka topic to listen to
+* `storm.auto.credentials` : The kerberos ticket renewal.  If running on a kerberized cluster, this should be `['org.apache.storm.security.auth.kerberos.AutoTGT']`
+* `kafka.security.protocol` : The security protocol to use for kafka.  This should be `PLAINTEXT` for a non-kerberized cluster and probably `SASL_PLAINTEXT` for a kerberized cluster.
 * `kafka.zk` : The comma separated zookeeper quorum (i.e.  host:2181,host2:2181)
-* `kafka.pcap.start` : One of `START`, `END`, `WHERE_I_LEFT_OFF` representing where to start listening on the queue. 
+* `kafka.pcap.start` : One of `EARLIEST`, `LATEST`, `UNCOMMITTED_EARLIEST`, `UNCOMMITTED_LATEST` representing where to start listening on the queue. 
 * `kafka.pcap.numPackets` : The number of packets to keep in one file.
 * `kafka.pcap.maxTimeMS` : The number of packets to keep in one file in terms of duration (in milliseconds).  For instance, you may only want to keep an hour's worth of packets in a given file.
 * `kafka.pcap.ts_scheme` : One of `FROM_KEY` or `FROM_VALUE`.  You really only want `FROM_KEY` as that fits the current tooling.  `FROM_VALUE` assumes that fully headerized packets are coming in on the value, which is legacy.
@@ -78,7 +80,7 @@ usage: PcapInspector
 ### Query Filter Utility
 This tool exposes the two methods for filtering PCAP data via a command line tool:
 - fixed
-- query (Metron Stellar)
+- query (via Stellar)
 
 The tool is executed via 
 ```
@@ -97,6 +99,7 @@ usage: Fixed filter options
                                  and end_time. Default is to use time in
                                  millis since the epoch.
  -dp,--ip_dst_port <arg>         Destination port
+ -pf,--packet_filter <arg>       Packet filter regex
  -et,--end_time <arg>            Packet end time range. Default is current
                                  system time.
  -nr,--num_reducers <arg>        The number of reducers to use.  Default
@@ -127,3 +130,23 @@ usage: Query filter options
  -q,--query <arg>                Query string to use as a filter
  -st,--start_time <arg>          (required) Packet start time range.
 ```
+
+The Query filter's `--query` argument specifies the Stellar expression to
+execute on each packet.  To interact with the packet, a few variables are exposed:
+* `packet` : The packet data (a `byte[]`)
+* `ip_src_addr` : The source address for the packet (a `String`)
+* `ip_src_port` : The source port for the packet (an `Integer`)
+* `ip_dst_addr` : The destination address for the packet (a `String`)
+* `ip_dst_port` : The destination port for the packet (an `Integer`)
+
+#### Binary Regex
+
+Filtering can be done both by the packet header as well as via a binary regular expression
+which can be run on the packet payload itself.  This filter can be specified via:
+* The `-pf` or `--packet_filter` options for the fixed query filter
+* The `BYTEARRAY_MATCHER(pattern, data)` Stellar function.
+The first argument is the regex pattern and the second argument is the data.
+The packet data will be exposed via the`packet` variable in Stellar.
+
+The format of this regular expression is described [here](https://github.com/nishihatapalmer/byteseek/blob/master/sequencesyntax.md).
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
index 897e0fd..df653e1 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
@@ -20,24 +20,26 @@ package org.apache.metron.pcap.query;
 import org.apache.metron.common.Constants;
 
 import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 public class FixedCliConfig extends CliConfig {
 
-  private EnumMap<Constants.Fields, String> fixedFields;
+  private Map<String, String> fixedFields;
 
   public FixedCliConfig() {
-    this.fixedFields = new EnumMap<>(Constants.Fields.class);
+    this.fixedFields = new LinkedHashMap<>();
   }
 
-  public EnumMap<Constants.Fields, String> getFixedFields() {
+  public Map<String, String> getFixedFields() {
     return fixedFields;
   }
 
-  public void setFixedFields(EnumMap<Constants.Fields, String> fixedFields) {
+  public void setFixedFields(Map<String, String> fixedFields) {
     this.fixedFields = fixedFields;
   }
 
-  public void putFixedField(Constants.Fields key, String value) {
+  public void putFixedField(String key, String value) {
     String trimmedVal = value != null ? value.trim() : null;
     if (!isNullOrEmpty(trimmedVal)) {
       this.fixedFields.put(key, value);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
index 1123cad..fda8692 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PcapHelper;
 
 public class FixedCliParser extends CliParser {
   private Options fixedOptions;
@@ -36,6 +37,7 @@ public class FixedCliParser extends CliParser {
     options.addOption(newOption("sp", "ip_src_port", true, "Source port"));
     options.addOption(newOption("dp", "ip_dst_port", true, "Destination port"));
     options.addOption(newOption("p", "protocol", true, "IP Protocol"));
+    options.addOption(newOption("pf", "packet_filter", true, "Packet Filter regex"));
     options.addOption(newOption("ir", "include_reverse", false, "Indicates if filter should check swapped src/dest addresses and IPs"));
     return options;
   }
@@ -51,12 +53,13 @@ public class FixedCliParser extends CliParser {
     CommandLine commandLine = getParser().parse(fixedOptions, args);
     FixedCliConfig config = new FixedCliConfig();
     super.parse(commandLine, config);
-    config.putFixedField(Constants.Fields.SRC_ADDR, commandLine.getOptionValue("ip_src_addr"));
-    config.putFixedField(Constants.Fields.DST_ADDR, commandLine.getOptionValue("ip_dst_addr"));
-    config.putFixedField(Constants.Fields.SRC_PORT, commandLine.getOptionValue("ip_src_port"));
-    config.putFixedField(Constants.Fields.DST_PORT, commandLine.getOptionValue("ip_dst_port"));
-    config.putFixedField(Constants.Fields.PROTOCOL, commandLine.getOptionValue("protocol"));
-    config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, Boolean.toString(commandLine.hasOption("include_reverse")));
+    config.putFixedField(Constants.Fields.SRC_ADDR.getName(), commandLine.getOptionValue("ip_src_addr"));
+    config.putFixedField(Constants.Fields.DST_ADDR.getName(), commandLine.getOptionValue("ip_dst_addr"));
+    config.putFixedField(Constants.Fields.SRC_PORT.getName(), commandLine.getOptionValue("ip_src_port"));
+    config.putFixedField(Constants.Fields.DST_PORT.getName(), commandLine.getOptionValue("ip_dst_port"));
+    config.putFixedField(Constants.Fields.PROTOCOL.getName(), commandLine.getOptionValue("protocol"));
+    config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), Boolean.toString(commandLine.hasOption("include_reverse")));
+    config.putFixedField(PcapHelper.PacketFields.PACKET_FILTER.getName(), commandLine.getOptionValue("packet_filter"));
     return config;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
index ef11af8..f460db3 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
@@ -35,10 +35,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.List;
+import java.util.*;
 
 public class PcapInspector {
   private static abstract class OptionHandler implements Function<String, Option> {}
@@ -143,13 +140,13 @@ public class PcapInspector {
       long millis = Long.divideUnsigned(key.get(), 1000000);
       String ts = DATE_FORMAT.format(new Date(millis));
       for(PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
-        EnumMap<Constants.Fields, Object> result = PcapHelper.packetToFields(pi);
+        Map<String, Object> result = PcapHelper.packetToFields(pi);
         List<String> fieldResults = new ArrayList<String>() {{
           add("TS: " + ts);
         }};
         for(Constants.Fields field : Constants.Fields.values()) {
-          if(result.containsKey(field)) {
-            fieldResults.add(field.getName() + ": " + result.get(field));
+          if(result.containsKey(field.getName())) {
+            fieldResults.add(field.getName() + ": " + result.get(field.getName()));
           }
         }
         System.out.println(Joiner.on(",").join(fieldResults));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
index 218d143..84969d3 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
@@ -24,29 +24,30 @@ import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
 
 public class FixedPcapFilterTest {
   @Test
   public void testTrivialEquality() throws Exception {
     Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    final Map<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
     }};
     new FixedPcapFilter.Configurator().addToConfig(fields, config);
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -58,23 +59,23 @@ public class FixedPcapFilterTest {
   @Test
   public void testReverseTraffic() throws Exception {
     Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    final Map<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
     }};
     new FixedPcapFilter.Configurator().addToConfig(fields, config);
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -85,12 +86,12 @@ public class FixedPcapFilterTest {
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "dst_ip");
-            put(Constants.Fields.SRC_PORT, 1);
-            put(Constants.Fields.DST_ADDR, "src_ip");
-            put(Constants.Fields.DST_PORT, 0);
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 1);
+            put(Constants.Fields.DST_ADDR.getName(), "src_ip");
+            put(Constants.Fields.DST_PORT.getName(), 0);
           }};
         }
       };
@@ -101,12 +102,12 @@ public class FixedPcapFilterTest {
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "dst_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "src_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected Map<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "src_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -117,22 +118,22 @@ public class FixedPcapFilterTest {
 @Test
 public void testMissingDstAddr() throws Exception {
   Configuration config = new Configuration();
-  final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-    put(Constants.Fields.SRC_ADDR, "src_ip");
-    put(Constants.Fields.SRC_PORT, "0");
-    put(Constants.Fields.DST_PORT, "1");
-    put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+  final HashMap<String, String> fields = new HashMap<String, String>() {{
+    put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+    put(Constants.Fields.SRC_PORT.getName(), "0");
+    put(Constants.Fields.DST_PORT.getName(), "1");
+    put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
   }};
   new FixedPcapFilter.Configurator().addToConfig(fields, config);
   {
     FixedPcapFilter filter = new FixedPcapFilter() {
       @Override
-      protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-        return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-          put(Constants.Fields.SRC_ADDR, "src_ip");
-          put(Constants.Fields.SRC_PORT, 0);
-          put(Constants.Fields.DST_ADDR, "dst_ip");
-          put(Constants.Fields.DST_PORT, 1);
+      protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+        return new HashMap<String, Object>() {{
+          put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+          put(Constants.Fields.SRC_PORT.getName(), 0);
+          put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+          put(Constants.Fields.DST_PORT.getName(), 1);
         }};
       }
     };
@@ -143,12 +144,12 @@ public void testMissingDstAddr() throws Exception {
   {
     FixedPcapFilter filter = new FixedPcapFilter() {
       @Override
-      protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-        return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-          put(Constants.Fields.SRC_ADDR, "src_ip1");
-          put(Constants.Fields.SRC_PORT, 0);
-          put(Constants.Fields.DST_ADDR, "dst_ip");
-          put(Constants.Fields.DST_PORT, 1);
+      protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+        return new HashMap<String, Object>() {{
+          put(Constants.Fields.SRC_ADDR.getName(), "src_ip1");
+          put(Constants.Fields.SRC_PORT.getName(), 0);
+          put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+          put(Constants.Fields.DST_PORT.getName(), 1);
         }};
       }
     };
@@ -159,22 +160,22 @@ public void testMissingDstAddr() throws Exception {
   @Test
   public void testMissingDstPort() throws Exception {
     Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
     }};
     new FixedPcapFilter.Configurator().addToConfig(fields, config);
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -185,12 +186,12 @@ public void testMissingDstAddr() throws Exception {
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 100);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
           }};
         }
       };
@@ -201,12 +202,12 @@ public void testMissingDstAddr() throws Exception {
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 100);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 100);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
           }};
         }
       };
@@ -217,22 +218,22 @@ public void testMissingDstAddr() throws Exception {
   @Test
   public void testMissingSrcAddr() throws Exception {
     Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_PORT.getName(), "0");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
     }};
     new FixedPcapFilter.Configurator().addToConfig(fields, config);
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -243,22 +244,22 @@ public void testMissingDstAddr() throws Exception {
   @Test
   public void testMissingSrcPort() throws Exception {
     Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    final HashMap<String, String> fields = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+      put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+      put(Constants.Fields.DST_PORT.getName(), "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
     }};
     new FixedPcapFilter.Configurator().addToConfig(fields, config);
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -269,12 +270,12 @@ public void testMissingDstAddr() throws Exception {
     {
       FixedPcapFilter filter = new FixedPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 100);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
index 5bb5d4a..7e3d55c 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.EnumMap;
+import java.util.HashMap;
 
 public class QueryPcapFilterTest {
 
@@ -37,12 +38,12 @@ public class QueryPcapFilterTest {
     {
       PcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -59,12 +60,12 @@ public class QueryPcapFilterTest {
     {
       PcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -81,12 +82,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -97,12 +98,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip_no_match");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -119,12 +120,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -135,12 +136,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 100);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
           }};
         }
       };
@@ -151,12 +152,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 100);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 100);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 100);
           }};
         }
       };
@@ -173,12 +174,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -195,12 +196,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 0);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };
@@ -211,12 +212,12 @@ public class QueryPcapFilterTest {
     {
       QueryPcapFilter filter = new QueryPcapFilter() {
         @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 100);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
+        protected HashMap<String, Object> packetToFields(PacketInfo pi) {
+          return new HashMap<String, Object>() {{
+            put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+            put(Constants.Fields.SRC_PORT.getName(), 100);
+            put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+            put(Constants.Fields.DST_PORT.getName(), 1);
           }};
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 84e7574..a869723 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -109,6 +109,13 @@ public class PcapTopologyIntegrationTest {
     BytesWritable value = new BytesWritable();
     while (reader.next(key, value)) {
       byte[] pcapWithHeader = value.copyBytes();
+      //if you are debugging and want the hex dump of the packets, uncomment the following:
+
+      //for(byte b : pcapWithHeader) {
+      //  System.out.print(String.format("%02x", b));
+      //}
+      //System.out.println("");
+
       long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
       {
         List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
@@ -274,7 +281,7 @@ public class PcapTopologyIntegrationTest {
                         , getTimestamp(4, pcapEntries)
                         , getTimestamp(5, pcapEntries)
                         , 10
-                        , new EnumMap<>(Constants.Fields.class)
+                        , new HashMap<>()
                         , new Configuration()
                         , FileSystem.get(new Configuration())
                         , new FixedPcapFilter.Configurator()
@@ -307,8 +314,8 @@ public class PcapTopologyIntegrationTest {
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(1, pcapEntries)
                         , 10
-                        , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-                          put(Constants.Fields.DST_ADDR, "207.28.210.1");
+                        , new HashMap<String, String>() {{
+                          put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
                         }}
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -342,8 +349,8 @@ public class PcapTopologyIntegrationTest {
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(1, pcapEntries)
                         , 10
-                        , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-                          put(Constants.Fields.PROTOCOL, "foo");
+                        , new HashMap<String, String>() {{
+                          put(Constants.Fields.PROTOCOL.getName(), "foo");
                         }}
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -377,7 +384,7 @@ public class PcapTopologyIntegrationTest {
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
                         , 10
-                        , new EnumMap<>(Constants.Fields.class)
+                        , new HashMap<>()
                         , new Configuration()
                         , FileSystem.get(new Configuration())
                         , new FixedPcapFilter.Configurator()
@@ -409,8 +416,8 @@ public class PcapTopologyIntegrationTest {
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
                         , 10
-                        , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-                          put(Constants.Fields.DST_PORT, "22");
+                        , new HashMap<String, String>() {{
+                          put(Constants.Fields.DST_PORT.getName(), "22");
                         }}
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -433,6 +440,25 @@ public class PcapTopologyIntegrationTest {
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
       {
+        //test with query filter and byte array matching
+        Iterable<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , 10
+                        , "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                        , new QueryPcapFilter.Configurator()
+                );
+        assertInOrder(results);
+        Assert.assertEquals(1, Iterables.size(results));
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+        Assert.assertTrue(baos.toByteArray().length > 0);
+      }
+      {
         //test with query filter
         Iterable<byte[]> results =
                 job.query(new Path(outDir.getAbsolutePath())

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index bad22e4..4f441f1 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -24,6 +24,7 @@ import org.apache.metron.common.Constants;
 import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
@@ -70,7 +71,8 @@ public class PcapCliTest {
             "-ip_dst_addr", "192.168.1.2",
             "-ip_src_port", "8081",
             "-ip_dst_port", "8082",
-            "-protocol", "6"
+            "-protocol", "6",
+            "-packet_filter", "`casey`"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
     Iterator iterator = pcaps.iterator();
@@ -79,13 +81,14 @@ public class PcapCliTest {
 
     Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
     Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
-    EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "192.168.1.1");
-      put(Constants.Fields.DST_ADDR, "192.168.1.2");
-      put(Constants.Fields.SRC_PORT, "8081");
-      put(Constants.Fields.DST_PORT, "8082");
-      put(Constants.Fields.PROTOCOL, "6");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    HashMap<String, String> query = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+      put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
+      put(Constants.Fields.SRC_PORT.getName(), "8081");
+      put(Constants.Fields.DST_PORT.getName(), "8082");
+      put(Constants.Fields.PROTOCOL.getName(), "6");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+      put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
     }};
 
     when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
@@ -120,13 +123,13 @@ public class PcapCliTest {
 
     Path base_path = new Path("/base/path");
     Path base_output_path = new Path("/base/output/path");
-    EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "192.168.1.1");
-      put(Constants.Fields.DST_ADDR, "192.168.1.2");
-      put(Constants.Fields.SRC_PORT, "8081");
-      put(Constants.Fields.DST_PORT, "8082");
-      put(Constants.Fields.PROTOCOL, "6");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    Map<String, String> query = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+      put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
+      put(Constants.Fields.SRC_PORT.getName(), "8081");
+      put(Constants.Fields.DST_PORT.getName(), "8082");
+      put(Constants.Fields.PROTOCOL.getName(), "6");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
     }};
 
     when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
@@ -162,13 +165,13 @@ public class PcapCliTest {
 
     Path base_path = new Path("/base/path");
     Path base_output_path = new Path("/base/output/path");
-    EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "192.168.1.1");
-      put(Constants.Fields.DST_ADDR, "192.168.1.2");
-      put(Constants.Fields.SRC_PORT, "8081");
-      put(Constants.Fields.DST_PORT, "8082");
-      put(Constants.Fields.PROTOCOL, "6");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    Map<String, String> query = new HashMap<String, String>() {{
+      put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+      put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
+      put(Constants.Fields.SRC_PORT.getName(), "8081");
+      put(Constants.Fields.DST_PORT.getName(), "8082");
+      put(Constants.Fields.PROTOCOL.getName(), "6");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
     }};
 
     long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index 16a34d1..73f57f0 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -26,6 +26,18 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>net.byteseek</groupId>
+            <artifactId>byteseek</artifactId>
+            <version>2.0.3</version>
+            <exclusions>
+                <!-- This is a LGPL dependency of byteseek, so we need to replace/mimic it minimally -->
+                <exclusion>
+                    <groupId>net.sf.trove4j</groupId>
+                    <artifactId>trove4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-hbase</artifactId>
             <version>${project.parent.version}</version>
@@ -126,6 +138,13 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${global_hadoop_version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
index fcaf1b0..f3a7e25 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
@@ -37,6 +37,9 @@ import org.apache.metron.pcap.utils.PcapUtils;
  */
 public class PacketInfo {
 
+  /** The packet data */
+  private byte[] packetBytes = null;
+
   /** The packetHeader. */
   private PacketHeader packetHeader = null;
 
@@ -158,9 +161,18 @@ public class PacketInfo {
    *          the tcp packet
    * @param udpPacket
    *          the udp packet
+   * @param packetBytes
+   *          the raw packet data
    */
-  public PacketInfo(GlobalHeader globalHeader, PacketHeader packetHeader, PcapPacket packet, Ipv4Packet ipv4Packet, TcpPacket tcpPacket,
-      UdpPacket udpPacket) {
+  public PacketInfo( GlobalHeader globalHeader
+                   , PacketHeader packetHeader
+                   , PcapPacket packet
+                   , Ipv4Packet ipv4Packet
+                   , TcpPacket tcpPacket
+                   , UdpPacket udpPacket
+                   , byte[] packetBytes
+                   ) {
+    this.packetBytes = packetBytes;
     this.packetHeader = packetHeader;
     this.packet = packet;
     this.ipv4Packet = ipv4Packet;
@@ -177,6 +189,15 @@ public class PacketInfo {
   public GlobalHeader getGlobalHeader() {
     return globalHeader;
   }
+  /**
+   * Gets the packet raw data.
+   *
+   *
+   * @return the packet data
+   */
+  public byte[] getPacketBytes() {
+    return packetBytes;
+  }
 
   /**
    * Gets the packet header.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index e48824f..ebd7ac7 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -39,9 +39,7 @@ import org.krakenapps.pcap.util.ByteOrderConverter;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.List;
+import java.util.*;
 
 import static org.apache.metron.pcap.Constants.*;
 
@@ -50,6 +48,22 @@ public class PcapHelper {
   public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES;
   public static final int GLOBAL_HEADER_SIZE = 24;
   private static final Logger LOG = Logger.getLogger(PcapHelper.class);
+
+  public enum PacketFields implements org.apache.metron.common.Constants.Field{
+    PACKET_DATA("packet"),
+    PACKET_FILTER("packet_filter")
+    ;
+    String name;
+    PacketFields(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+  }
+
   public static ThreadLocal<MetronEthernetDecoder> ETHERNET_DECODER = new ThreadLocal<MetronEthernetDecoder>() {
     @Override
     protected MetronEthernetDecoder initialValue() {
@@ -190,23 +204,25 @@ public class PcapHelper {
     System.arraycopy(packet, 0, ret, offset, packet.length);
     return ret;
   }
-  public static EnumMap<org.apache.metron.common.Constants.Fields, Object> packetToFields(PacketInfo pi) {
-    EnumMap<org.apache.metron.common.Constants.Fields, Object> ret = new EnumMap(org.apache.metron.common.Constants.Fields.class);
+
+  public static Map<String, Object> packetToFields(PacketInfo pi) {
+    Map<String, Object> ret = new HashMap<>();
+    ret.put(PacketFields.PACKET_DATA.getName(), pi.getPacketBytes());
     if(pi.getTcpPacket() != null) {
       if(pi.getTcpPacket().getSourceAddress() != null) {
-        ret.put(org.apache.metron.common.Constants.Fields.SRC_ADDR, pi.getTcpPacket().getSourceAddress().getHostAddress());
+        ret.put(org.apache.metron.common.Constants.Fields.SRC_ADDR.getName(), pi.getTcpPacket().getSourceAddress().getHostAddress());
       }
       if(pi.getTcpPacket().getSource() != null ) {
-        ret.put(org.apache.metron.common.Constants.Fields.SRC_PORT, pi.getTcpPacket().getSource().getPort());
+        ret.put(org.apache.metron.common.Constants.Fields.SRC_PORT.getName(), pi.getTcpPacket().getSource().getPort());
       }
       if(pi.getTcpPacket().getDestinationAddress() != null ) {
-        ret.put(org.apache.metron.common.Constants.Fields.DST_ADDR, pi.getTcpPacket().getDestinationAddress().getHostAddress());
+        ret.put(org.apache.metron.common.Constants.Fields.DST_ADDR.getName(), pi.getTcpPacket().getDestinationAddress().getHostAddress());
       }
       if(pi.getTcpPacket().getDestination() != null ) {
-        ret.put(org.apache.metron.common.Constants.Fields.DST_PORT, pi.getTcpPacket().getDestination().getPort());
+        ret.put(org.apache.metron.common.Constants.Fields.DST_PORT.getName(), pi.getTcpPacket().getDestination().getPort());
       }
       if(pi.getIpv4Packet() != null) {
-        ret.put(org.apache.metron.common.Constants.Fields.PROTOCOL, pi.getIpv4Packet().getProtocol());
+        ret.put(org.apache.metron.common.Constants.Fields.PROTOCOL.getName(), pi.getIpv4Packet().getProtocol());
       }
     }
     return ret;
@@ -281,7 +297,7 @@ public class PcapHelper {
         }
 
         packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
-            ipv4Packet, tcpPacket, udpPacket));
+            ipv4Packet, tcpPacket, udpPacket, pcap));
       } catch (NegativeArraySizeException ignored) {
         LOG.debug("Ignorable exception while parsing packet.", ignored);
       } catch (EOFException eof) { // $codepro.audit.disable logExceptions

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
index 4aeec6c..e5a15e7 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
@@ -22,17 +22,19 @@ import org.apache.metron.common.Constants;
 import org.apache.metron.common.dsl.VariableResolver;
 
 import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
 
 public class PcapFieldResolver implements VariableResolver {
-  EnumMap<Constants.Fields, Object> fieldsMap = null;
+  Map<String, Object> fieldsMap = new HashMap<>();
 
-  public PcapFieldResolver(EnumMap<Constants.Fields, Object> fieldsMap) {
+  public PcapFieldResolver(Map<String, Object> fieldsMap) {
     this.fieldsMap = fieldsMap;
   }
 
   @Override
   public Object resolve(String variable) {
-    return fieldsMap.get(Constants.Fields.fromString(variable));
+    return fieldsMap.get(variable);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/bf2528fd/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
index c7168aa..6d775df 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
@@ -23,6 +23,6 @@ import org.apache.metron.pcap.PacketInfo;
 import java.util.Map;
 import java.util.function.Predicate;
 
-public interface PcapFilter extends Predicate<PacketInfo> {
+public interface PcapFilter extends Predicate<PacketInfo>{
   void configure(Iterable<Map.Entry<String, String>> config);
 }