You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/20 13:08:34 UTC

incubator-metron git commit: METRON-235 Expose filtering capability for PCAP via CLI tool. (mmiklavc via cestella) closes apache/incubator-metron#156

Repository: incubator-metron
Updated Branches:
  refs/heads/master 5ca8d650c -> 46e48f890


METRON-235 Expose filtering capability for PCAP via CLI tool. (mmiklavc via cestella) closes apache/incubator-metron#156


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/46e48f89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/46e48f89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/46e48f89

Branch: refs/heads/master
Commit: 46e48f890756c1afe3ffc49b924b40eb26144af2
Parents: 5ca8d65
Author: mmiklavc <mi...@gmail.com>
Authored: Mon Jun 20 09:08:16 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Jun 20 09:08:16 2016 -0400

----------------------------------------------------------------------
 .../roles/ambari_config/defaults/main.yml       |   6 +-
 .../roles/ambari_config/vars/single_node_vm.yml |   2 +
 .../SensorEnrichmentUpdateConfig.java           |  19 +-
 .../org/apache/metron/common/system/Clock.java  |  37 +++
 .../SensorEnrichmentUpdateConfigTest.java       |  73 +++--
 .../apache/metron/common/system/ClockTest.java  |  54 ++++
 metron-platform/metron-pcap-backend/README.md   |  47 ++++
 .../org/apache/metron/pcap/query/CliConfig.java |  94 +++++++
 .../org/apache/metron/pcap/query/CliParser.java | 102 +++++++
 .../metron/pcap/query/FixedCliConfig.java       |  47 ++++
 .../metron/pcap/query/FixedCliParser.java       |  67 +++++
 .../org/apache/metron/pcap/query/PcapCli.java   | 176 ++++++++++++
 .../metron/pcap/query/QueryCliConfig.java       |  30 ++
 .../metron/pcap/query/QueryCliParser.java       |  58 ++++
 .../apache/metron/pcap/query/ResultsWriter.java |  48 ++++
 .../src/main/scripts/pcap_query.sh              |  34 +++
 .../apache/metron/pcap/query/PcapCliTest.java   | 275 +++++++++++++++++++
 17 files changed, 1122 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-deployment/roles/ambari_config/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/ambari_config/defaults/main.yml b/metron-deployment/roles/ambari_config/defaults/main.yml
index 88ea47a..43d5d0b 100644
--- a/metron-deployment/roles/ambari_config/defaults/main.yml
+++ b/metron-deployment/roles/ambari_config/defaults/main.yml
@@ -29,5 +29,7 @@ storm_local_dir: /hadoop/storm
 kafka_log_dirs: /kafka-log
 cluster_type: small_cluster
 nodemanager_mem_mb : 4096
-mapred_map_mem_mb : 1024
-mapred_reduce_mem_mb : 1024
+mapred_map_java_opts : -Xmx1024m
+mapred_reduce_java_opts : -Xmx1024m
+mapred_map_mem_mb : 1229
+mapred_reduce_mem_mb : 1229

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
index 031fe9b..9a6afc1 100644
--- a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
+++ b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
@@ -61,6 +61,8 @@ configurations:
       jobhistory_heapsize: 256
   - mapred-site:
       mapreduce.jobhistory.recovery.store.leveldb.path : '{{ jhs_recovery_store_ldb_path }}'
+      mapreduce.map.java.opts : '{{ mapred_map_java_opts }}'
+      mapreduce.reduce.java.opts : '{{ mapred_reduce_java_opts }}'
       mapreduce.map.memory.mb : '{{ mapred_map_mem_mb }}'
       mapreduce.reduce.memory.mb : '{{ mapred_reduce_mem_mb }}'
   - yarn-site:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
index 99e681a..259fcf0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
@@ -115,13 +115,7 @@ public class SensorEnrichmentUpdateConfig {
   {
     Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
     for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
-      SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
-      if(config == null) {
-        config = scHandler.readConfig(kv.getKey());
-        if(_LOG.isDebugEnabled()) {
-          _LOG.debug(config.toJSON());
-        }
-      }
+      SensorEnrichmentConfig config = findConfigBySensorType(scHandler, sourceConfigsChanged, kv.getKey());
       Map<String, List<String> > fieldMap = null;
       Map<String, List<String> > fieldToTypeMap = null;
       List<String> fieldList = null;
@@ -206,4 +200,15 @@ public class SensorEnrichmentUpdateConfig {
     }
   }
 
+  private static SensorEnrichmentConfig findConfigBySensorType(SourceConfigHandler scHandler, Map<String, SensorEnrichmentConfig> sourceConfigsChanged, String key) throws Exception {
+    SensorEnrichmentConfig config = sourceConfigsChanged.get(key);
+    if(config == null) {
+      config = scHandler.readConfig(key);
+      if(_LOG.isDebugEnabled()) {
+        _LOG.debug(config.toJSON());
+      }
+    }
+    return config;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java
new file mode 100644
index 0000000..458b730
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.system;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class Clock {
+  private final static String UTC = "UTC";
+
+  public long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+
+  public String currentTimeFormatted(String stdDateFormat) {
+    SimpleDateFormat format = new SimpleDateFormat(stdDateFormat);
+    format.setTimeZone(TimeZone.getTimeZone(UTC));
+    return format.format(new Date(currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
index 77035fb..681c4c1 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
@@ -35,14 +35,14 @@ public class SensorEnrichmentUpdateConfigTest {
       "batchSize": 5,
       "enrichment" : {
         "fieldMap": {
-        "geo": ["ip_dst_addr", "ip_src_addr"],
-        "host": ["host"]
+          "geo": ["ip_dst_addr", "ip_src_addr"],
+          "host": ["host"]
                     }
       },
       "threatIntel": {
         "fieldMap": {
           "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
-                             },
+                    },
         "fieldToTypeMap": {
           "ip_dst_addr" : [ "malicious_ip" ]
          ,"ip_src_addr" : [ "malicious_ip" ]
@@ -63,27 +63,24 @@ public class SensorEnrichmentUpdateConfigTest {
 {
   "zkQuorum" : "localhost:2181"
  ,"sensorToFieldList" : {
-  "bro" : {
+      "bro" : {
            "type" : "THREAT_INTEL"
           ,"fieldToEnrichmentTypes" : {
-            "ip_src_addr" : [ "playful" ]
-           ,"ip_dst_addr" : [ "playful" ]
+              "ip_src_addr" : [ "playful" ]
+             ,"ip_dst_addr" : [ "playful" ]
                                       }
-          }
+              }
                         }
 }
-     */
+  */
   @Multiline
   public static String threatIntelConfigStr;
 
   @Test
   public void testThreatIntel() throws Exception {
-
     SensorEnrichmentConfig broSc = (SensorEnrichmentConfig) ConfigurationType.ENRICHMENT.deserialize(sourceConfigStr);
-
-
-    SensorEnrichmentUpdateConfig config = JSONUtils.INSTANCE.load(threatIntelConfigStr, SensorEnrichmentUpdateConfig.class);
-    final Map<String, SensorEnrichmentConfig> outputScs = new HashMap<>();
+    SensorEnrichmentUpdateConfig threatIntelConfig = JSONUtils.INSTANCE.load(threatIntelConfigStr, SensorEnrichmentUpdateConfig.class);
+    final Map<String, SensorEnrichmentConfig> finalEnrichmentConfig = new HashMap<>();
     SensorEnrichmentUpdateConfig.SourceConfigHandler scHandler = new SensorEnrichmentUpdateConfig.SourceConfigHandler() {
       @Override
       public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
@@ -97,50 +94,50 @@ public class SensorEnrichmentUpdateConfigTest {
 
       @Override
       public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
-        outputScs.put(sensor, config);
+        finalEnrichmentConfig.put(sensor, config);
       }
     };
-    SensorEnrichmentUpdateConfig.updateSensorConfigs(scHandler, config.getSensorToFieldList());
-    Assert.assertNotNull(outputScs.get("bro"));
-    Assert.assertNotSame(outputScs.get("bro"), broSc);
-    Assert.assertEquals( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldMap().get(Constants.SIMPLE_HBASE_THREAT_INTEL).size()
+    SensorEnrichmentUpdateConfig.updateSensorConfigs(scHandler, threatIntelConfig.getSensorToFieldList());
+    Assert.assertNotNull(finalEnrichmentConfig.get("bro"));
+    Assert.assertNotSame(finalEnrichmentConfig.get("bro"), broSc);
+    Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldMap().get(Constants.SIMPLE_HBASE_THREAT_INTEL).size()
                        , 2
                        );
-    Assert.assertEquals(1, outputScs.get("bro").getThreatIntel().getTriageConfig().getRiskLevelRules().size());
-    Assert.assertTrue( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldMap()
+    Assert.assertEquals(1, finalEnrichmentConfig.get("bro").getThreatIntel().getTriageConfig().getRiskLevelRules().size());
+    Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldMap()
                                   .get(Constants.SIMPLE_HBASE_THREAT_INTEL)
                                   .contains("ip_src_addr")
                        );
-    Assert.assertTrue( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldMap()
+    Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldMap()
                                   .get(Constants.SIMPLE_HBASE_THREAT_INTEL)
                                   .contains("ip_dst_addr")
                        );
-    Assert.assertEquals( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().keySet().size()
+    Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().keySet().size()
                        , 2
                        );
-    Assert.assertEquals( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").size()
+    Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").size()
                        , 2
                        );
-    Assert.assertTrue( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("playful")
+    Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("playful")
                        );
-    Assert.assertTrue( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("malicious_ip")
+    Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("malicious_ip")
                        );
-    Assert.assertEquals( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").size()
+    Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").size()
                        , 2
                        );
-    Assert.assertTrue( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("playful")
+    Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("playful")
                        );
-    Assert.assertTrue( outputScs.get("bro").toJSON()
-                       , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("malicious_ip")
+    Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+                       , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("malicious_ip")
                        );
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java
new file mode 100644
index 0000000..1efc356
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.system;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class ClockTest {
+
+  @Test
+  public void returns_system_time() throws Exception {
+    Clock clock = new Clock();
+    long t1 = clock.currentTimeMillis();
+    Thread.sleep(50);
+    long t2 = clock.currentTimeMillis();
+    Thread.sleep(50);
+    long t3 = clock.currentTimeMillis();
+    assertThat("t3 should be greater", t3 > t2, equalTo(true));
+    assertThat("t2 should be greater", t2 > t1, equalTo(true));
+  }
+
+  @Test
+  public void formats_system_time_given_passed_format() throws Exception {
+    Clock clock = Mockito.spy(Clock.class);
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSSZ");
+    sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+    Date date = sdf.parse("20160615183527162+0000");
+    Mockito.when(clock.currentTimeMillis()).thenReturn(date.getTime());
+    assertThat("time not right", clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"), equalTo("20160615183527162+0000"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index 9969c02..b708fe5 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -63,6 +63,7 @@ arguments has been created to make this very simple.  Simply, execute
 
 ## Utilities
 
+### Inspector Utility
 In order to ensure that data can be read back out, a utility,
 `$METRON_HOME/bin/pcap_inspector.sh` has been
 created to read portions of the sequence files.
@@ -73,3 +74,49 @@ usage: PcapInspector
  -i,--input <SEQ_FILE>   Input sequence file on HDFS
  -n,--num_packets <N>    Number of packets to dump
 ```
+
+### Query Filter Utility
+This tool exposes the two methods for filtering PCAP data via a command line tool:
+- fixed
+- query (Metron Stellar)
+
+The tool is executed via ```${metron_home}/bin/pcap_query.sh [fixed|query]```
+
+#### Usage
+```
+usage: Fixed filter options
+ -bop,--base_output_path <arg>   Query result output path. Default is
+                                 '/tmp'
+ -bp,--base_path <arg>           Base PCAP data path. Default is
+                                 '/apps/metron/pcap'
+ -da,--ip_dst_addr <arg>         Destination IP address
+ -df,--date_format <arg>         Date format to use for parsing start_time
+                                 and end_time. Default is to use time in
+                                 millis since the epoch.
+ -dp,--ip_dst_port <arg>         Destination port
+ -et,--end_time <arg>            Packet end time range. Default is current
+                                 system time.
+ -h,--help                       Display help
+ -ir,--include_reverse           Indicates if filter should check swapped
+                                 src/dest addresses and IPs
+ -p,--protocol <arg>             IP Protocol
+ -sa,--ip_src_addr <arg>         Source IP address
+ -sp,--ip_src_port <arg>         Source port
+ -st,--start_time <arg>          (required) Packet start time range.
+```
+
+```
+usage: Query filter options
+ -bop,--base_output_path <arg>   Query result output path. Default is
+                                 '/tmp'
+ -bp,--base_path <arg>           Base PCAP data path. Default is
+                                 '/apps/metron/pcap'
+ -df,--date_format <arg>         Date format to use for parsing start_time
+                                 and end_time. Default is to use time in
+                                 millis since the epoch.
+ -et,--end_time <arg>            Packet end time range. Default is current
+                                 system time.
+ -h,--help                       Display help
+ -q,--query <arg>                Query string to use as a filter
+ -st,--start_time <arg>          (required) Packet start time range.
+```

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
new file mode 100644
index 0000000..a0271b8
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+public class CliConfig {
+  public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
+  public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
+  private boolean showHelp;
+  private String basePath;
+  private String baseOutputPath;
+  private long startTime;
+  private long endTime;
+  private DateFormat dateFormat;
+
+  public CliConfig() {
+    showHelp = false;
+    basePath = BASE_PATH_DEFAULT;
+    baseOutputPath = BASE_OUTPUT_PATH_DEFAULT;
+    startTime = -1L;
+    endTime = -1L;
+  }
+
+  public boolean showHelp() {
+    return showHelp;
+  }
+
+  public void setShowHelp(boolean showHelp) {
+    this.showHelp = showHelp;
+  }
+
+  public String getBasePath() {
+    return basePath;
+  }
+
+  public String getBaseOutputPath() {
+    return baseOutputPath;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getEndTime() {
+    return endTime;
+  }
+
+  public void setBasePath(String basePath) {
+    this.basePath = basePath;
+  }
+
+  public void setBaseOutputPath(String baseOutputPath) {
+    this.baseOutputPath = baseOutputPath;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
+  }
+
+  public boolean isNullOrEmpty(String val) {
+    return StringUtils.isEmpty(val);
+  }
+
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = new SimpleDateFormat(dateFormat);
+  }
+
+  public DateFormat getDateFormat() {
+    return dateFormat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
new file mode 100644
index 0000000..4fbb05d
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.*;
+
+/**
+ * Provides commmon required fields for the PCAP filter jobs
+ */
+public class CliParser {
+  private CommandLineParser parser;
+
+  public CliParser() {
+    parser = new PosixParser();
+  }
+
+  public Options buildOptions() {
+    Options options = new Options();
+    options.addOption(newOption("h", "help", false, "Display help"));
+    options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
+    options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
+    options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
+    options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
+    options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
+    return options;
+  }
+
+  protected Option newOption(String opt, String longOpt, boolean hasArg, String desc) {
+    return newOption(opt, longOpt, hasArg, desc, false);
+  }
+
+  protected Option newOption(String opt, String longOpt, boolean hasArg, String desc, boolean required) {
+    Option option = new Option(opt, longOpt, hasArg, desc);
+    option.setRequired(required);
+    return option;
+  }
+
+  public void parse(CommandLine commandLine, CliConfig config) throws java.text.ParseException {
+    if (commandLine.hasOption("help")) {
+      config.setShowHelp(true);
+    }
+    if (commandLine.hasOption("date_format")) {
+      config.setDateFormat(commandLine.getOptionValue("date_format"));
+    }
+    if (commandLine.hasOption("base_path")) {
+      config.setBasePath(commandLine.getOptionValue("base_path"));
+    }
+    if (commandLine.hasOption("base_output_path")) {
+      config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
+    }
+    if (commandLine.hasOption("start_time")) {
+      try {
+        if (commandLine.hasOption("date_format")) {
+          long startTime = config.getDateFormat().parse(commandLine.getOptionValue("start_time")).getTime();
+          config.setStartTime(startTime);
+        } else {
+          long startTime = Long.parseLong(commandLine.getOptionValue("start_time"));
+          config.setStartTime(startTime);
+        }
+      } catch (NumberFormatException nfe) {
+        //no-op
+      }
+    }
+    if (commandLine.hasOption("end_time")) {
+      try {
+        if (commandLine.hasOption("date_format")) {
+          long endTime = config.getDateFormat().parse(commandLine.getOptionValue("end_time")).getTime();
+          config.setEndTime(endTime);
+        } else {
+          long endTime = Long.parseLong(commandLine.getOptionValue("end_time"));
+          config.setEndTime(endTime);
+        }
+      } catch (NumberFormatException nfe) {
+        //no-op
+      }
+    }
+  }
+
+  public void printHelp(String msg, Options opts) {
+    new HelpFormatter().printHelp(msg, opts);
+  }
+
+  protected CommandLineParser getParser() {
+    return parser;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
new file mode 100644
index 0000000..897e0fd
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.metron.common.Constants;
+
+import java.util.EnumMap;
+
+public class FixedCliConfig extends CliConfig {
+
+  private EnumMap<Constants.Fields, String> fixedFields;
+
+  public FixedCliConfig() {
+    this.fixedFields = new EnumMap<>(Constants.Fields.class);
+  }
+
+  public EnumMap<Constants.Fields, String> getFixedFields() {
+    return fixedFields;
+  }
+
+  public void setFixedFields(EnumMap<Constants.Fields, String> fixedFields) {
+    this.fixedFields = fixedFields;
+  }
+
+  public void putFixedField(Constants.Fields key, String value) {
+    String trimmedVal = value != null ? value.trim() : null;
+    if (!isNullOrEmpty(trimmedVal)) {
+      this.fixedFields.put(key, value);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
new file mode 100644
index 0000000..1123cad
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.metron.common.Constants;
+
+public class FixedCliParser extends CliParser {
+  private Options fixedOptions;
+
+  public FixedCliParser() {
+    fixedOptions = buildFixedOptions();
+  }
+
+  private Options buildFixedOptions() {
+    Options options = buildOptions();
+    options.addOption(newOption("sa", "ip_src_addr", true, "Source IP address"));
+    options.addOption(newOption("da", "ip_dst_addr", true, "Destination IP address"));
+    options.addOption(newOption("sp", "ip_src_port", true, "Source port"));
+    options.addOption(newOption("dp", "ip_dst_port", true, "Destination port"));
+    options.addOption(newOption("p", "protocol", true, "IP Protocol"));
+    options.addOption(newOption("ir", "include_reverse", false, "Indicates if filter should check swapped src/dest addresses and IPs"));
+    return options;
+  }
+
+  /**
+   * Parses fixed pcap filter options and required parameters common to all filter types.
+   *
+   * @param args command line arguments to parse
+   * @return Configuration tailored to fixed pcap queries
+   * @throws ParseException
+   */
+  public FixedCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+    CommandLine commandLine = getParser().parse(fixedOptions, args);
+    FixedCliConfig config = new FixedCliConfig();
+    super.parse(commandLine, config);
+    config.putFixedField(Constants.Fields.SRC_ADDR, commandLine.getOptionValue("ip_src_addr"));
+    config.putFixedField(Constants.Fields.DST_ADDR, commandLine.getOptionValue("ip_dst_addr"));
+    config.putFixedField(Constants.Fields.SRC_PORT, commandLine.getOptionValue("ip_src_port"));
+    config.putFixedField(Constants.Fields.DST_PORT, commandLine.getOptionValue("ip_dst_port"));
+    config.putFixedField(Constants.Fields.PROTOCOL, commandLine.getOptionValue("protocol"));
+    config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, Boolean.toString(commandLine.hasOption("include_reverse")));
+    return config;
+  }
+
+  public void printHelp() {
+    super.printHelp("Fixed filter options", fixedOptions);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
new file mode 100644
index 0000000..e7ce4da
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PcapCli {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PcapCli.class);
+  private final PcapJob jobRunner;
+  private final ResultsWriter resultsWriter;
+  private final Clock clock;
+
+  public static void main(String[] args) {
+    int status = new PcapCli(new PcapJob(), new ResultsWriter(), new Clock()).run(args);
+    System.exit(status);
+  }
+
+  public PcapCli(PcapJob jobRunner, ResultsWriter resultsWriter, Clock clock) {
+    this.jobRunner = jobRunner;
+    this.resultsWriter = resultsWriter;
+    this.clock = clock;
+  }
+
+  public int run(String[] args) {
+    if (args.length < 1) {
+      printBasicHelp();
+      return -1;
+    }
+    String jobType = args[0];
+    List<byte[]> results = new ArrayList<>();
+    String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+    Configuration hadoopConf = new Configuration();
+    String[] otherArgs = null;
+    try {
+      otherArgs = new GenericOptionsParser(hadoopConf, commandArgs).getRemainingArgs();
+    } catch (IOException e) {
+      LOGGER.error("Failed to configure hadoop with provided options: " + e.getMessage(), e);
+      return -1;
+    }
+    if ("fixed".equals(jobType)) {
+      FixedCliParser fixedParser = new FixedCliParser();
+      FixedCliConfig config = null;
+      try {
+        config = fixedParser.parse(otherArgs);
+      } catch (ParseException | java.text.ParseException e) {
+        System.err.println(e.getMessage());
+        fixedParser.printHelp();
+        return -1;
+      }
+      if (config.showHelp()) {
+        fixedParser.printHelp();
+        return 0;
+      }
+      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+      long startTime = time.getLeft();
+      long endTime = time.getRight();
+
+      try {
+        results = jobRunner.query(
+                new Path(config.getBasePath()),
+                new Path(config.getBaseOutputPath()),
+                startTime,
+                endTime,
+                config.getFixedFields(),
+                hadoopConf,
+                FileSystem.get(hadoopConf),
+                new FixedPcapFilter.Configurator());
+      } catch (IOException | ClassNotFoundException e) {
+        LOGGER.error("Failed to execute fixed filter job: " + e.getMessage(), e);
+        return -1;
+      } catch (InterruptedException e) {
+        LOGGER.error("Failed to execute fixed filter job: " + e.getMessage(), e);
+        return -1;
+      }
+    } else if ("query".equals(jobType)) {
+      QueryCliParser queryParser = new QueryCliParser();
+      QueryCliConfig config = null;
+      try {
+        config = queryParser.parse(otherArgs);
+      } catch (ParseException | java.text.ParseException e) {
+        System.err.println(e.getMessage());
+        queryParser.printHelp();
+        return -1;
+      }
+      if (config.showHelp()) {
+        queryParser.printHelp();
+        return 0;
+      }
+      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+      long startTime = time.getLeft();
+      long endTime = time.getRight();
+
+      try {
+        results = jobRunner.query(
+                new Path(config.getBasePath()),
+                new Path(config.getBaseOutputPath()),
+                startTime,
+                endTime,
+                config.getQuery(),
+                hadoopConf,
+                FileSystem.get(hadoopConf),
+                new QueryPcapFilter.Configurator());
+      } catch (IOException | ClassNotFoundException e) {
+        LOGGER.error("Failed to execute query filter job: " + e.getMessage(), e);
+        return -1;
+      } catch (InterruptedException e) {
+        LOGGER.error("Failed to execute query filter job: " + e.getMessage(), e);
+        return -1;
+      }
+    } else {
+      printBasicHelp();
+      return -1;
+    }
+    String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
+    String outFileName = String.format("pcap-data-%s.pcap", timestamp);
+    try {
+      resultsWriter.write(results, outFileName);
+    } catch (IOException e) {
+      LOGGER.error("Unable to write file", e);
+      return -1;
+    }
+    return 0;
+  }
+
+  private Pair<Long, Long> timeAsNanosecondsSinceEpoch(long start, long end) {
+    long revisedStart = start;
+    if (revisedStart < 0) {
+      revisedStart = 0L;
+    }
+    long revisedEnd = end;
+    if (revisedEnd < 0) {
+      revisedEnd = System.currentTimeMillis();
+    }
+    //convert to nanoseconds since the epoch
+    revisedStart = TimestampConverters.MILLISECONDS.toNanoseconds(revisedStart);
+    revisedEnd = TimestampConverters.MILLISECONDS.toNanoseconds(revisedEnd);
+    return Pair.of(revisedStart, revisedEnd);
+  }
+
+  public void printBasicHelp() {
+    System.out.println("Usage: [fixed|query]");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
new file mode 100644
index 0000000..3d06e1d
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+public class QueryCliConfig extends CliConfig {
+  private String query;
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
new file mode 100644
index 0000000..72b5a95
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class QueryCliParser extends CliParser {
+  private Options queryOptions;
+
+  public QueryCliParser() {
+    queryOptions = setupOptions();
+  }
+
+  private Options setupOptions() {
+    Options options = buildOptions();
+    options.addOption(newOption("q", "query", true, "Query string to use as a filter"));
+    return options;
+  }
+
+  /**
+   * Parses query pcap filter options and required parameters common to all filter types.
+   *
+   * @param args command line arguments to parse
+   * @return Configuration tailored to query pcap queries
+   * @throws ParseException
+   */
+  public QueryCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+    CommandLine commandLine = getParser().parse(queryOptions, args);
+    QueryCliConfig config = new QueryCliConfig();
+    super.parse(commandLine, config);
+    if (commandLine.hasOption("query")) {
+      config.setQuery(commandLine.getOptionValue("query"));
+    }
+    return config;
+  }
+
+  public void printHelp() {
+    super.printHelp("Query filter options", queryOptions);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
new file mode 100644
index 0000000..ab11770
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.metron.pcap.PcapMerger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class ResultsWriter {
+
+  public void write(List<byte[]> pcaps, String outPath) throws IOException {
+    File out = new File(outPath);
+    try (FileOutputStream fos = new FileOutputStream(out)) {
+      fos.write(mergePcaps(pcaps));
+    }
+  }
+
+  public byte[] mergePcaps(List<byte[]> pcaps) throws IOException {
+    if (pcaps == null) {
+      return new byte[]{};
+    }
+    if (pcaps.size() == 1) {
+      return pcaps.get(0);
+    }
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, pcaps);
+    return baos.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh
new file mode 100755
index 0000000..c09aa03
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export PCAP_BACKEND_JAR=${project.artifactId}-$METRON_VERSION.jar
+
+yarn jar $METRON_HOME/lib/$PCAP_BACKEND_JAR org.apache.metron.pcap.query.PcapCli "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
new file mode 100644
index 0000000..92ab26a
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.when;
+
+public class PcapCliTest {
+
+  @Mock
+  private PcapJob jobRunner;
+  @Mock
+  private ResultsWriter resultsWriter;
+  @Mock
+  private Clock clock;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Exception {
+    String[] args = {
+            "fixed",
+            "-start_time", "500",
+            "-ip_src_addr", "192.168.1.1",
+            "-ip_dst_addr", "192.168.1.2",
+            "-ip_src_port", "8081",
+            "-ip_dst_port", "8082",
+            "-protocol", "6"
+    };
+    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+    Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
+    Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+    EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "192.168.1.1");
+      put(Constants.Fields.DST_ADDR, "192.168.1.2");
+      put(Constants.Fields.SRC_PORT, "8081");
+      put(Constants.Fields.DST_PORT, "8082");
+      put(Constants.Fields.PROTOCOL, "6");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+    assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+    Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+  }
+
+  @Test
+  public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_dateformat() throws Exception {
+    String[] args = {
+            "fixed",
+            "-start_time", "500",
+            "-end_time", "1000",
+            "-base_path", "/base/path",
+            "-base_output_path", "/base/output/path",
+            "-ip_src_addr", "192.168.1.1",
+            "-ip_dst_addr", "192.168.1.2",
+            "-ip_src_port", "8081",
+            "-ip_dst_port", "8082",
+            "-protocol", "6",
+            "-include_reverse"
+    };
+    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+    Path base_path = new Path("/base/path");
+    Path base_output_path = new Path("/base/output/path");
+    EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "192.168.1.1");
+      put(Constants.Fields.DST_ADDR, "192.168.1.2");
+      put(Constants.Fields.SRC_PORT, "8081");
+      put(Constants.Fields.DST_PORT, "8082");
+      put(Constants.Fields.PROTOCOL, "6");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    }};
+
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+    assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+    Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+  }
+
+  @Test
+  public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exception {
+    String[] args = {
+            "fixed",
+            "-start_time", "2016-06-13-18:35.00",
+            "-end_time", "2016-06-15-18:35.00",
+            "-date_format", "yyyy-MM-dd-HH:mm.ss",
+            "-base_path", "/base/path",
+            "-base_output_path", "/base/output/path",
+            "-ip_src_addr", "192.168.1.1",
+            "-ip_dst_addr", "192.168.1.2",
+            "-ip_src_port", "8081",
+            "-ip_dst_port", "8082",
+            "-protocol", "6",
+            "-include_reverse"
+    };
+    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+    Path base_path = new Path("/base/path");
+    Path base_output_path = new Path("/base/output/path");
+    EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "192.168.1.1");
+      put(Constants.Fields.DST_ADDR, "192.168.1.2");
+      put(Constants.Fields.SRC_PORT, "8081");
+      put(Constants.Fields.DST_PORT, "8082");
+      put(Constants.Fields.PROTOCOL, "6");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    }};
+
+    long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
+    long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
+    when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+    assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+    Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+  }
+
+  private long asNanos(String inDate, String format) throws ParseException {
+    SimpleDateFormat sdf = new SimpleDateFormat(format);
+    Date date = sdf.parse(inDate);
+    return TimestampConverters.MILLISECONDS.toNanoseconds(date.getTime());
+  }
+
+  private byte[] asBytes(String val) {
+    return val.getBytes(StandardCharsets.UTF_8);
+  }
+
+  @Test
+  public void runs_query_pcap_filter_job_with_default_argument_list() throws Exception {
+    String[] args = {
+            "query",
+            "-start_time", "500",
+            "-query", "some query string"
+    };
+    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+    Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
+    Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+    String query = "some query string";
+
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+    assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+    Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+  }
+
+  @Test
+  public void runs_query_pcap_filter_job_with_full_argument_list() throws Exception {
+    String[] args = {
+            "query",
+            "-start_time", "500",
+            "-end_time", "1000",
+            "-base_path", "/base/path",
+            "-base_output_path", "/base/output/path",
+            "-query", "some query string"
+    };
+    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+    Path base_path = new Path("/base/path");
+    Path base_output_path = new Path("/base/output/path");
+    String query = "some query string";
+
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+    assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+    Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+  }
+
+  @Test
+  public void invalid_fixed_filter_arg_prints_help() throws Exception {
+    PrintStream originalOutStream = System.out;
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      PrintStream testStream = new PrintStream(new BufferedOutputStream(bos));
+      System.setOut(testStream);
+      String[] args = {
+              "fixed",
+              "-start_time", "500",
+              "-end_time", "1000",
+              "-base_path", "/base/path",
+              "-base_output_path", "/base/output/path",
+              "-query", "THIS IS AN ERROR"
+      };
+
+      PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+      assertThat("Expect errors on run", cli.run(args), equalTo(-1));
+      assertThat(bos.toString(), bos.toString().contains("usage: Fixed filter options"), equalTo(true));
+    } finally {
+      System.setOut(originalOutStream);
+    }
+  }
+
+  @Test
+  public void invalid_query_filter_arg_prints_help() throws Exception {
+    PrintStream originalOutStream = System.out;
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      PrintStream outStream = new PrintStream(new BufferedOutputStream(bos));
+      System.setOut(outStream);
+      String[] args = {
+              "query",
+              "-start_time", "500",
+              "-end_time", "1000",
+              "-base_path", "/base/path",
+              "-base_output_path", "/base/output/path",
+              "-ip_src_addr", "THIS IS AN ERROR"
+      };
+
+      PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+      assertThat("Expect errors on run", cli.run(args), equalTo(-1));
+      assertThat(bos.toString(), bos.toString().contains("usage: Query filter options"), equalTo(true));
+    } finally {
+      System.setOut(originalOutStream);
+    }
+  }
+
+}