You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/20 13:08:34 UTC
incubator-metron git commit: METRON-235 Expose filtering capability
for PCAP via CLI tool. (mmiklavc via cestella) closes
apache/incubator-metron#156
Repository: incubator-metron
Updated Branches:
refs/heads/master 5ca8d650c -> 46e48f890
METRON-235 Expose filtering capability for PCAP via CLI tool. (mmiklavc via cestella) closes apache/incubator-metron#156
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/46e48f89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/46e48f89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/46e48f89
Branch: refs/heads/master
Commit: 46e48f890756c1afe3ffc49b924b40eb26144af2
Parents: 5ca8d65
Author: mmiklavc <mi...@gmail.com>
Authored: Mon Jun 20 09:08:16 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Jun 20 09:08:16 2016 -0400
----------------------------------------------------------------------
.../roles/ambari_config/defaults/main.yml | 6 +-
.../roles/ambari_config/vars/single_node_vm.yml | 2 +
.../SensorEnrichmentUpdateConfig.java | 19 +-
.../org/apache/metron/common/system/Clock.java | 37 +++
.../SensorEnrichmentUpdateConfigTest.java | 73 +++--
.../apache/metron/common/system/ClockTest.java | 54 ++++
metron-platform/metron-pcap-backend/README.md | 47 ++++
.../org/apache/metron/pcap/query/CliConfig.java | 94 +++++++
.../org/apache/metron/pcap/query/CliParser.java | 102 +++++++
.../metron/pcap/query/FixedCliConfig.java | 47 ++++
.../metron/pcap/query/FixedCliParser.java | 67 +++++
.../org/apache/metron/pcap/query/PcapCli.java | 176 ++++++++++++
.../metron/pcap/query/QueryCliConfig.java | 30 ++
.../metron/pcap/query/QueryCliParser.java | 58 ++++
.../apache/metron/pcap/query/ResultsWriter.java | 48 ++++
.../src/main/scripts/pcap_query.sh | 34 +++
.../apache/metron/pcap/query/PcapCliTest.java | 275 +++++++++++++++++++
17 files changed, 1122 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-deployment/roles/ambari_config/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/ambari_config/defaults/main.yml b/metron-deployment/roles/ambari_config/defaults/main.yml
index 88ea47a..43d5d0b 100644
--- a/metron-deployment/roles/ambari_config/defaults/main.yml
+++ b/metron-deployment/roles/ambari_config/defaults/main.yml
@@ -29,5 +29,7 @@ storm_local_dir: /hadoop/storm
kafka_log_dirs: /kafka-log
cluster_type: small_cluster
nodemanager_mem_mb : 4096
-mapred_map_mem_mb : 1024
-mapred_reduce_mem_mb : 1024
+mapred_map_java_opts : -Xmx1024m
+mapred_reduce_java_opts : -Xmx1024m
+mapred_map_mem_mb : 1229
+mapred_reduce_mem_mb : 1229
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
index 031fe9b..9a6afc1 100644
--- a/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
+++ b/metron-deployment/roles/ambari_config/vars/single_node_vm.yml
@@ -61,6 +61,8 @@ configurations:
jobhistory_heapsize: 256
- mapred-site:
mapreduce.jobhistory.recovery.store.leveldb.path : '{{ jhs_recovery_store_ldb_path }}'
+ mapreduce.map.java.opts : '{{ mapred_map_java_opts }}'
+ mapreduce.reduce.java.opts : '{{ mapred_reduce_java_opts }}'
mapreduce.map.memory.mb : '{{ mapred_map_mem_mb }}'
mapreduce.reduce.memory.mb : '{{ mapred_reduce_mem_mb }}'
- yarn-site:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
index 99e681a..259fcf0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
@@ -115,13 +115,7 @@ public class SensorEnrichmentUpdateConfig {
{
Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
- SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
- if(config == null) {
- config = scHandler.readConfig(kv.getKey());
- if(_LOG.isDebugEnabled()) {
- _LOG.debug(config.toJSON());
- }
- }
+ SensorEnrichmentConfig config = findConfigBySensorType(scHandler, sourceConfigsChanged, kv.getKey());
Map<String, List<String> > fieldMap = null;
Map<String, List<String> > fieldToTypeMap = null;
List<String> fieldList = null;
@@ -206,4 +200,15 @@ public class SensorEnrichmentUpdateConfig {
}
}
+ private static SensorEnrichmentConfig findConfigBySensorType(SourceConfigHandler scHandler, Map<String, SensorEnrichmentConfig> sourceConfigsChanged, String key) throws Exception {
+ SensorEnrichmentConfig config = sourceConfigsChanged.get(key);
+ if(config == null) {
+ config = scHandler.readConfig(key);
+ if(_LOG.isDebugEnabled()) {
+ _LOG.debug(config.toJSON());
+ }
+ }
+ return config;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java
new file mode 100644
index 0000000..458b730
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/Clock.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.system;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class Clock {
+ private final static String UTC = "UTC";
+
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
+ public String currentTimeFormatted(String stdDateFormat) {
+ SimpleDateFormat format = new SimpleDateFormat(stdDateFormat);
+ format.setTimeZone(TimeZone.getTimeZone(UTC));
+ return format.format(new Date(currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
index 77035fb..681c4c1 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
@@ -35,14 +35,14 @@ public class SensorEnrichmentUpdateConfigTest {
"batchSize": 5,
"enrichment" : {
"fieldMap": {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
+ "geo": ["ip_dst_addr", "ip_src_addr"],
+ "host": ["host"]
}
},
"threatIntel": {
"fieldMap": {
"hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
- },
+ },
"fieldToTypeMap": {
"ip_dst_addr" : [ "malicious_ip" ]
,"ip_src_addr" : [ "malicious_ip" ]
@@ -63,27 +63,24 @@ public class SensorEnrichmentUpdateConfigTest {
{
"zkQuorum" : "localhost:2181"
,"sensorToFieldList" : {
- "bro" : {
+ "bro" : {
"type" : "THREAT_INTEL"
,"fieldToEnrichmentTypes" : {
- "ip_src_addr" : [ "playful" ]
- ,"ip_dst_addr" : [ "playful" ]
+ "ip_src_addr" : [ "playful" ]
+ ,"ip_dst_addr" : [ "playful" ]
}
- }
+ }
}
}
- */
+ */
@Multiline
public static String threatIntelConfigStr;
@Test
public void testThreatIntel() throws Exception {
-
SensorEnrichmentConfig broSc = (SensorEnrichmentConfig) ConfigurationType.ENRICHMENT.deserialize(sourceConfigStr);
-
-
- SensorEnrichmentUpdateConfig config = JSONUtils.INSTANCE.load(threatIntelConfigStr, SensorEnrichmentUpdateConfig.class);
- final Map<String, SensorEnrichmentConfig> outputScs = new HashMap<>();
+ SensorEnrichmentUpdateConfig threatIntelConfig = JSONUtils.INSTANCE.load(threatIntelConfigStr, SensorEnrichmentUpdateConfig.class);
+ final Map<String, SensorEnrichmentConfig> finalEnrichmentConfig = new HashMap<>();
SensorEnrichmentUpdateConfig.SourceConfigHandler scHandler = new SensorEnrichmentUpdateConfig.SourceConfigHandler() {
@Override
public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
@@ -97,50 +94,50 @@ public class SensorEnrichmentUpdateConfigTest {
@Override
public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
- outputScs.put(sensor, config);
+ finalEnrichmentConfig.put(sensor, config);
}
};
- SensorEnrichmentUpdateConfig.updateSensorConfigs(scHandler, config.getSensorToFieldList());
- Assert.assertNotNull(outputScs.get("bro"));
- Assert.assertNotSame(outputScs.get("bro"), broSc);
- Assert.assertEquals( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldMap().get(Constants.SIMPLE_HBASE_THREAT_INTEL).size()
+ SensorEnrichmentUpdateConfig.updateSensorConfigs(scHandler, threatIntelConfig.getSensorToFieldList());
+ Assert.assertNotNull(finalEnrichmentConfig.get("bro"));
+ Assert.assertNotSame(finalEnrichmentConfig.get("bro"), broSc);
+ Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldMap().get(Constants.SIMPLE_HBASE_THREAT_INTEL).size()
, 2
);
- Assert.assertEquals(1, outputScs.get("bro").getThreatIntel().getTriageConfig().getRiskLevelRules().size());
- Assert.assertTrue( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldMap()
+ Assert.assertEquals(1, finalEnrichmentConfig.get("bro").getThreatIntel().getTriageConfig().getRiskLevelRules().size());
+ Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldMap()
.get(Constants.SIMPLE_HBASE_THREAT_INTEL)
.contains("ip_src_addr")
);
- Assert.assertTrue( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldMap()
+ Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldMap()
.get(Constants.SIMPLE_HBASE_THREAT_INTEL)
.contains("ip_dst_addr")
);
- Assert.assertEquals( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().keySet().size()
+ Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().keySet().size()
, 2
);
- Assert.assertEquals( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").size()
+ Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").size()
, 2
);
- Assert.assertTrue( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("playful")
+ Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("playful")
);
- Assert.assertTrue( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("malicious_ip")
+ Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_src_addr").contains("malicious_ip")
);
- Assert.assertEquals( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").size()
+ Assert.assertEquals( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").size()
, 2
);
- Assert.assertTrue( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("playful")
+ Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("playful")
);
- Assert.assertTrue( outputScs.get("bro").toJSON()
- , outputScs.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("malicious_ip")
+ Assert.assertTrue( finalEnrichmentConfig.get("bro").toJSON()
+ , finalEnrichmentConfig.get("bro").getThreatIntel().getFieldToTypeMap().get("ip_dst_addr").contains("malicious_ip")
);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java
new file mode 100644
index 0000000..1efc356
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/system/ClockTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.system;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class ClockTest {
+
+ @Test
+ public void returns_system_time() throws Exception {
+ Clock clock = new Clock();
+ long t1 = clock.currentTimeMillis();
+ Thread.sleep(50);
+ long t2 = clock.currentTimeMillis();
+ Thread.sleep(50);
+ long t3 = clock.currentTimeMillis();
+ assertThat("t3 should be greater", t3 > t2, equalTo(true));
+ assertThat("t2 should be greater", t2 > t1, equalTo(true));
+ }
+
+ @Test
+ public void formats_system_time_given_passed_format() throws Exception {
+ Clock clock = Mockito.spy(Clock.class);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSSZ");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date date = sdf.parse("20160615183527162+0000");
+ Mockito.when(clock.currentTimeMillis()).thenReturn(date.getTime());
+ assertThat("time not right", clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"), equalTo("20160615183527162+0000"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index 9969c02..b708fe5 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -63,6 +63,7 @@ arguments has been created to make this very simple. Simply, execute
## Utilities
+### Inspector Utility
In order to ensure that data can be read back out, a utility,
`$METRON_HOME/bin/pcap_inspector.sh` has been
created to read portions of the sequence files.
@@ -73,3 +74,49 @@ usage: PcapInspector
-i,--input <SEQ_FILE> Input sequence file on HDFS
-n,--num_packets <N> Number of packets to dump
```
+
+### Query Filter Utility
+This tool exposes the two methods for filtering PCAP data via a command line tool:
+- fixed
+- query (Metron Stellar)
+
+The tool is executed via ```${metron_home}/bin/pcap_query.sh [fixed|query]```
+
+#### Usage
+```
+usage: Fixed filter options
+ -bop,--base_output_path <arg> Query result output path. Default is
+ '/tmp'
+ -bp,--base_path <arg> Base PCAP data path. Default is
+ '/apps/metron/pcap'
+ -da,--ip_dst_addr <arg> Destination IP address
+ -df,--date_format <arg> Date format to use for parsing start_time
+ and end_time. Default is to use time in
+ millis since the epoch.
+ -dp,--ip_dst_port <arg> Destination port
+ -et,--end_time <arg> Packet end time range. Default is current
+ system time.
+ -h,--help Display help
+ -ir,--include_reverse Indicates if filter should check swapped
+ src/dest addresses and IPs
+ -p,--protocol <arg> IP Protocol
+ -sa,--ip_src_addr <arg> Source IP address
+ -sp,--ip_src_port <arg> Source port
+ -st,--start_time <arg> (required) Packet start time range.
+```
+
+```
+usage: Query filter options
+ -bop,--base_output_path <arg> Query result output path. Default is
+ '/tmp'
+ -bp,--base_path <arg> Base PCAP data path. Default is
+ '/apps/metron/pcap'
+ -df,--date_format <arg> Date format to use for parsing start_time
+ and end_time. Default is to use time in
+ millis since the epoch.
+ -et,--end_time <arg> Packet end time range. Default is current
+ system time.
+ -h,--help Display help
+ -q,--query <arg> Query string to use as a filter
+ -st,--start_time <arg> (required) Packet start time range.
+```
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
new file mode 100644
index 0000000..a0271b8
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+public class CliConfig {
+ public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
+ public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
+ private boolean showHelp;
+ private String basePath;
+ private String baseOutputPath;
+ private long startTime;
+ private long endTime;
+ private DateFormat dateFormat;
+
+ public CliConfig() {
+ showHelp = false;
+ basePath = BASE_PATH_DEFAULT;
+ baseOutputPath = BASE_OUTPUT_PATH_DEFAULT;
+ startTime = -1L;
+ endTime = -1L;
+ }
+
+ public boolean showHelp() {
+ return showHelp;
+ }
+
+ public void setShowHelp(boolean showHelp) {
+ this.showHelp = showHelp;
+ }
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ public String getBaseOutputPath() {
+ return baseOutputPath;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+
+ public void setBaseOutputPath(String baseOutputPath) {
+ this.baseOutputPath = baseOutputPath;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public boolean isNullOrEmpty(String val) {
+ return StringUtils.isEmpty(val);
+ }
+
+ public void setDateFormat(String dateFormat) {
+ this.dateFormat = new SimpleDateFormat(dateFormat);
+ }
+
+ public DateFormat getDateFormat() {
+ return dateFormat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
new file mode 100644
index 0000000..4fbb05d
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.*;
+
+/**
+ * Provides commmon required fields for the PCAP filter jobs
+ */
+public class CliParser {
+ private CommandLineParser parser;
+
+ public CliParser() {
+ parser = new PosixParser();
+ }
+
+ public Options buildOptions() {
+ Options options = new Options();
+ options.addOption(newOption("h", "help", false, "Display help"));
+ options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
+ options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
+ options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
+ options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
+ options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
+ return options;
+ }
+
+ protected Option newOption(String opt, String longOpt, boolean hasArg, String desc) {
+ return newOption(opt, longOpt, hasArg, desc, false);
+ }
+
+ protected Option newOption(String opt, String longOpt, boolean hasArg, String desc, boolean required) {
+ Option option = new Option(opt, longOpt, hasArg, desc);
+ option.setRequired(required);
+ return option;
+ }
+
+ public void parse(CommandLine commandLine, CliConfig config) throws java.text.ParseException {
+ if (commandLine.hasOption("help")) {
+ config.setShowHelp(true);
+ }
+ if (commandLine.hasOption("date_format")) {
+ config.setDateFormat(commandLine.getOptionValue("date_format"));
+ }
+ if (commandLine.hasOption("base_path")) {
+ config.setBasePath(commandLine.getOptionValue("base_path"));
+ }
+ if (commandLine.hasOption("base_output_path")) {
+ config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
+ }
+ if (commandLine.hasOption("start_time")) {
+ try {
+ if (commandLine.hasOption("date_format")) {
+ long startTime = config.getDateFormat().parse(commandLine.getOptionValue("start_time")).getTime();
+ config.setStartTime(startTime);
+ } else {
+ long startTime = Long.parseLong(commandLine.getOptionValue("start_time"));
+ config.setStartTime(startTime);
+ }
+ } catch (NumberFormatException nfe) {
+ //no-op
+ }
+ }
+ if (commandLine.hasOption("end_time")) {
+ try {
+ if (commandLine.hasOption("date_format")) {
+ long endTime = config.getDateFormat().parse(commandLine.getOptionValue("end_time")).getTime();
+ config.setEndTime(endTime);
+ } else {
+ long endTime = Long.parseLong(commandLine.getOptionValue("end_time"));
+ config.setEndTime(endTime);
+ }
+ } catch (NumberFormatException nfe) {
+ //no-op
+ }
+ }
+ }
+
+ public void printHelp(String msg, Options opts) {
+ new HelpFormatter().printHelp(msg, opts);
+ }
+
+ protected CommandLineParser getParser() {
+ return parser;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
new file mode 100644
index 0000000..897e0fd
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.metron.common.Constants;
+
+import java.util.EnumMap;
+
+public class FixedCliConfig extends CliConfig {
+
+ private EnumMap<Constants.Fields, String> fixedFields;
+
+ public FixedCliConfig() {
+ this.fixedFields = new EnumMap<>(Constants.Fields.class);
+ }
+
+ public EnumMap<Constants.Fields, String> getFixedFields() {
+ return fixedFields;
+ }
+
+ public void setFixedFields(EnumMap<Constants.Fields, String> fixedFields) {
+ this.fixedFields = fixedFields;
+ }
+
+ public void putFixedField(Constants.Fields key, String value) {
+ String trimmedVal = value != null ? value.trim() : null;
+ if (!isNullOrEmpty(trimmedVal)) {
+ this.fixedFields.put(key, value);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
new file mode 100644
index 0000000..1123cad
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.metron.common.Constants;
+
+public class FixedCliParser extends CliParser {
+ private Options fixedOptions;
+
+ public FixedCliParser() {
+ fixedOptions = buildFixedOptions();
+ }
+
+ private Options buildFixedOptions() {
+ Options options = buildOptions();
+ options.addOption(newOption("sa", "ip_src_addr", true, "Source IP address"));
+ options.addOption(newOption("da", "ip_dst_addr", true, "Destination IP address"));
+ options.addOption(newOption("sp", "ip_src_port", true, "Source port"));
+ options.addOption(newOption("dp", "ip_dst_port", true, "Destination port"));
+ options.addOption(newOption("p", "protocol", true, "IP Protocol"));
+ options.addOption(newOption("ir", "include_reverse", false, "Indicates if filter should check swapped src/dest addresses and IPs"));
+ return options;
+ }
+
+ /**
+ * Parses fixed pcap filter options and required parameters common to all filter types.
+ *
+ * @param args command line arguments to parse
+ * @return Configuration tailored to fixed pcap queries
+ * @throws ParseException
+ */
+ public FixedCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+ CommandLine commandLine = getParser().parse(fixedOptions, args);
+ FixedCliConfig config = new FixedCliConfig();
+ super.parse(commandLine, config);
+ config.putFixedField(Constants.Fields.SRC_ADDR, commandLine.getOptionValue("ip_src_addr"));
+ config.putFixedField(Constants.Fields.DST_ADDR, commandLine.getOptionValue("ip_dst_addr"));
+ config.putFixedField(Constants.Fields.SRC_PORT, commandLine.getOptionValue("ip_src_port"));
+ config.putFixedField(Constants.Fields.DST_PORT, commandLine.getOptionValue("ip_dst_port"));
+ config.putFixedField(Constants.Fields.PROTOCOL, commandLine.getOptionValue("protocol"));
+ config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, Boolean.toString(commandLine.hasOption("include_reverse")));
+ return config;
+ }
+
+ public void printHelp() {
+ super.printHelp("Fixed filter options", fixedOptions);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
new file mode 100644
index 0000000..e7ce4da
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PcapCli {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PcapCli.class);
+ private final PcapJob jobRunner;
+ private final ResultsWriter resultsWriter;
+ private final Clock clock;
+
+ public static void main(String[] args) {
+ int status = new PcapCli(new PcapJob(), new ResultsWriter(), new Clock()).run(args);
+ System.exit(status);
+ }
+
+ public PcapCli(PcapJob jobRunner, ResultsWriter resultsWriter, Clock clock) {
+ this.jobRunner = jobRunner;
+ this.resultsWriter = resultsWriter;
+ this.clock = clock;
+ }
+
+ public int run(String[] args) {
+ if (args.length < 1) {
+ printBasicHelp();
+ return -1;
+ }
+ String jobType = args[0];
+ List<byte[]> results = new ArrayList<>();
+ String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+ Configuration hadoopConf = new Configuration();
+ String[] otherArgs = null;
+ try {
+ otherArgs = new GenericOptionsParser(hadoopConf, commandArgs).getRemainingArgs();
+ } catch (IOException e) {
+ LOGGER.error("Failed to configure hadoop with provided options: " + e.getMessage(), e);
+ return -1;
+ }
+ if ("fixed".equals(jobType)) {
+ FixedCliParser fixedParser = new FixedCliParser();
+ FixedCliConfig config = null;
+ try {
+ config = fixedParser.parse(otherArgs);
+ } catch (ParseException | java.text.ParseException e) {
+ System.err.println(e.getMessage());
+ fixedParser.printHelp();
+ return -1;
+ }
+ if (config.showHelp()) {
+ fixedParser.printHelp();
+ return 0;
+ }
+ Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+ long startTime = time.getLeft();
+ long endTime = time.getRight();
+
+ try {
+ results = jobRunner.query(
+ new Path(config.getBasePath()),
+ new Path(config.getBaseOutputPath()),
+ startTime,
+ endTime,
+ config.getFixedFields(),
+ hadoopConf,
+ FileSystem.get(hadoopConf),
+ new FixedPcapFilter.Configurator());
+ } catch (IOException | ClassNotFoundException e) {
+ LOGGER.error("Failed to execute fixed filter job: " + e.getMessage(), e);
+ return -1;
+ } catch (InterruptedException e) {
+ LOGGER.error("Failed to execute fixed filter job: " + e.getMessage(), e);
+ return -1;
+ }
+ } else if ("query".equals(jobType)) {
+ QueryCliParser queryParser = new QueryCliParser();
+ QueryCliConfig config = null;
+ try {
+ config = queryParser.parse(otherArgs);
+ } catch (ParseException | java.text.ParseException e) {
+ System.err.println(e.getMessage());
+ queryParser.printHelp();
+ return -1;
+ }
+ if (config.showHelp()) {
+ queryParser.printHelp();
+ return 0;
+ }
+ Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+ long startTime = time.getLeft();
+ long endTime = time.getRight();
+
+ try {
+ results = jobRunner.query(
+ new Path(config.getBasePath()),
+ new Path(config.getBaseOutputPath()),
+ startTime,
+ endTime,
+ config.getQuery(),
+ hadoopConf,
+ FileSystem.get(hadoopConf),
+ new QueryPcapFilter.Configurator());
+ } catch (IOException | ClassNotFoundException e) {
+ LOGGER.error("Failed to execute query filter job: " + e.getMessage(), e);
+ return -1;
+ } catch (InterruptedException e) {
+ LOGGER.error("Failed to execute query filter job: " + e.getMessage(), e);
+ return -1;
+ }
+ } else {
+ printBasicHelp();
+ return -1;
+ }
+ String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
+ String outFileName = String.format("pcap-data-%s.pcap", timestamp);
+ try {
+ resultsWriter.write(results, outFileName);
+ } catch (IOException e) {
+ LOGGER.error("Unable to write file", e);
+ return -1;
+ }
+ return 0;
+ }
+
+ private Pair<Long, Long> timeAsNanosecondsSinceEpoch(long start, long end) {
+ long revisedStart = start;
+ if (revisedStart < 0) {
+ revisedStart = 0L;
+ }
+ long revisedEnd = end;
+ if (revisedEnd < 0) {
+ revisedEnd = System.currentTimeMillis();
+ }
+ //convert to nanoseconds since the epoch
+ revisedStart = TimestampConverters.MILLISECONDS.toNanoseconds(revisedStart);
+ revisedEnd = TimestampConverters.MILLISECONDS.toNanoseconds(revisedEnd);
+ return Pair.of(revisedStart, revisedEnd);
+ }
+
+ public void printBasicHelp() {
+ System.out.println("Usage: [fixed|query]");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
new file mode 100644
index 0000000..3d06e1d
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+public class QueryCliConfig extends CliConfig {
+ private String query;
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
new file mode 100644
index 0000000..72b5a95
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class QueryCliParser extends CliParser {
+ private Options queryOptions;
+
+ public QueryCliParser() {
+ queryOptions = setupOptions();
+ }
+
+ private Options setupOptions() {
+ Options options = buildOptions();
+ options.addOption(newOption("q", "query", true, "Query string to use as a filter"));
+ return options;
+ }
+
+ /**
+ * Parses query pcap filter options and required parameters common to all filter types.
+ *
+ * @param args command line arguments to parse
+ * @return Configuration tailored to query pcap queries
+ * @throws ParseException
+ */
+ public QueryCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+ CommandLine commandLine = getParser().parse(queryOptions, args);
+ QueryCliConfig config = new QueryCliConfig();
+ super.parse(commandLine, config);
+ if (commandLine.hasOption("query")) {
+ config.setQuery(commandLine.getOptionValue("query"));
+ }
+ return config;
+ }
+
+ public void printHelp() {
+ super.printHelp("Query filter options", queryOptions);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
new file mode 100644
index 0000000..ab11770
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.metron.pcap.PcapMerger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class ResultsWriter {
+
+ public void write(List<byte[]> pcaps, String outPath) throws IOException {
+ File out = new File(outPath);
+ try (FileOutputStream fos = new FileOutputStream(out)) {
+ fos.write(mergePcaps(pcaps));
+ }
+ }
+
+ public byte[] mergePcaps(List<byte[]> pcaps) throws IOException {
+ if (pcaps == null) {
+ return new byte[]{};
+ }
+ if (pcaps.size() == 1) {
+ return pcaps.get(0);
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, pcaps);
+ return baos.toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh
new file mode 100755
index 0000000..c09aa03
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_query.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export PCAP_BACKEND_JAR=${project.artifactId}-$METRON_VERSION.jar
+
+yarn jar $METRON_HOME/lib/$PCAP_BACKEND_JAR org.apache.metron.pcap.query.PcapCli "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/46e48f89/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
new file mode 100644
index 0000000..92ab26a
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.query;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.when;
+
+public class PcapCliTest {
+
+ @Mock
+ private PcapJob jobRunner;
+ @Mock
+ private ResultsWriter resultsWriter;
+ @Mock
+ private Clock clock;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Exception {
+ String[] args = {
+ "fixed",
+ "-start_time", "500",
+ "-ip_src_addr", "192.168.1.1",
+ "-ip_dst_addr", "192.168.1.2",
+ "-ip_src_port", "8081",
+ "-ip_dst_port", "8082",
+ "-protocol", "6"
+ };
+ List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+ Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
+ Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+ EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "192.168.1.1");
+ put(Constants.Fields.DST_ADDR, "192.168.1.2");
+ put(Constants.Fields.SRC_PORT, "8081");
+ put(Constants.Fields.DST_PORT, "8082");
+ put(Constants.Fields.PROTOCOL, "6");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+ Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+ }
+
+ @Test
+ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_dateformat() throws Exception {
+ String[] args = {
+ "fixed",
+ "-start_time", "500",
+ "-end_time", "1000",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-ip_src_addr", "192.168.1.1",
+ "-ip_dst_addr", "192.168.1.2",
+ "-ip_src_port", "8081",
+ "-ip_dst_port", "8082",
+ "-protocol", "6",
+ "-include_reverse"
+ };
+ List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+ Path base_path = new Path("/base/path");
+ Path base_output_path = new Path("/base/output/path");
+ EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "192.168.1.1");
+ put(Constants.Fields.DST_ADDR, "192.168.1.2");
+ put(Constants.Fields.SRC_PORT, "8081");
+ put(Constants.Fields.DST_PORT, "8082");
+ put(Constants.Fields.PROTOCOL, "6");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+ }};
+
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+ Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+ }
+
+ @Test
+ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exception {
+ String[] args = {
+ "fixed",
+ "-start_time", "2016-06-13-18:35.00",
+ "-end_time", "2016-06-15-18:35.00",
+ "-date_format", "yyyy-MM-dd-HH:mm.ss",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-ip_src_addr", "192.168.1.1",
+ "-ip_dst_addr", "192.168.1.2",
+ "-ip_src_port", "8081",
+ "-ip_dst_port", "8082",
+ "-protocol", "6",
+ "-include_reverse"
+ };
+ List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+ Path base_path = new Path("/base/path");
+ Path base_output_path = new Path("/base/output/path");
+ EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "192.168.1.1");
+ put(Constants.Fields.DST_ADDR, "192.168.1.2");
+ put(Constants.Fields.SRC_PORT, "8081");
+ put(Constants.Fields.DST_PORT, "8082");
+ put(Constants.Fields.PROTOCOL, "6");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+ }};
+
+ long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
+ long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
+ when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+ Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+ }
+
+ private long asNanos(String inDate, String format) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
+ Date date = sdf.parse(inDate);
+ return TimestampConverters.MILLISECONDS.toNanoseconds(date.getTime());
+ }
+
+ private byte[] asBytes(String val) {
+ return val.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void runs_query_pcap_filter_job_with_default_argument_list() throws Exception {
+ String[] args = {
+ "query",
+ "-start_time", "500",
+ "-query", "some query string"
+ };
+ List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+ Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
+ Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+ String query = "some query string";
+
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+ Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+ }
+
+ @Test
+ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exception {
+ String[] args = {
+ "query",
+ "-start_time", "500",
+ "-end_time", "1000",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-query", "some query string"
+ };
+ List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+
+ Path base_path = new Path("/base/path");
+ Path base_output_path = new Path("/base/output/path");
+ String query = "some query string";
+
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect no errors on run", cli.run(args), equalTo(0));
+ Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
+ }
+
+ @Test
+ public void invalid_fixed_filter_arg_prints_help() throws Exception {
+ PrintStream originalOutStream = System.out;
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream testStream = new PrintStream(new BufferedOutputStream(bos));
+ System.setOut(testStream);
+ String[] args = {
+ "fixed",
+ "-start_time", "500",
+ "-end_time", "1000",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-query", "THIS IS AN ERROR"
+ };
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect errors on run", cli.run(args), equalTo(-1));
+ assertThat(bos.toString(), bos.toString().contains("usage: Fixed filter options"), equalTo(true));
+ } finally {
+ System.setOut(originalOutStream);
+ }
+ }
+
+ @Test
+ public void invalid_query_filter_arg_prints_help() throws Exception {
+ PrintStream originalOutStream = System.out;
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(bos));
+ System.setOut(outStream);
+ String[] args = {
+ "query",
+ "-start_time", "500",
+ "-end_time", "1000",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-ip_src_addr", "THIS IS AN ERROR"
+ };
+
+ PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
+ assertThat("Expect errors on run", cli.run(args), equalTo(-1));
+ assertThat(bos.toString(), bos.toString().contains("usage: Query filter options"), equalTo(true));
+ } finally {
+ System.setOut(originalOutStream);
+ }
+ }
+
+}