You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/07 22:50:21 UTC

incubator-metron git commit: METRON-381: Add support for multiple reducers in pcap_query.sh closes apache/incubator-metron#217

Repository: incubator-metron
Updated Branches:
  refs/heads/master e3ce452a0 -> 7445e328d


METRON-381: Add support for multiple reducers in pcap_query.sh closes apache/incubator-metron#217


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7445e328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7445e328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7445e328

Branch: refs/heads/master
Commit: 7445e328dfc34736bf81e5ba1d6580dabbd4376a
Parents: e3ce452
Author: cstella <ce...@gmail.com>
Authored: Wed Sep 7 15:49:41 2016 -0700
Committer: cstella <ce...@gmail.com>
Committed: Wed Sep 7 15:49:41 2016 -0700

----------------------------------------------------------------------
 .../pcapservice/PcapReceiverImplRestEasy.java   |  5 ++
 .../PcapReceiverImplRestEasyTest.java           | 19 +++----
 .../components/KafkaWithZKComponent.java        |  8 ++-
 metron-platform/metron-pcap-backend/README.md   |  4 ++
 .../org/apache/metron/pcap/query/CliConfig.java |  9 ++++
 .../org/apache/metron/pcap/query/CliParser.java |  8 +++
 .../org/apache/metron/pcap/query/PcapCli.java   |  9 +++-
 .../PcapTopologyIntegrationTest.java            | 10 ++++
 .../apache/metron/pcap/query/PcapCliTest.java   | 23 ++++++---
 .../java/org/apache/metron/pcap/mr/PcapJob.java | 53 ++++++++++++++++++--
 10 files changed, 123 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 9510318..18b5dc9 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -104,6 +104,7 @@ public class PcapReceiverImplRestEasy {
    * @param query Filter results based on this query
    * @param startTime Only return packets originating after this start time
    * @param endTime Only return packets originating before this end time
+   * @param numReducers Number of reducers to use
    * @param servlet_response
    * @return REST response
    * @throws IOException
@@ -114,6 +115,7 @@ public class PcapReceiverImplRestEasy {
           @QueryParam ("query") String query,
           @DefaultValue("-1") @QueryParam ("startTime")long startTime,
           @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+          @DefaultValue("10") @QueryParam ("numReducers")int numReducers,
           @Context HttpServletResponse servlet_response)
 
           throws IOException {
@@ -139,6 +141,7 @@ public class PcapReceiverImplRestEasy {
               , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
               , startTime
               , endTime
+              , numReducers
               , query
               , CONFIGURATION.get()
               , FileSystem.get(CONFIGURATION.get())
@@ -184,6 +187,7 @@ public class PcapReceiverImplRestEasy {
           @QueryParam ("dstPort") String dstPort,
           @DefaultValue("-1") @QueryParam ("startTime")long startTime,
           @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+          @DefaultValue("10") @QueryParam ("numReducers")int numReducers,
           @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
           @Context HttpServletResponse servlet_response)
 
@@ -237,6 +241,7 @@ public class PcapReceiverImplRestEasy {
                                     , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
                                     , startTime
                                     , endTime
+                                    , numReducers
                                     , query
                                     , CONFIGURATION.get()
                                     , FileSystem.get(CONFIGURATION.get())

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index bfe2233..1c1c236 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -48,6 +48,7 @@ public class PcapReceiverImplRestEasyTest {
             , Path baseOutputPath
             , long beginNS
             , long endNS
+            , int numReducers
             , T fields
             , Configuration conf
             , FileSystem fs
@@ -91,7 +92,7 @@ public class PcapReceiverImplRestEasyTest {
 
     {
       boolean includeReverseTraffic = false;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
       Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -104,7 +105,7 @@ public class PcapReceiverImplRestEasyTest {
     }
     {
       boolean includeReverseTraffic = true;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
       Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -122,7 +123,7 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = 100;
     long endTime = 1000;
     String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
-    queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+    queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10,  null);
     Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
     Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
     Assert.assertEquals(query, queryQueryHandler.fields);
@@ -140,7 +141,7 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = 100;
     long endTime = 1000;
     boolean includeReverseTraffic = false;
-    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
     Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
     Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
     Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -162,7 +163,7 @@ public class PcapReceiverImplRestEasyTest {
     long startTime = 100;
     long endTime = 1000;
     boolean includeReverseTraffic = false;
-    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+    fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
     Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
     Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
     Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -185,7 +186,7 @@ public class PcapReceiverImplRestEasyTest {
     long endTime = 1000;
     {
       boolean includeReverseTraffic = false;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
       Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -198,7 +199,7 @@ public class PcapReceiverImplRestEasyTest {
     }
     {
       String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
-      queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+      queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
       Assert.assertEquals(query, queryQueryHandler.fields);
@@ -218,7 +219,7 @@ public class PcapReceiverImplRestEasyTest {
     long endTime = -1;
     {
       boolean includeReverseTraffic = false;
-      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+      fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
       Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -231,7 +232,7 @@ public class PcapReceiverImplRestEasyTest {
     }
     {
       String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
-      queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+      queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null);
       Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
       Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
       Assert.assertEquals(query, queryQueryHandler.fields);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index 6d2261b..ffe7b54 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -157,8 +157,12 @@ public class KafkaWithZKComponent implements InMemoryComponent {
 
   @Override
   public void stop() {
-    kafkaServer.shutdown();
-    zkClient.close();
+    if(kafkaServer != null) {
+      kafkaServer.shutdown();
+    }
+    if(zkClient != null) {
+      zkClient.close();
+    }
     if(zkServer != null) {
       zkServer.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index b708fe5..927ae40 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -96,6 +96,8 @@ usage: Fixed filter options
  -dp,--ip_dst_port <arg>         Destination port
  -et,--end_time <arg>            Packet end time range. Default is current
                                  system time.
+ -nr,--num_reducers <arg>        The number of reducers to use.  Default
+                                 is 10.
  -h,--help                       Display help
  -ir,--include_reverse           Indicates if filter should check swapped
                                  src/dest addresses and IPs
@@ -116,6 +118,8 @@ usage: Query filter options
                                  millis since the epoch.
  -et,--end_time <arg>            Packet end time range. Default is current
                                  system time.
+ -nr,--num_reducers <arg>        The number of reducers to use.  Default
+                                 is 10.
  -h,--help                       Display help
  -q,--query <arg>                Query string to use as a filter
  -st,--start_time <arg>          (required) Packet start time range.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
index a0271b8..f8ab0ac 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
@@ -30,6 +30,7 @@ public class CliConfig {
   private String baseOutputPath;
   private long startTime;
   private long endTime;
+  private int numReducers = 0;
   private DateFormat dateFormat;
 
   public CliConfig() {
@@ -40,6 +41,10 @@ public class CliConfig {
     endTime = -1L;
   }
 
+  public int getNumReducers() {
+    return numReducers;
+  }
+
   public boolean showHelp() {
     return showHelp;
   }
@@ -91,4 +96,8 @@ public class CliConfig {
   public DateFormat getDateFormat() {
     return dateFormat;
   }
+
+  public void setNumReducers(int numReducers) {
+    this.numReducers = numReducers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index 4fbb05d..ea6f8e7 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -36,6 +36,7 @@ public class CliParser {
     options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
     options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
     options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
+    options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true));
     options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
     options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
     return options;
@@ -77,6 +78,13 @@ public class CliParser {
         //no-op
       }
     }
+    if (commandLine.hasOption("num_reducers")) {
+      int numReducers = Integer.parseInt(commandLine.getOptionValue("num_reducers"));
+      config.setNumReducers(numReducers);
+    }
+    else {
+      config.setNumReducers(10);
+    }
     if (commandLine.hasOption("end_time")) {
       try {
         if (commandLine.hasOption("date_format")) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index e7ce4da..d96e166 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -93,6 +93,7 @@ public class PcapCli {
                 new Path(config.getBaseOutputPath()),
                 startTime,
                 endTime,
+                config.getNumReducers(),
                 config.getFixedFields(),
                 hadoopConf,
                 FileSystem.get(hadoopConf),
@@ -128,6 +129,7 @@ public class PcapCli {
                 new Path(config.getBaseOutputPath()),
                 startTime,
                 endTime,
+                config.getNumReducers(),
                 config.getQuery(),
                 hadoopConf,
                 FileSystem.get(hadoopConf),
@@ -146,7 +148,12 @@ public class PcapCli {
     String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
     String outFileName = String.format("pcap-data-%s.pcap", timestamp);
     try {
-      resultsWriter.write(results, outFileName);
+      if(results.size() > 0) {
+        resultsWriter.write(results, outFileName);
+      }
+      else {
+        System.out.println("No results returned.");
+      }
     } catch (IOException e) {
       LOGGER.error("Unable to write file", e);
       return -1;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 8fcdeed..d4367ea 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -276,6 +276,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(4, pcapEntries)
                         , getTimestamp(5, pcapEntries)
+                        , 10
                         , new EnumMap<>(Constants.Fields.class)
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -292,6 +293,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(4, pcapEntries)
                         , getTimestamp(5, pcapEntries)
+                        , 10
                         , ""
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -307,6 +309,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(1, pcapEntries)
+                        , 10
                         , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
                           put(Constants.Fields.DST_ADDR, "207.28.210.1");
                         }}
@@ -325,6 +328,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(1, pcapEntries)
+                        , 10
                         , "ip_dst_addr == '207.28.210.1'"
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -340,6 +344,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(1, pcapEntries)
+                        , 10
                         , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
                           put(Constants.Fields.PROTOCOL, "foo");
                         }}
@@ -358,6 +363,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(1, pcapEntries)
+                        , 10
                         , "protocol == 'foo'"
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -373,6 +379,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , 10
                         , new EnumMap<>(Constants.Fields.class)
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -389,6 +396,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , 10
                         , ""
                         , new Configuration()
                         , FileSystem.get(new Configuration())
@@ -403,6 +411,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , 10
                         , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
                           put(Constants.Fields.DST_PORT, "22");
                         }}
@@ -433,6 +442,7 @@ public class PcapTopologyIntegrationTest {
                         , new Path(queryDir.getAbsolutePath())
                         , getTimestamp(0, pcapEntries)
                         , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , 10
                         , "ip_dst_port == '22'"
                         , new Configuration()
                         , FileSystem.get(new Configuration())

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 92ab26a..4d6432e 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -71,7 +71,8 @@ public class PcapCliTest {
             "-ip_dst_addr", "192.168.1.2",
             "-ip_src_port", "8081",
             "-ip_dst_port", "8082",
-            "-protocol", "6"
+            "-protocol", "6",
+            "-num_reducers", "10"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
 
@@ -86,7 +87,7 @@ public class PcapCliTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
     }};
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -107,7 +108,8 @@ public class PcapCliTest {
             "-ip_src_port", "8081",
             "-ip_dst_port", "8082",
             "-protocol", "6",
-            "-include_reverse"
+            "-include_reverse",
+            "-num_reducers", "10"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
 
@@ -122,7 +124,7 @@ public class PcapCliTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
     }};
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -144,7 +146,8 @@ public class PcapCliTest {
             "-ip_src_port", "8081",
             "-ip_dst_port", "8082",
             "-protocol", "6",
-            "-include_reverse"
+            "-include_reverse",
+            "-num_reducers", "10"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
 
@@ -161,7 +164,7 @@ public class PcapCliTest {
 
     long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
     long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
-    when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -184,6 +187,7 @@ public class PcapCliTest {
     String[] args = {
             "query",
             "-start_time", "500",
+            "-num_reducers", "10",
             "-query", "some query string"
     };
     List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
@@ -192,7 +196,7 @@ public class PcapCliTest {
     Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
     String query = "some query string";
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -206,6 +210,7 @@ public class PcapCliTest {
             "query",
             "-start_time", "500",
             "-end_time", "1000",
+            "-num_reducers", "10",
             "-base_path", "/base/path",
             "-base_output_path", "/base/output/path",
             "-query", "some query string"
@@ -216,7 +221,7 @@ public class PcapCliTest {
     Path base_output_path = new Path("/base/output/path");
     String query = "some query string";
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
     when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
 
     PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -235,6 +240,7 @@ public class PcapCliTest {
               "fixed",
               "-start_time", "500",
               "-end_time", "1000",
+              "-num_reducers", "10",
               "-base_path", "/base/path",
               "-base_output_path", "/base/output/path",
               "-query", "THIS IS AN ERROR"
@@ -259,6 +265,7 @@ public class PcapCliTest {
               "query",
               "-start_time", "500",
               "-end_time", "1000",
+              "-num_reducers", "10",
               "-base_path", "/base/path",
               "-base_output_path", "/base/output/path",
               "-ip_src_addr", "THIS IS AN ERROR"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index a181637..cce4074 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -19,6 +19,7 @@
 package org.apache.metron.pcap.mr;
 
 import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -29,6 +30,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -47,9 +49,42 @@ import java.util.stream.Stream;
 
 public class PcapJob {
   private static final Logger LOG = Logger.getLogger(PcapJob.class);
+  public static final String START_TS_CONF = "start_ts";
+  public static final String END_TS_CONF = "end_ts";
+  public static final String WIDTH_CONF = "width";
+  /**
+   * Partitioner that picks a reducer for each map output record from the record's key:
+   * the partition index is {@code (key - start) / width}, computed with unsigned division.
+   *
+   * The {@code start}, {@code end} and {@code width} values are read from the job
+   * {@link Configuration} under the {@code START_TS_CONF}, {@code END_TS_CONF} and
+   * {@code WIDTH_CONF} keys the first time {@code getPartition} is called.
+   */
+  public static class PcapPartitioner extends Partitioner<LongWritable, BytesWritable> implements Configurable {
+    // Configuration supplied through setConf(); consulted lazily by initialize().
+    private Configuration configuration;
+    // Parsed configuration values. They stay null until initialize() has run;
+    // start doubles as the "already initialized" marker in getPartition().
+    Long start = null;
+    // NOTE(review): end is parsed in initialize() but not read anywhere else in this class.
+    Long end = null;
+    Long width = null;
+    /**
+     * Computes the partition for a record as {@code (key - start) / width} (unsigned
+     * division), narrowed to an {@code int}.
+     *
+     * NOTE(review): numPartitions is not consulted and the result is not range-checked.
+     * This presumably relies on width being sized so that the quotient stays below
+     * numPartitions for every key that reaches the partitioner -- confirm against the
+     * code that sets WIDTH_CONF and against the mapper's filtering.
+     *
+     * @param longWritable  the map output key (presumably a packet timestamp, judging
+     *                      by the *_TS_CONF names -- confirm)
+     * @param bytesWritable the map output value; not used here
+     * @param numPartitions the number of partitions; not used here
+     * @return the computed partition index
+     */
+    @Override
+    public int getPartition(LongWritable longWritable, BytesWritable bytesWritable, int numPartitions) {
+      // Lazy initialization on first use.
+      if(start == null) {
+        initialize();
+      }
+      long x = longWritable.get();
+      // Unsigned division of the offset from start by the per-partition width.
+      int ret = (int)Long.divideUnsigned(x - start, width);
+      return ret;
+    }
+
+    /**
+     * Reads start, end and width from the configuration. start and end are parsed as
+     * unsigned decimal strings; width is parsed as a signed long.
+     */
+    private void initialize() {
+      start = Long.parseUnsignedLong(configuration.get(START_TS_CONF));
+      end = Long.parseUnsignedLong(configuration.get(END_TS_CONF));
+      width = Long.parseLong(configuration.get(WIDTH_CONF));
+    }
+
+    /**
+     * Stores the configuration for later use; nothing is parsed at this point.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.configuration = conf;
+    }
+
+    /**
+     * @return the configuration previously passed to {@code setConf}, or null if none was set
+     */
+    @Override
+    public Configuration getConf() {
+      return configuration;
+    }
+  }
   public static class PcapMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
-    public static final String START_TS_CONF = "start_ts";
-    public static final String END_TS_CONF = "end_ts";
+
     PcapFilter filter;
     long start;
     long end;
@@ -170,6 +205,7 @@ public class PcapJob {
                             , Path baseOutputPath
                             , long beginNS
                             , long endNS
+                            , int numReducers
                             , T fields
                             , Configuration conf
                             , FileSystem fs
@@ -189,6 +225,7 @@ public class PcapJob {
                        , outputPath
                        , beginNS
                        , endNS
+                       , numReducers
                        , fields
                        , conf
                        , fs
@@ -204,28 +241,34 @@ public class PcapJob {
   }
 
 
+  /**
+   * Computes the width of the slice of the {@code [start, end]} window handled by each
+   * reducer: {@code (end - start) / numReducers + 1}, using unsigned division.
+   *
+   * The result is returned as a {@code long}. The window bounds are {@code long}
+   * values (createJob passes beginNS/endNS), so the quotient can exceed
+   * {@code Integer.MAX_VALUE}; narrowing it to {@code int} would wrap to a negative or
+   * far-too-small width and break the key-to-reducer mapping.
+   *
+   * @param start       beginning of the window
+   * @param end         end of the window
+   * @param numReducers number of reducers the window is split across
+   * @return the per-reducer width, always at least 1
+   * @throws ArithmeticException if {@code numReducers} is zero
+   */
+  public static long findWidth(long start, long end, int numReducers) {
+    // The +1 keeps (end - start) / width strictly below numReducers, so the key equal
+    // to end still maps to the last reducer.
+    return Long.divideUnsigned(end - start, numReducers) + 1;
+  }
 
 
   public <T> Job createJob( Path basePath
                       , Path outputPath
                       , long beginNS
                       , long endNS
+                      , int numReducers
                       , T fields
                       , Configuration conf
                       , FileSystem fs
                       , PcapFilterConfigurator<T> filterImpl
                       ) throws IOException
   {
-    conf.set(PcapMapper.START_TS_CONF, Long.toUnsignedString(beginNS));
-    conf.set(PcapMapper.END_TS_CONF, Long.toUnsignedString(endNS));
+    conf.set(START_TS_CONF, Long.toUnsignedString(beginNS));
+    conf.set(END_TS_CONF, Long.toUnsignedString(endNS));
+    conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers));
     filterImpl.addToConfig(fields, conf);
     Job job = new Job(conf);
     job.setJarByClass(PcapJob.class);
     job.setMapperClass(PcapJob.PcapMapper.class);
     job.setMapOutputKeyClass(LongWritable.class);
     job.setMapOutputValueClass(BytesWritable.class);
-    job.setNumReduceTasks(1);
+    job.setNumReduceTasks(numReducers);
     job.setReducerClass(PcapReducer.class);
+    job.setPartitionerClass(PcapPartitioner.class);
     job.setOutputKeyClass(LongWritable.class);
     job.setOutputValueClass(BytesWritable.class);
     SequenceFileInputFormat.addInputPaths(job, Joiner.on(',').join(getPaths(fs, basePath, beginNS, endNS )));