You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/19 13:18:19 UTC

[1/2] incubator-metron git commit: METRON-155 Added query filtering capability for PCAP via Metron REST API (mmiklavcic via cestella) closes apache/incubator-metron#119

Repository: incubator-metron
Updated Branches:
  refs/heads/master df8d682e8 -> 3803df2f3


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
new file mode 100644
index 0000000..ea46cc3
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.fixed;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.query.VariableResolver;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.PcapFilters;
+import org.apache.metron.pcap.filter.PcapFieldResolver;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+
+public class FixedPcapFilter implements PcapFilter {
+
+  public static class Configurator implements PcapFilterConfigurator<EnumMap<Constants.Fields, String>> {
+    @Override
+    public void addToConfig(EnumMap<Constants.Fields, String> fields, Configuration conf) {
+      for (Map.Entry<Constants.Fields, String> kv : fields.entrySet()) {
+        conf.set(kv.getKey().getName(), kv.getValue());
+      }
+      conf.set(PCAP_FILTER_NAME_CONF, PcapFilters.FIXED.name());
+    }
+
+    @Override
+    public String queryToString(EnumMap<Constants.Fields, String> fields) {
+      return (fields == null ? "" : Joiner.on("_").join(fields.values()));
+    }
+  }
+
+  private String srcAddr;
+  private Integer srcPort;
+  private String dstAddr;
+  private Integer dstPort;
+  private String protocol;
+  private boolean includesReverseTraffic = false;
+
+  @Override
+  public void configure(Iterable<Map.Entry<String, String>> config) {
+    for (Map.Entry<String, String> kv : config) {
+      if (kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) {
+        this.dstAddr = kv.getValue();
+      }
+      if (kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) {
+        this.srcAddr = kv.getValue();
+      }
+      if (kv.getKey().equals(Constants.Fields.DST_PORT.getName())) {
+        this.dstPort = Integer.parseInt(kv.getValue());
+      }
+      if (kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) {
+        this.srcPort = Integer.parseInt(kv.getValue());
+      }
+      if (kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) {
+        this.protocol = kv.getValue();
+      }
+      if (kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) {
+        this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue());
+      }
+    }
+  }
+
+
+  @Override
+  public boolean test(PacketInfo pi) {
+    VariableResolver resolver = new PcapFieldResolver(packetToFields(pi));
+    String srcAddrIn = resolver.resolve(Constants.Fields.SRC_ADDR.getName());
+    String srcPortIn = resolver.resolve(Constants.Fields.SRC_PORT.getName());
+    String dstAddrIn = resolver.resolve(Constants.Fields.DST_ADDR.getName());
+    String dstPortIn = resolver.resolve(Constants.Fields.DST_PORT.getName());
+    String protocolIn = resolver.resolve(Constants.Fields.PROTOCOL.getName());
+
+    if (areMatch(protocol, protocolIn)) {
+      if (matchesSourceAndDestination(srcAddrIn, srcPortIn, dstAddrIn, dstPortIn)) {
+        return true;
+      } else if (includesReverseTraffic) {
+        return matchesReverseSourceAndDestination(srcAddrIn, srcPortIn, dstAddrIn, dstPortIn);
+      }
+    }
+    return false;
+  }
+
+  private boolean areMatch(Integer filter, String input) {
+    return filter == null || areMatch(filter.toString(), input);
+  }
+
+  private boolean areMatch(String filter, String input) {
+    if (filter != null) {
+      return input != null && input.equals(filter);
+    } else {
+      return true;
+    }
+  }
+
+  protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+    return PcapHelper.packetToFields(pi);
+  }
+
+  private boolean matchesSourceAndDestination(String srcAddrComp,
+                                              String srcPortComp,
+                                              String dstAddrComp,
+                                              String dstPortComp) {
+    boolean isMatch = true;
+    isMatch &= areMatch(this.srcAddr, srcAddrComp);
+    isMatch &= areMatch(this.srcPort, srcPortComp);
+    isMatch &= areMatch(this.dstAddr, dstAddrComp);
+    isMatch &= areMatch(this.dstPort, dstPortComp);
+    return isMatch;
+  }
+
+  private boolean matchesReverseSourceAndDestination(String srcAddr,
+                                                     String srcPort,
+                                                     String dstAddr,
+                                                     String dstPort) {
+    return matchesSourceAndDestination(dstAddr, dstPort, srcAddr, srcPort);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
new file mode 100644
index 0000000..e97ed82
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.query;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.query.PredicateProcessor;
+import org.apache.metron.common.query.VariableResolver;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.PcapFieldResolver;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.PcapFilters;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public class QueryPcapFilter implements PcapFilter {
+  public static final String QUERY_STR_CONFIG = "mql";
+
+  public static class Configurator implements PcapFilterConfigurator<String> {
+    @Override
+    public void addToConfig(String query, Configuration conf) {
+      conf.set(QUERY_STR_CONFIG, query);
+      conf.set(PCAP_FILTER_NAME_CONF, PcapFilters.QUERY.name());
+    }
+
+    @Override
+    public String queryToString(String fields) {
+      return (fields == null ? "" :
+              fields.trim().replaceAll("\\s", "_")
+                      .replace(".", "-")
+                      .replace("'", "")
+      );
+    }
+  }
+
+  private String queryString = null;
+  private PredicateProcessor predicateProcessor = new PredicateProcessor();
+
+  @Override
+  public void configure(Iterable<Map.Entry<String, String>> config) {
+    for (Map.Entry<String, String> entry : config) {
+      if (entry.getKey().equals(QUERY_STR_CONFIG)) {
+        queryString = entry.getValue();
+      }
+    }
+    predicateProcessor.validate(queryString);
+  }
+
+  @Override
+  public boolean test(PacketInfo input) {
+    EnumMap<Constants.Fields, Object> fields = packetToFields(input);
+    VariableResolver resolver = new PcapFieldResolver(fields);
+    return predicateProcessor.parse(queryString, resolver);
+  }
+
+  protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+    return PcapHelper.packetToFields(pi);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
deleted file mode 100644
index 2952a0a..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap.mr;
-
-import com.google.common.base.Predicate;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.PacketInfo;
-import org.apache.metron.pcap.PcapHelper;
-
-import javax.annotation.Nullable;
-import java.util.EnumMap;
-import java.util.Map;
-
-
-public class PcapFilter implements Predicate<PacketInfo> {
-
-  private String srcAddr;
-  private Integer srcPort;
-  private String dstAddr;
-  private Integer dstPort;
-  private String protocol;
-  private boolean includesReverseTraffic = false;
-
-
-  public PcapFilter(Iterable<Map.Entry<String, String>> config) {
-    for(Map.Entry<String, String> kv : config) {
-      if(kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) {
-        this.dstAddr = kv.getValue();
-      }
-      if(kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) {
-        this.srcAddr = kv.getValue();
-      }
-      if(kv.getKey().equals(Constants.Fields.DST_PORT.getName())) {
-        this.dstPort = Integer.parseInt(kv.getValue());
-      }
-      if(kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) {
-        this.srcPort = Integer.parseInt(kv.getValue());
-      }
-      if(kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) {
-        this.protocol= kv.getValue();
-      }
-      if(kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) {
-        this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue());
-      }
-    }
-  }
-
-  private boolean matchSourceAndDestination(Object srcAddrObj
-                                           , Object srcPortObj
-                                           , Object dstAddrObj
-                                           , Object dstPortObj
-                                            )
-  {
-    boolean isMatch = true;
-    if(srcAddr != null ) {
-      Object o = srcAddrObj;
-      isMatch &= o != null && o instanceof String && ((String)o).equals(srcAddr);
-    }
-    if(isMatch && srcPort != null ) {
-      Object o = srcPortObj;
-      isMatch &= o != null && o.toString().equals(srcPort.toString());
-    }
-    if(isMatch && dstAddr != null ) {
-      Object o = dstAddrObj;
-      isMatch &= o != null &&  o instanceof String && ((String)o).equals(dstAddr);
-    }
-    if(isMatch && dstPort != null) {
-      Object o = dstPortObj;
-      isMatch &= o != null && o.toString().equals(dstPort.toString());
-    }
-    return isMatch;
-  }
-
-
-  protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-    return PcapHelper.packetToFields(pi);
-  }
-
-  @Override
-  public boolean apply(@Nullable PacketInfo pi ) {
-    boolean isMatch = true;
-    EnumMap<Constants.Fields, Object> input= packetToFields(pi);
-    Object srcAddrObj = input.get(Constants.Fields.SRC_ADDR);
-    Object srcPortObj = input.get(Constants.Fields.SRC_PORT);
-    Object dstAddrObj = input.get(Constants.Fields.DST_ADDR);
-    Object dstPortObj = input.get(Constants.Fields.DST_PORT);
-    Object protocolObj = input.get(Constants.Fields.PROTOCOL);
-
-    //first we ensure the protocol matches if you pass one in
-    if(isMatch && protocol != null ) {
-      Object o = protocolObj;
-      isMatch &= o != null && o.toString().equals(protocol);
-    }
-    if(isMatch) {
-      //if we're still a match, then we try to match the source and destination
-      isMatch &= matchSourceAndDestination(srcAddrObj, srcPortObj, dstAddrObj, dstPortObj);
-      if (!isMatch && includesReverseTraffic) {
-        isMatch = true;
-        //then we have to try the other direction if that the forward direction isn't a match
-        isMatch &= matchSourceAndDestination(dstAddrObj, dstPortObj, srcAddrObj, srcPortObj);
-      }
-    }
-    return isMatch;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 0dc6dc9..3543b1d 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -18,12 +18,12 @@
 
 package org.apache.metron.pcap.mr;
 
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -33,41 +33,53 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.log4j.Logger;
-import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.PcapFilters;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.stream.Stream;
 
 public class PcapJob {
   private static final Logger LOG = Logger.getLogger(PcapJob.class);
-
   public static class PcapMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
     public static final String START_TS_CONF = "start_ts";
     public static final String END_TS_CONF = "end_ts";
     PcapFilter filter;
     long start;
     long end;
+
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
       super.setup(context);
-      filter = new PcapFilter(context.getConfiguration());
+      filter = PcapFilters.valueOf(context.getConfiguration().get(PcapFilterConfigurator.PCAP_FILTER_NAME_CONF)).create();
+      filter.configure(context.getConfiguration());
       start = Long.parseUnsignedLong(context.getConfiguration().get(START_TS_CONF));
       end = Long.parseUnsignedLong(context.getConfiguration().get(END_TS_CONF));
     }
 
     @Override
     protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
-      if(Long.compareUnsigned(key.get() ,start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) {
-        boolean send = Iterables.size(Iterables.filter(PcapHelper.toPacketInfo(value.copyBytes()), filter)) > 0;
+      if (Long.compareUnsigned(key.get(), start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) {
+        // It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1
+        // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
+        // objects back to byte arrays, otherwise we could support more than one packet.
+        // Note: short-circuit findAny() func on stream
+        boolean send = filteredPacketInfo(value).findAny().isPresent();
         if (send) {
           context.write(key, value);
         }
       }
     }
+
+    private Stream<PacketInfo> filteredPacketInfo(BytesWritable value) throws IOException {
+      return PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter);
+    }
   }
 
   public static class PcapReducer extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {
@@ -154,26 +166,23 @@ public class PcapJob {
     return ret;
   }
 
-  private static String queryToString(EnumMap<Constants.Fields, String> fields) {
-    return Joiner.on("_").join(fields.values());
-  }
-
-  public List<byte[]> query(Path basePath
+  public <T> List<byte[]> query(Path basePath
                             , Path baseOutputPath
                             , long beginNS
                             , long endNS
-                            , EnumMap<Constants.Fields, String> fields
+                            , T fields
                             , Configuration conf
                             , FileSystem fs
+                            , PcapFilterConfigurator<T> filterImpl
                             ) throws IOException, ClassNotFoundException, InterruptedException {
-    String fileName = Joiner.on("_").join(beginNS, endNS, queryToString(fields), UUID.randomUUID().toString());
+    String fileName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString());
     if(LOG.isDebugEnabled()) {
       DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG
                                                               , SimpleDateFormat.LONG
                                                               );
       String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000)));
       String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000)));
-      LOG.debug("Executing query " + queryToString(fields) + " on timerange " + from + " to " + to);
+      LOG.debug("Executing query " + filterImpl.queryToString(fields) + " on timerange " + from + " to " + to);
     }
     Path outputPath =  new Path(baseOutputPath, fileName);
     Job job = createJob( basePath
@@ -183,6 +192,7 @@ public class PcapJob {
                        , fields
                        , conf
                        , fs
+                       , filterImpl
                        );
     boolean completed = job.waitForCompletion(true);
     if(completed) {
@@ -194,24 +204,21 @@ public class PcapJob {
   }
 
 
-  public static void addToConfig(EnumMap<Constants.Fields, String> fields, Configuration conf) {
-    for(Map.Entry<Constants.Fields, String> kv : fields.entrySet()) {
-      conf.set(kv.getKey().getName(), kv.getValue());
-    }
-  }
 
-  public Job createJob( Path basePath
+
+  public <T> Job createJob( Path basePath
                       , Path outputPath
                       , long beginNS
                       , long endNS
-                      , EnumMap<Constants.Fields, String> fields
+                      , T fields
                       , Configuration conf
                       , FileSystem fs
+                      , PcapFilterConfigurator<T> filterImpl
                       ) throws IOException
   {
     conf.set(PcapMapper.START_TS_CONF, Long.toUnsignedString(beginNS));
     conf.set(PcapMapper.END_TS_CONF, Long.toUnsignedString(endNS));
-    addToConfig(fields, conf);
+    filterImpl.addToConfig(fields, conf);
     Job job = new Job(conf);
     job.setJarByClass(PcapJob.class);
     job.setMapperClass(PcapJob.PcapMapper.class);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java
new file mode 100644
index 0000000..75871e9
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class PcapFiltersTest {
+
+  @Test
+  public void creates_pcap_filters() throws Exception {
+    Assert.assertThat("filter type should be Fixed", PcapFilters.FIXED.create(), instanceOf(FixedPcapFilter.class));
+    Assert.assertThat("filter type should be Query", PcapFilters.QUERY.create(), instanceOf(QueryPcapFilter.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
new file mode 100644
index 0000000..b75b9c8
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.fixed;
+
+import org.apache.metron.common.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class FixedPcapFilterTest {
+
+  @Test
+  public void string_representation_of_query_gets_formatted() throws Exception {
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    String actual = new FixedPcapFilter.Configurator().queryToString(fields);
+    String expected = "src_ip_0_dst_ip_1_false";
+    Assert.assertThat("string representation did not match", actual, equalTo(expected));
+  }
+
+  @Test
+  public void string_representation_of_empty_fields_empty() throws Exception {
+    {
+      final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class);
+      String actual = new FixedPcapFilter.Configurator().queryToString(fields);
+      String expected = "";
+      Assert.assertThat("string representation did not match", actual, equalTo(expected));
+    }
+    {
+      String actual = new FixedPcapFilter.Configurator().queryToString(null);
+      String expected = "";
+      Assert.assertThat("string representation did not match", actual, equalTo(expected));
+    }
+    {
+      final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+        put(Constants.Fields.SRC_ADDR, "");
+        put(Constants.Fields.SRC_PORT, "");
+      }};
+      String actual = new FixedPcapFilter.Configurator().queryToString(fields);
+      String expected = "_";
+      Assert.assertThat("string representation did not match", actual, equalTo(expected));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
new file mode 100644
index 0000000..061066e
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.query;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class QueryPcapFilterTest {
+
+  @Test
+  public void string_representation_of_query_gets_formatted() throws Exception {
+    String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+    String actual = new QueryPcapFilter.Configurator().queryToString(query);
+    String expected = "ip_src_addr_==_srcIp_and_ip_src_port_==_80_and_ip_dst_addr_==_dstIp_and_ip_dst_port_==_100_and_protocol_==_protocol";
+    Assert.assertThat("string representation did not match", actual, equalTo(expected));
+  }
+
+  @Test
+  public void string_representation_of_empty_query_empty() throws Exception {
+    {
+      String query = "";
+      String actual = new QueryPcapFilter.Configurator().queryToString(query);
+      String expected = "";
+      Assert.assertThat("string representation did not match", actual, equalTo(expected));
+    }
+    {
+      String query = " ";
+      String actual = new QueryPcapFilter.Configurator().queryToString(query);
+      String expected = "";
+      Assert.assertThat("string representation did not match", actual, equalTo(expected));
+    }
+    {
+      String query = null;
+      String actual = new QueryPcapFilter.Configurator().queryToString(query);
+      String expected = "";
+      Assert.assertThat("string representation did not match", actual, equalTo(expected));
+    }
+  }
+
+}


[2/2] incubator-metron git commit: METRON-155 Added query filtering capability for PCAP via Metron REST API (mmiklavcic via cestella) closes apache/incubator-metron#119

Posted by ce...@apache.org.
METRON-155 Added query filtering capability for PCAP via Metron REST API (mmiklavcic via cestella) closes apache/incubator-metron#119


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/3803df2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/3803df2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/3803df2f

Branch: refs/heads/master
Commit: 3803df2f319738c28b1992a3a9206f5eb65ab6c0
Parents: df8d682
Author: mmiklavc <mi...@gmail.com>
Authored: Thu May 19 09:18:09 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu May 19 09:18:09 2016 -0400

----------------------------------------------------------------------
 .../pcapservice/PcapReceiverImplRestEasy.java   | 115 ++++++--
 .../PcapReceiverImplRestEasyTest.java           | 192 ++++++++-----
 .../org/apache/metron/common/Constants.java     |  24 +-
 .../java/org/apache/metron/common/Creator.java  |  22 ++
 .../metron/common/query/PredicateProcessor.java |  12 +-
 .../metron/common/query/QueryParserTest.java    |  13 +-
 .../apache/metron/pcap/FixedPcapFilterTest.java | 285 +++++++++++++++++++
 .../org/apache/metron/pcap/PcapFilterTest.java  | 269 -----------------
 .../apache/metron/pcap/QueryPcapFilterTest.java | 227 +++++++++++++++
 .../PcapTopologyIntegrationTest.java            | 103 ++++++-
 .../metron/pcap/filter/PcapFieldResolver.java   |  42 +++
 .../apache/metron/pcap/filter/PcapFilter.java   |  28 ++
 .../pcap/filter/PcapFilterConfigurator.java     |  27 ++
 .../apache/metron/pcap/filter/PcapFilters.java  |  49 ++++
 .../pcap/filter/fixed/FixedPcapFilter.java      | 139 +++++++++
 .../pcap/filter/query/QueryPcapFilter.java      |  78 +++++
 .../org/apache/metron/pcap/mr/PcapFilter.java   | 121 --------
 .../java/org/apache/metron/pcap/mr/PcapJob.java |  59 ++--
 .../metron/pcap/filter/PcapFiltersTest.java     |  35 +++
 .../pcap/filter/fixed/FixedPcapFilterTest.java  |  69 +++++
 .../pcap/filter/query/QueryPcapFilterTest.java  |  58 ++++
 21 files changed, 1444 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 1d0beb8..9510318 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -17,29 +17,29 @@
  */
 package org.apache.metron.pcapservice;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.List;
-
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
 
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.List;
+
 @Path("/")
 public class PcapReceiverImplRestEasy {
 
@@ -98,18 +98,84 @@ public class PcapReceiverImplRestEasy {
     return false;
   }
 
-	  /*
-	   * (non-Javadoc)
-	   * 
-	   * @see
-	   * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang
-	   * .String, java.lang.String, java.lang.String, java.lang.String,
-	   * java.lang.String, long, long, boolean,
-	   * javax.servlet.http.HttpServletResponse)
-	   */
+  /**
+   * Enable filtering PCAP results by query filter string and start/end packet TS
+   *
+   * @param query Filter results based on this query
+   * @param startTime Only return packets originating after this start time
+   * @param endTime Only return packets originating before this end time
+   * @param servlet_response
+   * @return REST response
+   * @throws IOException
+   */
   @GET
-  @Path("/pcapGetter/getPcapsByIdentifiers")
+  @Path("/pcapGetter/getPcapsByQuery")
+  public Response getPcapsByIdentifiers(
+          @QueryParam ("query") String query,
+          @DefaultValue("-1") @QueryParam ("startTime")long startTime,
+          @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+          @Context HttpServletResponse servlet_response)
 
+          throws IOException {
+    PcapsResponse response = new PcapsResponse();
+    try {
+      if (startTime < 0) {
+        startTime = 0L;
+      }
+      if (endTime < 0) {
+        endTime = System.currentTimeMillis();
+      }
+      if(query == null) {
+        return Response.serverError().status(Response.Status.NO_CONTENT)
+                .entity("Query is null").build();
+      }
+      //convert to nanoseconds since the epoch
+      startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime);
+      endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime);
+      if(LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Query received: " + query);
+      }
+      response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+              , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
+              , startTime
+              , endTime
+              , query
+              , CONFIGURATION.get()
+              , FileSystem.get(CONFIGURATION.get())
+              , new QueryPcapFilter.Configurator()
+              )
+      );
+
+    } catch (Exception e) {
+      LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
+              e);
+      throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+    }
+
+    // return http status '200 OK' along with the complete pcaps response file,
+    // and headers
+    return Response
+            .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+            .status(200).build();
+  }
+
+  /**
+   * Enable filtering PCAP results by fixed properties and start/end packet TS
+   *
+   * @param srcIp filter value
+   * @param dstIp filter value
+   * @param protocol filter value
+   * @param srcPort filter value
+   * @param dstPort filter value
+   * @param startTime filter value
+   * @param endTime filter value
+   * @param includeReverseTraffic Indicates if filter should check swapped src/dest addresses and IPs
+   * @param servlet_response
+   * @return REST response
+   * @throws IOException
+   */
+  @GET
+  @Path("/pcapGetter/getPcapsByIdentifiers")
   public Response getPcapsByIdentifiers(
           @QueryParam ("srcIp") String srcIp,
           @QueryParam ("dstIp") String dstIp,
@@ -174,6 +240,7 @@ public class PcapReceiverImplRestEasy {
                                     , query
                                     , CONFIGURATION.get()
                                     , FileSystem.get(CONFIGURATION.get())
+                                    , new FixedPcapFilter.Configurator()
                                     )
                      );
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index eab4998..1793b06 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,33 +34,43 @@ import java.util.EnumMap;
 import java.util.List;
 
 public class PcapReceiverImplRestEasyTest {
-  public static class MockQueryHandler extends PcapJob {
+
+  public static class MockQueryHandler<R> extends PcapJob {
     Path basePath;
     Path baseOutputPath;
     long beginNS;
     long endNS;
-    EnumMap<Constants.Fields, String> fields;
+    R fields;
+    PcapFilterConfigurator<R> filterImpl;
+
     @Override
-    public List<byte[]> query( Path basePath
+    public <T> List<byte[]> query( Path basePath
             , Path baseOutputPath
             , long beginNS
             , long endNS
-            , EnumMap<Constants.Fields, String> fields
+            , T fields
             , Configuration conf
             , FileSystem fs
+            , PcapFilterConfigurator<T> filterImpl
     ) throws IOException, ClassNotFoundException, InterruptedException
     {
       this.basePath = basePath;
       this.baseOutputPath = baseOutputPath;
       this.beginNS = beginNS;
       this.endNS = endNS;
-      this.fields = fields;
+      this.fields = (R) fields;
+      this.filterImpl = (PcapFilterConfigurator<R>) filterImpl;
       return null;
     }
   }
-  final MockQueryHandler queryHandler = new MockQueryHandler();
-  PcapReceiverImplRestEasy restEndpoint = new PcapReceiverImplRestEasy() {{
-      this.queryUtil = queryHandler;
+
+  final MockQueryHandler<EnumMap<Constants.Fields, String>> fixedQueryHandler = new MockQueryHandler<EnumMap<Constants.Fields, String>>();
+  final MockQueryHandler<String> queryQueryHandler = new MockQueryHandler<String>();
+  PcapReceiverImplRestEasy fixedRestEndpoint = new PcapReceiverImplRestEasy() {{
+    this.queryUtil = fixedQueryHandler;
+  }};
+  PcapReceiverImplRestEasy queryRestEndpoint = new PcapReceiverImplRestEasy() {{
+      this.queryUtil = queryQueryHandler;
   }};
 
   @Before
@@ -69,7 +80,7 @@ public class PcapReceiverImplRestEasyTest {
   }
 
   @Test
-  public void testNormalPath() throws Exception {
+  public void testNormalFixedPath() throws Exception {
     String srcIp = "srcIp";
     String dstIp = "dstIp";
     String protocol = "protocol";
@@ -80,33 +91,46 @@ public class PcapReceiverImplRestEasyTest {
 
     {
       boolean includeReverseTraffic = false;
-      restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
-      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
-      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
-      Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
-      Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
-      Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
-      Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
-      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
-      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
-      Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+      Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
     }
     {
       boolean includeReverseTraffic = true;
-      restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
-      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
-      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
-      Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
-      Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
-      Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
-      Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
-      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
-      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
-      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+      Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
     }
   }
 
   @Test
+  public void testNormalQueryPath() throws Exception {
+    long startTime = 100;
+    long endTime = 1000;
+    String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+    queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
+    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
+    Assert.assertEquals(query, queryQueryHandler.fields);
+    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryQueryHandler.beginNS);
+    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryQueryHandler.endNS);
+  }
+
+  @Test
   public void testNullSrcIp() throws Exception {
     String srcIp = null;
     String dstIp = "dstIp";
@@ -116,16 +140,16 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = 100;
     long endTime = 1000;
     boolean includeReverseTraffic = false;
-    restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
-    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
-    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
-    Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
-    Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
-    Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
-    Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
-    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
-    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
-    Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+    Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+    Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+    Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+    Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+    Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
   }
 
   @Test
@@ -138,60 +162,82 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = 100;
     long endTime = 1000;
     boolean includeReverseTraffic = false;
-    restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
-    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
-    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
-    Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
-    Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
-    Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
-    Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
-    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
-    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
-    Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+    Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+    Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+    Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+    Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+    Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
   }
 
   @Test
   public void testEmptyStartTime() throws Exception {
     String srcIp = "srcIp";
-    String dstIp = null;
+    String dstIp = "dstIp";
     String protocol = "protocol";
     String srcPort = "80";
     String dstPort = "100";
     long startTime = -1;
     long endTime = 1000;
-    boolean includeReverseTraffic = false;
-    restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
-    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
-    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
-    Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
-    Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
-    Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
-    Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
-    Assert.assertEquals(0, queryHandler.beginNS);
-    Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
-    Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    {
+      boolean includeReverseTraffic = false;
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(0, fixedQueryHandler.beginNS);
+      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+      Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    }
+    {
+      String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+      queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
+      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
+      Assert.assertEquals(query, queryQueryHandler.fields);
+      Assert.assertEquals(0, queryQueryHandler.beginNS);
+      Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryQueryHandler.endNS);
+    }
   }
 
   @Test
   public void testEmptyEndTime() throws Exception {
     String srcIp = "srcIp";
-    String dstIp = null;
+    String dstIp = "dstIp";
     String protocol = "protocol";
     String srcPort = "80";
     String dstPort = "100";
     long startTime = -1;
     long endTime = -1;
-    boolean includeReverseTraffic = false;
-    restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
-    Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
-    Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
-    Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
-    Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
-    Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
-    Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
-    Assert.assertEquals(0, queryHandler.beginNS);
-    Assert.assertTrue(queryHandler.endNS > 0);
-    Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    {
+      boolean includeReverseTraffic = false;
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+      Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+      Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+      Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+      Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+      Assert.assertEquals(0, fixedQueryHandler.beginNS);
+      Assert.assertTrue(fixedQueryHandler.endNS > 0);
+      Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+    }
+    {
+      String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+      queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+      Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
+      Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
+      Assert.assertEquals(query, queryQueryHandler.fields);
+      Assert.assertEquals(0, queryQueryHandler.beginNS);
+      Assert.assertTrue(queryQueryHandler.endNS > 0);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 60a5b51..7e791d5 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -17,6 +17,9 @@
  */
 package org.apache.metron.common;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class Constants {
 
   public static final String ZOOKEEPER_ROOT = "/metron";
@@ -25,9 +28,11 @@ public class Constants {
   public static final String SENSOR_TYPE = "source.type";
   public static final String ENRICHMENT_TOPIC = "enrichments";
   public static final String ERROR_STREAM = "error";
+  public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
+  public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
 
   public static enum Fields {
-    SRC_ADDR("ip_src_addr")
+     SRC_ADDR("ip_src_addr")
     ,SRC_PORT("ip_src_port")
     ,DST_ADDR("ip_dst_addr")
     ,DST_PORT("ip_dst_port")
@@ -35,16 +40,29 @@ public class Constants {
     ,TIMESTAMP("timestamp")
     ,INCLUDES_REVERSE_TRAFFIC("includes_reverse_traffic")
     ;
+    private static Map<String, Fields> nameToField;
+
+    static {
+      nameToField = new HashMap<>();
+      for (Fields f : Fields.values()) {
+        nameToField.put(f.getName(), f);
+      }
+    }
+
     private String name;
+
     Fields(String name) {
       this.name = name;
     }
+
     public String getName() {
       return name;
     }
+
+    public static Fields fromString(String fieldName) {
+      return nameToField.get(fieldName);
+    }
   }
 
-  public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
-  public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java
new file mode 100644
index 0000000..b813dc5
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common;
+
+public interface Creator<T> {
+  T create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
index 6499319..26e4da8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
@@ -18,12 +18,20 @@
 
 package org.apache.metron.common.query;
 
-import org.antlr.v4.runtime.*;
-import org.apache.metron.common.query.generated.*;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.metron.common.query.generated.PredicateLexer;
+import org.apache.metron.common.query.generated.PredicateParser;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 
 public class PredicateProcessor {
   public boolean parse(String rule, VariableResolver resolver) {
+    if (rule == null || isEmpty(rule.trim())) {
+      return true;
+    }
     ANTLRInputStream input = new ANTLRInputStream(rule);
     PredicateLexer lexer = new PredicateLexer(input);
     lexer.removeErrorListeners();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
index ad798e2..4660002 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
@@ -31,14 +31,14 @@ public class QueryParserTest {
     PredicateProcessor processor = new PredicateProcessor();
     try {
       processor.validate("'foo'");
-      Assert.fail("Invalid rule found to be valid.");
+      Assert.fail("Invalid rule found to be valid - lone value.");
     }
     catch(ParseException e) {
 
     }
     try {
       processor.validate("enrichedField1 == 'enrichedValue1");
-      Assert.fail("Invalid rule found to be valid.");
+      Assert.fail("Invalid rule found to be valid - unclosed single quotes.");
     }
     catch(ParseException e) {
 
@@ -67,6 +67,9 @@ public class QueryParserTest {
     Assert.assertTrue(run("foo== foo", v -> variableMap.get(v)));
     Assert.assertTrue(run("empty== ''", v -> variableMap.get(v)));
     Assert.assertTrue(run("spaced == 'metron is great'", v -> variableMap.get(v)));
+    Assert.assertTrue(run(null, v -> variableMap.get(v)));
+    Assert.assertTrue(run("", v -> variableMap.get(v)));
+    Assert.assertTrue(run(" ", v -> variableMap.get(v)));
   }
 
   @Test
@@ -82,7 +85,10 @@ public class QueryParserTest {
     Assert.assertFalse(run("('casey' == foo) and (FALSE == TRUE)", v -> variableMap.get(v)));
     Assert.assertFalse(run("'casey' == foo and FALSE", v -> variableMap.get(v)));
     Assert.assertTrue(run("'casey' == foo and true", v -> variableMap.get(v)));
+    Assert.assertTrue(run("true", v -> variableMap.get(v)));
+    Assert.assertTrue(run("TRUE", v -> variableMap.get(v)));
   }
+
   @Test
   public void testList() throws Exception {
     final Map<String, String> variableMap = new HashMap<String, String>() {{
@@ -98,6 +104,7 @@ public class QueryParserTest {
     Assert.assertFalse(run("foo not in [ 'casey', 'david' ]", v -> variableMap.get(v)));
     Assert.assertFalse(run("foo not in [ 'casey', 'david' ] and 'casey' == foo", v -> variableMap.get(v)));
   }
+
   @Test
   public void testExists() throws Exception {
     final Map<String, String> variableMap = new HashMap<String, String>() {{
@@ -123,6 +130,7 @@ public class QueryParserTest {
     Assert.assertTrue(run("TO_UPPER(foo) in [ TO_UPPER('casey'), 'david' ] and IN_SUBNET(ip, '192.168.0.0/24')", v -> variableMap.get(v)));
     Assert.assertFalse(run("TO_LOWER(foo) in [ TO_UPPER('casey'), 'david' ]", v -> variableMap.get(v)));
   }
+
   @Test
   public void testLogicalFunctions() throws Exception {
     final Map<String, String> variableMap = new HashMap<String, String>() {{
@@ -149,4 +157,5 @@ public class QueryParserTest {
     Assert.assertFalse(run("IN_SUBNET(ip_dst_addr, '192.168.0.0/24')", v-> variableMap.get(v)));
     Assert.assertTrue(run("not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))", v-> variableMap.get(v)));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
new file mode 100644
index 0000000..218d143
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+public class FixedPcapFilterTest {
+  @Test
+  public void testTrivialEquality() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testReverseTraffic() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "dst_ip");
+            put(Constants.Fields.SRC_PORT, 1);
+            put(Constants.Fields.DST_ADDR, "src_ip");
+            put(Constants.Fields.DST_PORT, 0);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "dst_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "src_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+@Test
+public void testMissingDstAddr() throws Exception {
+  Configuration config = new Configuration();
+  final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+    put(Constants.Fields.SRC_ADDR, "src_ip");
+    put(Constants.Fields.SRC_PORT, "0");
+    put(Constants.Fields.DST_PORT, "1");
+    put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+  }};
+  new FixedPcapFilter.Configurator().addToConfig(fields, config);
+  {
+    FixedPcapFilter filter = new FixedPcapFilter() {
+      @Override
+      protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+        return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+          put(Constants.Fields.SRC_ADDR, "src_ip");
+          put(Constants.Fields.SRC_PORT, 0);
+          put(Constants.Fields.DST_ADDR, "dst_ip");
+          put(Constants.Fields.DST_PORT, 1);
+        }};
+      }
+    };
+    filter.configure(config);
+    Assert.assertTrue(filter.test(null));
+  }
+  new FixedPcapFilter.Configurator().addToConfig(fields, config);
+  {
+    FixedPcapFilter filter = new FixedPcapFilter() {
+      @Override
+      protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+        return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+          put(Constants.Fields.SRC_ADDR, "src_ip1");
+          put(Constants.Fields.SRC_PORT, 0);
+          put(Constants.Fields.DST_ADDR, "dst_ip");
+          put(Constants.Fields.DST_PORT, 1);
+        }};
+      }
+    };
+    filter.configure(config);
+    Assert.assertFalse(filter.test(null));
+  }
+}
+  @Test
+  public void testMissingDstPort() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 100);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+  @Test
+  public void testMissingSrcAddr() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+  @Test
+  public void testMissingSrcPort() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new FixedPcapFilter.Configurator().addToConfig(fields, config);
+    {
+      FixedPcapFilter filter = new FixedPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 100);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
deleted file mode 100644
index 70c973a..0000000
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.*;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.mr.PcapFilter;
-import org.apache.metron.pcap.mr.PcapJob;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.EnumMap;
-
-public class PcapFilterTest {
-  @Test
-  public void testTrivialEquality() throws Exception {
-    Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
-    }};
-    PcapJob.addToConfig(fields, config);
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-  }
-
-  @Test
-  public void testReverseTraffic() throws Exception {
-    Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
-    }};
-    PcapJob.addToConfig(fields, config);
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "dst_ip");
-            put(Constants.Fields.SRC_PORT, 1);
-            put(Constants.Fields.DST_ADDR, "src_ip");
-            put(Constants.Fields.DST_PORT, 0);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "dst_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "src_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertFalse(filter.apply(null));
-    }
-  }
-@Test
-public void testMissingDstAddr() throws Exception {
-  Configuration config = new Configuration();
-  final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-    put(Constants.Fields.SRC_ADDR, "src_ip");
-    put(Constants.Fields.SRC_PORT, "0");
-    put(Constants.Fields.DST_PORT, "1");
-    put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
-  }};
-  PcapJob.addToConfig(fields, config);
-  {
-    PcapFilter filter = new PcapFilter(config) {
-      @Override
-      protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-        return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-          put(Constants.Fields.SRC_ADDR, "src_ip");
-          put(Constants.Fields.SRC_PORT, 0);
-          put(Constants.Fields.DST_ADDR, "dst_ip");
-          put(Constants.Fields.DST_PORT, 1);
-        }};
-      }
-    };
-    Assert.assertTrue(filter.apply(null));
-  }
-  {
-    PcapFilter filter = new PcapFilter(config) {
-      @Override
-      protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-        return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-          put(Constants.Fields.SRC_ADDR, "src_ip1");
-          put(Constants.Fields.SRC_PORT, 0);
-          put(Constants.Fields.DST_ADDR, "dst_ip");
-          put(Constants.Fields.DST_PORT, 1);
-        }};
-      }
-    };
-    Assert.assertFalse(filter.apply(null));
-  }
-}
-  @Test
-  public void testMissingDstPort() throws Exception {
-    Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
-    }};
-    PcapJob.addToConfig(fields, config);
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 100);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 100);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 100);
-          }};
-        }
-      };
-      Assert.assertFalse(filter.apply(null));
-    }
-  }
-  @Test
-  public void testMissingSrcAddr() throws Exception {
-    Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_PORT, "0");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
-    }};
-    PcapJob.addToConfig(fields, config);
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-  }
-  @Test
-  public void testMissingSrcPort() throws Exception {
-    Configuration config = new Configuration();
-    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
-      put(Constants.Fields.SRC_ADDR, "src_ip");
-      put(Constants.Fields.DST_ADDR, "dst_ip");
-      put(Constants.Fields.DST_PORT, "1");
-      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
-    }};
-    PcapJob.addToConfig(fields, config);
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 0);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-    {
-      PcapFilter filter = new PcapFilter(config) {
-        @Override
-        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
-          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
-            put(Constants.Fields.SRC_ADDR, "src_ip");
-            put(Constants.Fields.SRC_PORT, 100);
-            put(Constants.Fields.DST_ADDR, "dst_ip");
-            put(Constants.Fields.DST_PORT, 1);
-          }};
-        }
-      };
-      Assert.assertTrue(filter.apply(null));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
new file mode 100644
index 0000000..f07dc48
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+public class QueryPcapFilterTest {
+
+  @Test
+  public void testEmptyQueryFilter() throws Exception {
+    Configuration config = new Configuration();
+    String query = "";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      PcapFilter filter = new QueryPcapFilter() {
+
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testTrivialEquality() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      PcapFilter filter = new QueryPcapFilter() {
+
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingDstAddr() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_port == '1'";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip1");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingDstPort() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_addr == 'dst_ip'";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 100);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 100);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertFalse(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingSrcAddr() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_port == '0' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+
+  @Test
+  public void testMissingSrcPort() throws Exception {
+    Configuration config = new Configuration();
+    String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'";
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+    new QueryPcapFilter.Configurator().addToConfig(query, config);
+    {
+      QueryPcapFilter filter = new QueryPcapFilter() {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 100);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      filter.configure(config);
+      Assert.assertTrue(filter.test(null));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 1142da6..03e3639 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -25,7 +25,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import kafka.consumer.ConsumerIterator;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -42,6 +43,8 @@ import org.apache.metron.integration.utils.KafkaUtil;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.apache.metron.spout.pcap.Endianness;
 import org.apache.metron.spout.pcap.scheme.TimestampScheme;
@@ -51,7 +54,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
 import java.util.*;
 
 public class PcapTopologyIntegrationTest {
@@ -221,7 +227,6 @@ public class PcapTopologyIntegrationTest {
                                      }
                                    }
             );
-    //.withExistingZookeeper("localhost:2000");
 
 
     final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath());
@@ -274,6 +279,22 @@ public class PcapTopologyIntegrationTest {
                         , new EnumMap<>(Constants.Fields.class)
                         , new Configuration()
                         , FileSystem.get(new Configuration())
+                        , new FixedPcapFilter.Configurator()
+                );
+        Assert.assertEquals(results.size(), 2);
+      }
+      {
+        // Ensure that only two pcaps are returned when we look at 4 and 5
+        // test with empty query filter
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(4, pcapEntries)
+                        , getTimestamp(5, pcapEntries)
+                        , ""
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                        , new QueryPcapFilter.Configurator()
                 );
         Assert.assertEquals(results.size(), 2);
       }
@@ -289,6 +310,22 @@ public class PcapTopologyIntegrationTest {
                         }}
                         , new Configuration()
                         , FileSystem.get(new Configuration())
+                        , new FixedPcapFilter.Configurator()
+                );
+        Assert.assertEquals(results.size(), 0);
+      }
+      {
+        // ensure that none get returned since that destination IP address isn't in the dataset
+        // test with query filter
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(1, pcapEntries)
+                        , "ip_dst_addr == '207.28.210.1'"
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                        , new QueryPcapFilter.Configurator()
                 );
         Assert.assertEquals(results.size(), 0);
       }
@@ -304,6 +341,22 @@ public class PcapTopologyIntegrationTest {
                         }}
                         , new Configuration()
                         , FileSystem.get(new Configuration())
+                        , new FixedPcapFilter.Configurator()
+                );
+        Assert.assertEquals(results.size(), 0);
+      }
+      {
+        //same with protocol as before with the destination addr
+        //test with query filter
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(1, pcapEntries)
+                        , "protocol == 'foo'"
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                        , new QueryPcapFilter.Configurator()
                 );
         Assert.assertEquals(results.size(), 0);
       }
@@ -317,6 +370,22 @@ public class PcapTopologyIntegrationTest {
                         , new EnumMap<>(Constants.Fields.class)
                         , new Configuration()
                         , FileSystem.get(new Configuration())
+                        , new FixedPcapFilter.Configurator()
+                );
+        Assert.assertEquals(results.size(), pcapEntries.size());
+      }
+      {
+        //make sure I get them all.
+        //with query filter
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , ""
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                        , new QueryPcapFilter.Configurator()
                 );
         Assert.assertEquals(results.size(), pcapEntries.size());
       }
@@ -331,6 +400,34 @@ public class PcapTopologyIntegrationTest {
                         }}
                         , new Configuration()
                         , FileSystem.get(new Configuration())
+                        , new FixedPcapFilter.Configurator()
+                );
+        Assert.assertTrue(results.size() > 0);
+        Assert.assertEquals(results.size()
+                , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+                          @Override
+                          public boolean apply(@Nullable JSONObject input) {
+                            Object prt = input.get(Constants.Fields.DST_PORT.getName());
+                            return prt != null && prt.toString().equals("22");
+                          }
+                        }, withHeaders)
+                )
+        );
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PcapMerger.merge(baos, results);
+        Assert.assertTrue(baos.toByteArray().length > 0);
+      }
+      {
+        //test with query filter
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , "ip_dst_port == '22'"
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                        , new QueryPcapFilter.Configurator()
                 );
         Assert.assertTrue(results.size() > 0);
         Assert.assertEquals(results.size()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
new file mode 100644
index 0000000..50537e1
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.query.VariableResolver;
+
+import java.util.EnumMap;
+
+public class PcapFieldResolver implements VariableResolver {
+  EnumMap<Constants.Fields, Object> fieldsMap = null;
+
+  public PcapFieldResolver(EnumMap<Constants.Fields, Object> fieldsMap) {
+    this.fieldsMap = fieldsMap;
+  }
+
+  @Override
+  public String resolve(String variable) {
+    Object obj = fieldsMap.get(Constants.Fields.fromString(variable));
+    if (obj != null) {
+      return obj.toString();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
new file mode 100644
index 0000000..c7168aa
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.pcap.PacketInfo;
+
+import java.util.Map;
+import java.util.function.Predicate;
+
+public interface PcapFilter extends Predicate<PacketInfo> {
+  void configure(Iterable<Map.Entry<String, String>> config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java
new file mode 100644
index 0000000..43e79ce
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface PcapFilterConfigurator<T> {
+  public static final String PCAP_FILTER_NAME_CONF = "PCAP_FILTER_NAME";
+  void addToConfig(T fields, Configuration conf);
+  String queryToString(T fields);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java
new file mode 100644
index 0000000..7d9e285
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.common.Creator;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+
+public enum PcapFilters implements Creator<PcapFilter> {
+  FIXED(new Creator<PcapFilter>() {
+    @Override
+    public PcapFilter create() {
+      return new FixedPcapFilter();
+    }
+  }),
+  QUERY(new Creator<PcapFilter>() {
+    @Override
+    public PcapFilter create() {
+      return new QueryPcapFilter();
+    }
+  });
+
+  Creator<PcapFilter> creator;
+
+  PcapFilters(Creator<PcapFilter> creator) {
+    this.creator = creator;
+  }
+
+  public PcapFilter create() {
+    return creator.create();
+  }
+
+}