You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/07 22:50:21 UTC
incubator-metron git commit: METRON-381: Add support for multiple
reducers in pcap_query.sh closes apache/incubator-metron#217
Repository: incubator-metron
Updated Branches:
refs/heads/master e3ce452a0 -> 7445e328d
METRON-381: Add support for multiple reducers in pcap_query.sh closes apache/incubator-metron#217
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7445e328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7445e328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7445e328
Branch: refs/heads/master
Commit: 7445e328dfc34736bf81e5ba1d6580dabbd4376a
Parents: e3ce452
Author: cstella <ce...@gmail.com>
Authored: Wed Sep 7 15:49:41 2016 -0700
Committer: cstella <ce...@gmail.com>
Committed: Wed Sep 7 15:49:41 2016 -0700
----------------------------------------------------------------------
.../pcapservice/PcapReceiverImplRestEasy.java | 5 ++
.../PcapReceiverImplRestEasyTest.java | 19 +++----
.../components/KafkaWithZKComponent.java | 8 ++-
metron-platform/metron-pcap-backend/README.md | 4 ++
.../org/apache/metron/pcap/query/CliConfig.java | 9 ++++
.../org/apache/metron/pcap/query/CliParser.java | 8 +++
.../org/apache/metron/pcap/query/PcapCli.java | 9 +++-
.../PcapTopologyIntegrationTest.java | 10 ++++
.../apache/metron/pcap/query/PcapCliTest.java | 23 ++++++---
.../java/org/apache/metron/pcap/mr/PcapJob.java | 53 ++++++++++++++++++--
10 files changed, 123 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 9510318..18b5dc9 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -104,6 +104,7 @@ public class PcapReceiverImplRestEasy {
* @param query Filter results based on this query
* @param startTime Only return packets originating after this start time
* @param endTime Only return packets originating before this end time
+ * @param numReducers Number of reducers to use; defaults to 10 when the query parameter is omitted
* @param servlet_response
* @return REST response
* @throws IOException
@@ -114,6 +115,7 @@ public class PcapReceiverImplRestEasy {
@QueryParam ("query") String query,
@DefaultValue("-1") @QueryParam ("startTime")long startTime,
@DefaultValue("-1") @QueryParam ("endTime")long endTime,
+ @DefaultValue("10") @QueryParam ("numReducers")int numReducers,
@Context HttpServletResponse servlet_response)
throws IOException {
@@ -139,6 +141,7 @@ public class PcapReceiverImplRestEasy {
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
+ , numReducers
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
@@ -184,6 +187,7 @@ public class PcapReceiverImplRestEasy {
@QueryParam ("dstPort") String dstPort,
@DefaultValue("-1") @QueryParam ("startTime")long startTime,
@DefaultValue("-1") @QueryParam ("endTime")long endTime,
+ @DefaultValue("10") @QueryParam ("numReducers")int numReducers, // reducer count for the query; 10 when omitted
@DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
@Context HttpServletResponse servlet_response)
@@ -237,6 +241,7 @@ public class PcapReceiverImplRestEasy {
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
+ , numReducers
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index bfe2233..1c1c236 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -48,6 +48,7 @@ public class PcapReceiverImplRestEasyTest {
, Path baseOutputPath
, long beginNS
, long endNS
+ , int numReducers
, T fields
, Configuration conf
, FileSystem fs
@@ -91,7 +92,7 @@ public class PcapReceiverImplRestEasyTest {
{
boolean includeReverseTraffic = false;
- fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -104,7 +105,7 @@ public class PcapReceiverImplRestEasyTest {
}
{
boolean includeReverseTraffic = true;
- fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -122,7 +123,7 @@ public class PcapReceiverImplRestEasyTest {
long startTime = 100;
long endTime = 1000;
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
- queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+ queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
Assert.assertEquals(query, queryQueryHandler.fields);
@@ -140,7 +141,7 @@ public class PcapReceiverImplRestEasyTest {
long startTime = 100;
long endTime = 1000;
boolean includeReverseTraffic = false;
- fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -162,7 +163,7 @@ public class PcapReceiverImplRestEasyTest {
long startTime = 100;
long endTime = 1000;
boolean includeReverseTraffic = false;
- fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -185,7 +186,7 @@ public class PcapReceiverImplRestEasyTest {
long endTime = 1000;
{
boolean includeReverseTraffic = false;
- fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -198,7 +199,7 @@ public class PcapReceiverImplRestEasyTest {
}
{
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
- queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+ queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
Assert.assertEquals(query, queryQueryHandler.fields);
@@ -218,7 +219,7 @@ public class PcapReceiverImplRestEasyTest {
long endTime = -1;
{
boolean includeReverseTraffic = false;
- fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
@@ -231,7 +232,7 @@ public class PcapReceiverImplRestEasyTest {
}
{
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
- queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+ queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null); // 10 = numReducers
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
Assert.assertEquals(query, queryQueryHandler.fields);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index 6d2261b..ffe7b54 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -157,8 +157,12 @@ public class KafkaWithZKComponent implements InMemoryComponent {
@Override
public void stop() {
- kafkaServer.shutdown();
- zkClient.close();
+ if(kafkaServer != null) { // skip shutdown when there is no Kafka server instance
+ kafkaServer.shutdown();
+ }
+ if(zkClient != null) { // skip close when there is no ZooKeeper client instance
+ zkClient.close();
+ }
if(zkServer != null) {
zkServer.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index b708fe5..927ae40 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -96,6 +96,8 @@ usage: Fixed filter options
-dp,--ip_dst_port <arg> Destination port
-et,--end_time <arg> Packet end time range. Default is current
system time.
+ -nr,--num_reducers <arg> The number of reducers to use. Default
+ is 10.
-h,--help Display help
-ir,--include_reverse Indicates if filter should check swapped
src/dest addresses and IPs
@@ -116,6 +118,8 @@ usage: Query filter options
millis since the epoch.
-et,--end_time <arg> Packet end time range. Default is current
system time.
+ -nr,--num_reducers <arg> The number of reducers to use. Default
+ is 10.
-h,--help Display help
-q,--query <arg> Query string to use as a filter
-st,--start_time <arg> (required) Packet start time range.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
index a0271b8..f8ab0ac 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
@@ -30,6 +30,9 @@ public class CliConfig {
private String baseOutputPath;
private long startTime;
private long endTime;
+ /** Reducer count used when none is supplied; matches the documented CLI default of 10. */
+ public static final int NUM_REDUCERS_DEFAULT = 10;
+ private int numReducers = NUM_REDUCERS_DEFAULT;
private DateFormat dateFormat;
public CliConfig() {
@@ -40,6 +43,13 @@ public class CliConfig {
endTime = -1L;
}
+ /**
+ * @return the number of reduce tasks the pcap query job should use (always positive)
+ */
+ public int getNumReducers() {
+ return numReducers;
+ }
+
public boolean showHelp() {
return showHelp;
}
@@ -91,4 +101,15 @@ public class CliConfig {
public DateFormat getDateFormat() {
return dateFormat;
}
+
+ /**
+ * @param numReducers number of reduce tasks for the pcap query job; must be positive
+ * @throws IllegalArgumentException if numReducers is zero or negative
+ */
+ public void setNumReducers(int numReducers) {
+ if (numReducers <= 0) {
+ throw new IllegalArgumentException("numReducers must be positive, got " + numReducers);
+ }
+ this.numReducers = numReducers;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index 4fbb05d..ea6f8e7 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -36,6 +36,7 @@ public class CliParser {
options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
+ options.addOption(newOption("nr", "num_reducers", true, "The number of reducers to use. Default is 10."));
options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
return options;
@@ -77,6 +78,17 @@ public class CliParser {
//no-op
}
}
+ if (commandLine.hasOption("num_reducers")) {
+ int numReducers = Integer.parseInt(commandLine.getOptionValue("num_reducers"));
+ if (numReducers <= 0) {
+ throw new IllegalArgumentException("num_reducers must be a positive integer, got " + numReducers);
+ }
+ config.setNumReducers(numReducers);
+ }
+ else {
+ // num_reducers is optional; fall back to the documented default of 10
+ config.setNumReducers(10);
+ }
if (commandLine.hasOption("end_time")) {
try {
if (commandLine.hasOption("date_format")) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index e7ce4da..d96e166 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -93,6 +93,7 @@ public class PcapCli {
new Path(config.getBaseOutputPath()),
startTime,
endTime,
+ config.getNumReducers(),
config.getFixedFields(),
hadoopConf,
FileSystem.get(hadoopConf),
@@ -128,6 +129,7 @@ public class PcapCli {
new Path(config.getBaseOutputPath()),
startTime,
endTime,
+ config.getNumReducers(),
config.getQuery(),
hadoopConf,
FileSystem.get(hadoopConf),
@@ -146,7 +148,12 @@ public class PcapCli {
String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
String outFileName = String.format("pcap-data-%s.pcap", timestamp);
try {
- resultsWriter.write(results, outFileName);
+ if(results.size() > 0) { // only create an output file when the query returned packets
+ resultsWriter.write(results, outFileName);
+ }
+ else {
+ System.out.println("No results returned.");
+ }
} catch (IOException e) {
LOGGER.error("Unable to write file", e);
return -1;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 8fcdeed..d4367ea 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -276,6 +276,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(4, pcapEntries)
, getTimestamp(5, pcapEntries)
+ , 10 // numReducers
, new EnumMap<>(Constants.Fields.class)
, new Configuration()
, FileSystem.get(new Configuration())
@@ -292,6 +293,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(4, pcapEntries)
, getTimestamp(5, pcapEntries)
+ , 10 // numReducers
, ""
, new Configuration()
, FileSystem.get(new Configuration())
@@ -307,6 +309,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
+ , 10 // numReducers
, new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.DST_ADDR, "207.28.210.1");
}}
@@ -325,6 +328,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
+ , 10 // numReducers
, "ip_dst_addr == '207.28.210.1'"
, new Configuration()
, FileSystem.get(new Configuration())
@@ -340,6 +344,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
+ , 10 // numReducers
, new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.PROTOCOL, "foo");
}}
@@ -358,6 +363,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
+ , 10 // numReducers
, "protocol == 'foo'"
, new Configuration()
, FileSystem.get(new Configuration())
@@ -373,6 +379,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+ , 10 // numReducers
, new EnumMap<>(Constants.Fields.class)
, new Configuration()
, FileSystem.get(new Configuration())
@@ -389,6 +396,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+ , 10 // numReducers
, ""
, new Configuration()
, FileSystem.get(new Configuration())
@@ -403,6 +411,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+ , 10 // numReducers
, new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.DST_PORT, "22");
}}
@@ -433,6 +442,7 @@ public class PcapTopologyIntegrationTest {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+ , 10 // numReducers
, "ip_dst_port == '22'"
, new Configuration()
, FileSystem.get(new Configuration())
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 92ab26a..4d6432e 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -71,7 +71,8 @@ public class PcapCliTest {
"-ip_dst_addr", "192.168.1.2",
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
- "-protocol", "6"
+ "-protocol", "6",
+ "-num_reducers", "10"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
@@ -86,7 +87,7 @@ public class PcapCliTest {
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
}};
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(10), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -107,7 +108,8 @@ public class PcapCliTest {
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
"-protocol", "6",
- "-include_reverse"
+ "-include_reverse",
+ "-num_reducers", "10"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
@@ -122,7 +124,7 @@ public class PcapCliTest {
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
}};
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(10), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -144,7 +146,8 @@ public class PcapCliTest {
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
"-protocol", "6",
- "-include_reverse"
+ "-include_reverse",
+ "-num_reducers", "10"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
@@ -161,7 +164,7 @@ public class PcapCliTest {
long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
- when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), eq(10), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -184,6 +187,7 @@ public class PcapCliTest {
String[] args = {
"query",
"-start_time", "500",
+ "-num_reducers", "10",
"-query", "some query string"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
@@ -192,7 +196,7 @@ public class PcapCliTest {
Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
String query = "some query string";
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(10), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -206,6 +210,7 @@ public class PcapCliTest {
"query",
"-start_time", "500",
"-end_time", "1000",
+ "-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-query", "some query string"
@@ -216,7 +221,7 @@ public class PcapCliTest {
Path base_output_path = new Path("/base/output/path");
String query = "some query string";
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(10), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -235,6 +240,7 @@ public class PcapCliTest {
"fixed",
"-start_time", "500",
"-end_time", "1000",
+ "-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-query", "THIS IS AN ERROR"
@@ -259,6 +265,7 @@ public class PcapCliTest {
"query",
"-start_time", "500",
"-end_time", "1000",
+ "-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
"-ip_src_addr", "THIS IS AN ERROR"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7445e328/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index a181637..cce4074 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -19,6 +19,7 @@
package org.apache.metron.pcap.mr;
import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -29,6 +30,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -47,9 +49,63 @@ import java.util.stream.Stream;
public class PcapJob {
private static final Logger LOG = Logger.getLogger(PcapJob.class);
+ public static final String START_TS_CONF = "start_ts";
+ public static final String END_TS_CONF = "end_ts";
+ public static final String WIDTH_CONF = "width";
+ public static class PcapPartitioner extends Partitioner<LongWritable, BytesWritable> implements Configurable {
+ private Configuration configuration;
+ Long start = null;
+ Long end = null;
+ Long width = null;
+ /**
+ * Range-partitions map output by packet timestamp: partition i receives keys in
+ * [start + i*width, start + (i+1)*width). Keys are compared as unsigned longs, matching how
+ * the start/end bounds are written to the configuration. Keys outside the configured range
+ * are clamped into the first/last partition so an illegal partition number is never returned.
+ */
+ @Override
+ public int getPartition(LongWritable longWritable, BytesWritable bytesWritable, int numPartitions) {
+ if(numPartitions <= 1) {
+ return 0;
+ }
+ if(start == null) {
+ initialize();
+ }
+ long x = longWritable.get();
+ if(Long.compareUnsigned(x, start) < 0) {
+ return 0;
+ }
+ long partition = Long.divideUnsigned(x - start, width);
+ // Partitioner contract: the result must lie in [0, numPartitions), even for keys past end
+ if(Long.compareUnsigned(partition, numPartitions - 1) > 0) {
+ return numPartitions - 1;
+ }
+ return (int)partition;
+ }
+
+ private void initialize() {
+ start = Long.parseUnsignedLong(configuration.get(START_TS_CONF));
+ end = Long.parseUnsignedLong(configuration.get(END_TS_CONF));
+ width = Long.parseLong(configuration.get(WIDTH_CONF));
+ if(width == 0) {
+ throw new IllegalStateException("Partition width must be non-zero; check " + WIDTH_CONF);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.configuration = conf;
+ // drop cached bounds so they are re-read from the new configuration
+ start = null;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+ }
public static class PcapMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
- public static final String START_TS_CONF = "start_ts";
- public static final String END_TS_CONF = "end_ts";
+
PcapFilter filter;
long start;
long end;
@@ -170,6 +226,7 @@ public class PcapJob {
, Path baseOutputPath
, long beginNS
, long endNS
+ , int numReducers
, T fields
, Configuration conf
, FileSystem fs
@@ -189,6 +246,7 @@ public class PcapJob {
, outputPath
, beginNS
, endNS
+ , numReducers
, fields
, conf
, fs
@@ -204,28 +262,48 @@ public class PcapJob {
}
+ /**
+ * Computes how wide a slice of the [start, end] timestamp range each reducer receives.
+ * Returns a long: with nanosecond timestamps, any range wider than roughly
+ * numReducers * 2.1 seconds yields a width larger than Integer.MAX_VALUE.
+ *
+ * @param start range start, interpreted as an unsigned long
+ * @param end range end, interpreted as an unsigned long
+ * @param numReducers number of reduce tasks; must be positive
+ * @return the per-reducer width used by {@link PcapPartitioner}
+ * @throws IllegalArgumentException if numReducers is zero or negative
+ */
+ public static long findWidth(long start, long end, int numReducers) {
+ if(numReducers <= 0) {
+ throw new IllegalArgumentException("numReducers must be positive, got " + numReducers);
+ }
+ return Long.divideUnsigned(end - start, numReducers) + 1;
+ }
public <T> Job createJob( Path basePath
, Path outputPath
, long beginNS
, long endNS
+ , int numReducers
, T fields
, Configuration conf
, FileSystem fs
, PcapFilterConfigurator<T> filterImpl
) throws IOException
{
- conf.set(PcapMapper.START_TS_CONF, Long.toUnsignedString(beginNS));
- conf.set(PcapMapper.END_TS_CONF, Long.toUnsignedString(endNS));
+ conf.set(START_TS_CONF, Long.toUnsignedString(beginNS));
+ conf.set(END_TS_CONF, Long.toUnsignedString(endNS));
+ conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers));
filterImpl.addToConfig(fields, conf);
Job job = new Job(conf);
job.setJarByClass(PcapJob.class);
job.setMapperClass(PcapJob.PcapMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
- job.setNumReduceTasks(1);
+ job.setNumReduceTasks(numReducers);
job.setReducerClass(PcapReducer.class);
+ job.setPartitionerClass(PcapPartitioner.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(BytesWritable.class);
SequenceFileInputFormat.addInputPaths(job, Joiner.on(',').join(getPaths(fs, basePath, beginNS, endNS )));