You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/31 19:20:28 UTC
[32/50] [abbrv] metron git commit: METRON-1731: PCAP - Escape colons in output dir names (mmiklavc via mmiklavc) closes apache/metron#1155
METRON-1731: PCAP - Escape colons in output dir names (mmiklavc via mmiklavc) closes apache/metron#1155
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/73dc63e6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/73dc63e6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/73dc63e6
Branch: refs/remotes/apache/feature/METRON-1699-create-batch-profiler
Commit: 73dc63e671b55d22d251f4be1c217259f4f5dc71
Parents: 05316a4
Author: mmiklavc <mi...@gmail.com>
Authored: Fri Aug 10 12:42:47 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Fri Aug 10 12:42:47 2018 -0600
----------------------------------------------------------------------
.../apache/metron/pcap/FixedPcapFilterTest.java | 286 ------------------
.../org/apache/metron/pcap/PcapJobTest.java | 290 -------------------
.../apache/metron/pcap/QueryPcapFilterTest.java | 228 ---------------
.../pcap/filter/fixed/FixedPcapFilter.java | 14 +-
.../pcap/filter/query/QueryPcapFilter.java | 17 +-
.../metron/pcap/mr/OutputDirFormatter.java | 37 +++
.../java/org/apache/metron/pcap/mr/PcapJob.java | 5 +-
.../pcap/filter/fixed/FixedPcapFilterTest.java | 271 ++++++++++++++++-
.../pcap/filter/query/QueryPcapFilterTest.java | 207 ++++++++++++-
.../metron/pcap/mr/OutputDirFormatterTest.java | 62 ++++
.../org/apache/metron/pcap/mr/PcapJobTest.java | 290 +++++++++++++++++++
11 files changed, 877 insertions(+), 830 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
deleted file mode 100644
index 84969d3..0000000
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FixedPcapFilterTest {
- @Test
- public void testTrivialEquality() throws Exception {
- Configuration config = new Configuration();
- final Map<String, String> fields = new HashMap<String, String>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), "0");
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
- }};
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected Map<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
-
- @Test
- public void testReverseTraffic() throws Exception {
- Configuration config = new Configuration();
- final Map<String, String> fields = new HashMap<String, String>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), "0");
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
- }};
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected Map<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected Map<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
- put(Constants.Fields.SRC_PORT.getName(), 1);
- put(Constants.Fields.DST_ADDR.getName(), "src_ip");
- put(Constants.Fields.DST_PORT.getName(), 0);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected Map<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "src_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertFalse(filter.test(null));
- }
- }
-@Test
-public void testMissingDstAddr() throws Exception {
- Configuration config = new Configuration();
- final HashMap<String, String> fields = new HashMap<String, String>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), "0");
- put(Constants.Fields.DST_PORT.getName(), "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
- }};
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip1");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertFalse(filter.test(null));
- }
-}
- @Test
- public void testMissingDstPort() throws Exception {
- Configuration config = new Configuration();
- final HashMap<String, String> fields = new HashMap<String, String>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), "0");
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
- }};
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 100);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 100);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 100);
- }};
- }
- };
- filter.configure(config);
- Assert.assertFalse(filter.test(null));
- }
- }
- @Test
- public void testMissingSrcAddr() throws Exception {
- Configuration config = new Configuration();
- final HashMap<String, String> fields = new HashMap<String, String>() {{
- put(Constants.Fields.SRC_PORT.getName(), "0");
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
- }};
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
- @Test
- public void testMissingSrcPort() throws Exception {
- Configuration config = new Configuration();
- final HashMap<String, String> fields = new HashMap<String, String>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
- }};
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new FixedPcapFilter.Configurator().addToConfig(fields, config);
- {
- FixedPcapFilter filter = new FixedPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 100);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
deleted file mode 100644
index 796c8a5..0000000
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import static java.lang.Long.toUnsignedString;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Timer;
-import java.util.TimerTask;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.metron.common.utils.timestamp.TimestampConverters;
-import org.apache.metron.job.Finalizer;
-import org.apache.metron.job.JobStatus;
-import org.apache.metron.job.JobStatus.State;
-import org.apache.metron.job.Pageable;
-import org.apache.metron.job.Statusable;
-import org.apache.metron.pcap.config.FixedPcapConfig;
-import org.apache.metron.pcap.config.PcapOptions;
-import org.apache.metron.pcap.filter.PcapFilterConfigurator;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.mr.PcapJob;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-public class PcapJobTest {
-
- @Mock
- private Job mrJob;
- @Mock
- private org.apache.hadoop.mapreduce.JobStatus mrStatus;
- @Mock
- private JobID jobId;
- @Mock
- private Finalizer<Path> finalizer;
- private TestTimer timer;
- private Pageable<Path> pageableResult;
- private FixedPcapConfig config;
- private Configuration hadoopConfig;
- private FileSystem fileSystem;
- private String jobIdVal = "job_abc_123";
- private Path basePath;
- private Path baseOutPath;
- private long startTime;
- private long endTime;
- private int numReducers;
- private int numRecordsPerFile;
- private Path finalOutputPath;
- private Map<String, String> fixedFields;
- private PcapJob<Map<String, String>> testJob;
-
- @Before
- public void setup() throws IOException {
- MockitoAnnotations.initMocks(this);
- basePath = new Path("basepath");
- baseOutPath = new Path("outpath");
- startTime = 100;
- endTime = 200;
- numReducers = 5;
- numRecordsPerFile = 5;
- fixedFields = new HashMap<>();
- fixedFields.put("ip_src_addr", "192.168.1.1");
- hadoopConfig = new Configuration();
- fileSystem = FileSystem.get(hadoopConfig);
- finalOutputPath = new Path("finaloutpath");
- when(jobId.toString()).thenReturn(jobIdVal);
- when(mrStatus.getJobID()).thenReturn(jobId);
- when(mrJob.getJobID()).thenReturn(jobId);
- pageableResult = new PcapPages();
- timer = new TestTimer();
- // handles setting the file name prefix under the hood
- config = new FixedPcapConfig(clock -> "clockprefix");
- PcapOptions.HADOOP_CONF.put(config, hadoopConfig);
- PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig));
- PcapOptions.BASE_PATH.put(config, basePath);
- PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath);
- PcapOptions.START_TIME_NS.put(config, startTime);
- PcapOptions.END_TIME_NS.put(config, endTime);
- PcapOptions.NUM_REDUCERS.put(config, numReducers);
- PcapOptions.FIELDS.put(config, fixedFields);
- PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
- PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile);
- PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath);
- testJob = new TestJob<>(mrJob);
- testJob.setStatusInterval(1);
- testJob.setCompleteCheckInterval(1);
- testJob.setTimer(timer);
- }
-
- private class TestJob<T> extends PcapJob<T> {
-
- private final Job mrJob;
-
- public TestJob(Job mrJob) {
- this.mrJob = mrJob;
- }
-
- @Override
- public Job createJob(Optional<String> jobName,
- Path basePath,
- Path outputPath,
- long beginNS,
- long endNS,
- int numReducers,
- T fields,
- Configuration conf,
- FileSystem fs,
- PcapFilterConfigurator<T> filterImpl) throws IOException {
- return mrJob;
- }
- }
-
- private class TestTimer extends Timer {
-
- private TimerTask task;
-
- @Override
- public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
- this.task = task;
- }
-
- public void updateJobStatus() {
- task.run();
- }
-
- }
-
- @Test
- public void partition_gives_value_in_range() throws Exception {
- long start = 1473897600000000000L;
- long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
- Configuration conf = new Configuration();
- conf.set(PcapJob.START_TS_CONF, toUnsignedString(start));
- conf.set(PcapJob.END_TS_CONF, toUnsignedString(end));
- conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
- PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
- partitioner.setConf(conf);
- Assert.assertThat("Partition not in range",
- partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10),
- equalTo(8));
- }
-
- @Test
- public void job_succeeds_synchronously() throws Exception {
- pageableResult = new PcapPages(
- Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
- when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
- when(mrJob.isComplete()).thenReturn(true);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- timer.updateJobStatus();
- Pageable<Path> results = statusable.get();
- Assert.assertThat(results.getSize(), equalTo(3));
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
- Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
- }
-
- @Test
- public void job_fails_synchronously() throws Exception {
- when(mrJob.isComplete()).thenReturn(true);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- timer.updateJobStatus();
- Pageable<Path> results = statusable.get();
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.FAILED));
- Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- Assert.assertThat(results.getSize(), equalTo(0));
- }
-
- @Test
- public void job_fails_with_killed_status_synchronously() throws Exception {
- when(mrJob.isComplete()).thenReturn(true);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- timer.updateJobStatus();
- Pageable<Path> results = statusable.get();
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.KILLED));
- Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- Assert.assertThat(results.getSize(), equalTo(0));
- }
-
- @Test
- public void job_succeeds_asynchronously() throws Exception {
- when(mrJob.isComplete()).thenReturn(true);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- timer.updateJobStatus();
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
- Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- }
-
- @Test
- public void job_reports_percent_complete() throws Exception {
- when(mrJob.isComplete()).thenReturn(false);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- when(mrJob.mapProgress()).thenReturn(0.5f);
- when(mrJob.reduceProgress()).thenReturn(0f);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- timer.updateJobStatus();
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.RUNNING));
- Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%"));
- Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
- when(mrJob.mapProgress()).thenReturn(1.0f);
- when(mrJob.reduceProgress()).thenReturn(0.5f);
- timer.updateJobStatus();
- status = statusable.getStatus();
- Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%"));
- Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
- }
-
- @Test
- public void killing_job_causes_status_to_return_KILLED_state() throws Exception {
- when(mrJob.isComplete()).thenReturn(false);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- statusable.kill();
- when(mrJob.isComplete()).thenReturn(true);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
- timer.updateJobStatus();
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.KILLED));
- }
-
- @Test
- public void handles_null_values_with_defaults() throws Exception {
- PcapOptions.START_TIME_NS.put(config, null);
- PcapOptions.END_TIME_NS.put(config, null);
- PcapOptions.NUM_REDUCERS.put(config, null);
- PcapOptions.NUM_RECORDS_PER_FILE.put(config, null);
-
- pageableResult = new PcapPages(
- Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
- when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
- when(mrJob.isComplete()).thenReturn(true);
- when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
- when(mrJob.getStatus()).thenReturn(mrStatus);
- Statusable<Path> statusable = testJob.submit(finalizer, config);
- timer.updateJobStatus();
- Pageable<Path> results = statusable.get();
- Assert.assertThat(results.getSize(), equalTo(3));
- JobStatus status = statusable.getStatus();
- Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
- Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
deleted file mode 100644
index 7e3d55c..0000000
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.filter.PcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.EnumMap;
-import java.util.HashMap;
-
-public class QueryPcapFilterTest {
-
- @Test
- public void testEmptyQueryFilter() throws Exception {
- Configuration config = new Configuration();
- String query = "";
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- PcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
-
- @Test
- public void testTrivialEquality() throws Exception {
- Configuration config = new Configuration();
- String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- PcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
-
- @Test
- public void testMissingDstAddr() throws Exception {
- Configuration config = new Configuration();
- String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_port == 1";
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertFalse(filter.test(null));
- }
- }
-
- @Test
- public void testMissingDstPort() throws Exception {
- Configuration config = new Configuration();
- String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip'";
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 100);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 100);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 100);
- }};
- }
- };
- filter.configure(config);
- Assert.assertFalse(filter.test(null));
- }
- }
-
- @Test
- public void testMissingSrcAddr() throws Exception {
- Configuration config = new Configuration();
- String query = "ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
-
- @Test
- public void testMissingSrcPort() throws Exception {
- Configuration config = new Configuration();
- String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 0);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- new QueryPcapFilter.Configurator().addToConfig(query, config);
- {
- QueryPcapFilter filter = new QueryPcapFilter() {
- @Override
- protected HashMap<String, Object> packetToFields(PacketInfo pi) {
- return new HashMap<String, Object>() {{
- put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
- put(Constants.Fields.SRC_PORT.getName(), 100);
- put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
- put(Constants.Fields.DST_PORT.getName(), 1);
- }};
- }
- };
- filter.configure(config);
- Assert.assertTrue(filter.test(null));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
index 1954f1a..314bd85 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
@@ -19,20 +19,19 @@
package org.apache.metron.pcap.filter.fixed;
import com.google.common.base.Joiner;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import javax.xml.bind.DatatypeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.metron.common.Constants;
-import org.apache.metron.stellar.dsl.MapVariableResolver;
-import org.apache.metron.stellar.dsl.VariableResolver;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.filter.PcapFilter;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.filter.PcapFilters;
import org.apache.metron.pcap.pattern.ByteArrayMatchingUtil;
-
-import javax.xml.bind.DatatypeConverter;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.VariableResolver;
public class FixedPcapFilter implements PcapFilter {
@@ -48,7 +47,8 @@ public class FixedPcapFilter implements PcapFilter {
@Override
public String queryToString(Map<String, String> fields) {
- return (fields == null ? "" : Joiner.on("_").join(fields.values()));
+ return (fields == null ? "" : Joiner.on("_").join(fields.values()).replaceAll("\\s", "_") // null map -> ""; otherwise the values joined by "_" with every whitespace char turned into "_"
+ );
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
index 552a5ae..e7fff16 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
@@ -18,19 +18,18 @@
package org.apache.metron.pcap.filter.query;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.MapVariableResolver;
-import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.apache.metron.stellar.common.StellarPredicateProcessor;
-import org.apache.metron.stellar.dsl.VariableResolver;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.filter.PcapFilter;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.filter.PcapFilters;
-
-import java.util.Map;
+import org.apache.metron.stellar.common.StellarPredicateProcessor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.stellar.dsl.VariableResolver;
public class QueryPcapFilter implements PcapFilter {
public static final String QUERY_STR_CONFIG = "mql";
@@ -45,9 +44,7 @@ public class QueryPcapFilter implements PcapFilter {
@Override
public String queryToString(String fields) {
return (fields == null ? "" :
- fields.trim().replaceAll("\\s", "_")
- .replace(".", "-")
- .replace("'", "")
+ fields.trim().replaceAll("\\s", "_") // now only trims and maps whitespace to "_"; dots and single quotes are no longer rewritten here
);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
new file mode 100644
index 0000000..0d464d5
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import com.google.common.base.Joiner;
+import java.util.UUID;
+
+public class OutputDirFormatter { // produces the sanitized directory-name string returned by format()
+
+ public String format(long beginNS, long endNS, String query) { // joins begin, end, query and a fresh random UUID with "_", then sanitizes the whole string
+ return sanitize(Joiner.on("_").join(beginNS, endNS, query, UUID.randomUUID().toString()));
+ }
+
+ private String sanitize(String path) { // applies three literal (non-regex) replacements, in this order
+ return path
+ .replace(".", "-") // every dot becomes a dash
+ .replace("'", "") // single quotes are dropped
+ .replace(":", ""); // colons are dropped; NOTE(review): presumably because a colon breaks Hadoop Path/URI parsing - confirm
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 10f31b4..0f5ad4d 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -35,7 +35,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.UUID;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
@@ -83,6 +82,7 @@ public class PcapJob<T> implements Statusable<Path> {
public static final String WIDTH_CONF = "width";
private static final long THREE_SECONDS = 3000;
private static final long ONE_SECOND = 1000;
+ private final OutputDirFormatter outputDirFormatter;
private volatile Job mrJob; // store a running MR job reference for async status check
private volatile JobStatus jobStatus; // overall job status, including finalization step
private Finalizer<Path> finalizer;
@@ -187,6 +187,7 @@ public class PcapJob<T> implements Statusable<Path> {
public PcapJob() {
jobStatus = new JobStatus();
finalResults = new PcapPages();
+ outputDirFormatter = new OutputDirFormatter();
timer = new Timer();
statusInterval = THREE_SECONDS;
completeCheckInterval = ONE_SECOND;
@@ -271,7 +272,7 @@ public class PcapJob<T> implements Statusable<Path> {
FileSystem fs,
PcapFilterConfigurator<T> filterImpl)
throws IOException, ClassNotFoundException, InterruptedException {
- String outputDirName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString());
+ String outputDirName = outputDirFormatter.format(beginNS, endNS, filterImpl.queryToString(fields));
if(LOG.isDebugEnabled()) {
DateFormat format = SimpleDateFormat.getDateTimeInstance(SimpleDateFormat.LONG
, SimpleDateFormat.LONG
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
index af2afd3..b32f23f 100644
--- a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
@@ -18,14 +18,17 @@
package org.apache.metron.pcap.filter.fixed;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
import org.junit.Assert;
import org.junit.Test;
-import java.util.LinkedHashMap;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-
public class FixedPcapFilterTest {
@Test
@@ -66,4 +69,264 @@ public class FixedPcapFilterTest {
}
}
+ @Test
+ public void testTrivialEquality() throws Exception { // a packet whose addresses and ports all equal the configured fields is accepted
+ Configuration config = new Configuration();
+ final Map<String, String> fields = new HashMap<String, String>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), "0");
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected Map<String, Object> packetToFields(PacketInfo pi) { // stub: ignores pi and returns a fixed set of packet fields
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // the stub above supplies the fields, so no real packet is passed
+ }
+ }
+
+ @Test
+ public void testReverseTraffic() throws Exception { // with INCLUDES_REVERSE_TRAFFIC set to "true", forward and fully mirrored packets are accepted
+ Configuration config = new Configuration();
+ final Map<String, String> fields = new HashMap<String, String>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), "0");
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected Map<String, Object> packetToFields(PacketInfo pi) { // stub packet: same direction as the configured fields
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // forward direction is accepted
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected Map<String, Object> packetToFields(PacketInfo pi) { // stub packet: addresses and ports both swapped
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 1);
+ put(Constants.Fields.DST_ADDR.getName(), "src_ip");
+ put(Constants.Fields.DST_PORT.getName(), 0);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // fully mirrored packet is accepted
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected Map<String, Object> packetToFields(PacketInfo pi) { // stub packet: addresses swapped but ports left unswapped
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "src_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null)); // half-swapped packet matches neither direction and is rejected
+ }
+ }
+
+ @Test
+ public void testMissingDstAddr() throws Exception { // the configured fields omit DST_ADDR
+ Configuration config = new Configuration();
+ final HashMap<String, String> fields = new HashMap<String, String>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), "0");
+ put(Constants.Fields.DST_PORT.getName(), "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: agrees with every configured field
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted although the filter names no destination address
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source address differs ("src_ip1")
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip1");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null)); // rejected because of the source address mismatch
+ }
+ }
+
+ @Test
+ public void testMissingDstPort() throws Exception { // the configured fields omit DST_PORT
+ Configuration config = new Configuration();
+ final HashMap<String, String> fields = new HashMap<String, String>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), "0");
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: destination port 1
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: destination port 100 instead of 1
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // still accepted: the destination port is not constrained
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source port 100, filter asks for "0"
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 100);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null)); // rejected because of the source port mismatch
+ }
+ }
+
+ @Test
+ public void testMissingSrcAddr() throws Exception { // the configured fields omit SRC_ADDR
+ Configuration config = new Configuration();
+ final HashMap<String, String> fields = new HashMap<String, String>() {{
+ put(Constants.Fields.SRC_PORT.getName(), "0");
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: agrees with every configured field
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted although the filter names no source address
+ }
+ }
+
+ @Test
+ public void testMissingSrcPort() throws Exception { // the configured fields omit SRC_PORT
+ Configuration config = new Configuration();
+ final HashMap<String, String> fields = new HashMap<String, String>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source port 0
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source port 100
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 100);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // still accepted: the source port is not constrained
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
index 061066e..2724e06 100644
--- a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
@@ -18,18 +18,23 @@
package org.apache.metron.pcap.filter.query;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.util.HashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.filter.PcapFilter;
import org.junit.Assert;
import org.junit.Test;
-import static org.hamcrest.CoreMatchers.equalTo;
-
public class QueryPcapFilterTest {
@Test
public void string_representation_of_query_gets_formatted() throws Exception {
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
String actual = new QueryPcapFilter.Configurator().queryToString(query);
- String expected = "ip_src_addr_==_srcIp_and_ip_src_port_==_80_and_ip_dst_addr_==_dstIp_and_ip_dst_port_==_100_and_protocol_==_protocol";
+ String expected = "ip_src_addr_==_'srcIp'_and_ip_src_port_==_'80'_and_ip_dst_addr_==_'dstIp'_and_ip_dst_port_==_'100'_and_protocol_==_'protocol'"; // single quotes are now expected to survive queryToString; only whitespace becomes "_"
Assert.assertThat("string representation did not match", actual, equalTo(expected));
}
@@ -55,4 +60,200 @@ public class QueryPcapFilterTest {
}
}
+ @Test
+ public void testEmptyQueryFilter() throws Exception { // an empty query string accepts the stub packet
+ Configuration config = new Configuration();
+ String query = "";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ PcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub: ignores pi and returns a fixed set of packet fields
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // the stub above supplies the fields, so no real packet is passed
+ }
+ }
+
+ @Test
+ public void testTrivialEquality() throws Exception { // a packet satisfying all four equality clauses of the query is accepted
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ PcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub: ignores pi and returns a fixed set of packet fields
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // the stub above supplies the fields, so no real packet is passed
+ }
+ }
+
+ @Test
+ public void testMissingDstAddr() throws Exception { // the query has no ip_dst_addr clause
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_port == 1";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: satisfies every clause of the query
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source address differs from the query
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null)); // rejected because of the source address mismatch
+ }
+ }
+
+ @Test
+ public void testMissingDstPort() throws Exception { // the query has no ip_dst_port clause
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip'";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: destination port 1
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: destination port 100
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // still accepted: the query does not constrain the destination port
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source port 100, query asks for 0
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 100);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null)); // rejected because of the source port mismatch
+ }
+ }
+
+ @Test
+ public void testMissingSrcAddr() throws Exception { // the query has no ip_src_addr clause
+ Configuration config = new Configuration();
+ String query = "ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: satisfies every clause of the query
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted although the query names no source address
+ }
+ }
+
+ @Test
+ public void testMissingSrcPort() throws Exception { // the query has no ip_src_port clause
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == 1";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source port 0
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 0);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // accepted
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected HashMap<String, Object> packetToFields(PacketInfo pi) { // stub packet: source port 100
+ return new HashMap<String, Object>() {{
+ put(Constants.Fields.SRC_ADDR.getName(), "src_ip");
+ put(Constants.Fields.SRC_PORT.getName(), 100);
+ put(Constants.Fields.DST_ADDR.getName(), "dst_ip");
+ put(Constants.Fields.DST_PORT.getName(), 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null)); // still accepted: the query does not constrain the source port
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
new file mode 100644
index 0000000..ae1cda4
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.junit.Test;
+
+public class OutputDirFormatterTest { // checks the directory names OutputDirFormatter produces for query and fixed filter strings
+
+ @Test
+ public void formats_directory_name_for_query_filter_types() throws Exception {
+ long beginNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+ long endNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+ String query = "ip_dst_addr == '207.28.210.1' and protocol == 'PROTOCOL: ICMP(1)'"; // sample query containing dots, single quotes, a colon and spaces
+ String queryFilterString = new QueryPcapFilter.Configurator().queryToString(query);
+ OutputDirFormatter formatter = new OutputDirFormatter();
+ String actual = formatter.format(beginNS, endNS, queryFilterString);
+ assertThat("Formatted directory names did not match.", actual, containsString("_ip_dst_addr_==_207-28-210-1_and_protocol_==_PROTOCOL_ICMP(1)_")); // substring match only: the begin/end prefix and UUID suffix vary per run
+ // no URI exception should be thrown with dir name
+ new Path(actual);
+ }
+
+ @Test
+ public void formats_directory_name_for_fixed_filter_types() throws Exception {
+ long beginNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+ long endNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis());
+ Map<String, String> fields = new HashMap<>();
+ fields.put("ip_src_address", "207.28.210.1");
+ fields.put("protocol", "PROTOCOL: ICMP(1)");
+ String fixedFilterString = new FixedPcapFilter.Configurator().queryToString(fields);
+ OutputDirFormatter formatter = new OutputDirFormatter();
+ String actual = formatter.format(beginNS, endNS, fixedFilterString);
+ assertThat("Formatted directory names did not match.", actual, containsString("PROTOCOL_ICMP(1)_207-28-210-1")); // value order follows HashMap iteration order, not insertion order
+ // no URI exception should be thrown with dir name
+ new Path(actual);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
new file mode 100644
index 0000000..0f555d0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import static java.lang.Long.toUnsignedString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+ /**
+  * Tests for {@link PcapJob}. The Hadoop {@link Job} is a Mockito mock handed back by
+  * {@link TestJob}, and the status timer is a {@link TestTimer} whose task is run by hand, so
+  * each test decides exactly when a status update takes place.
+  */
+ public class PcapJobTest {
+
+   @Mock
+   private Job mrJob;
+   @Mock
+   private org.apache.hadoop.mapreduce.JobStatus mrStatus;
+   @Mock
+   private JobID jobId;
+   @Mock
+   private Finalizer<Path> finalizer;
+   private TestTimer timer;
+   private Pageable<Path> pageableResult;
+   private FixedPcapConfig config;
+   private Configuration hadoopConfig;
+   private FileSystem fileSystem;
+   private String jobIdVal = "job_abc_123";
+   private Path basePath;
+   private Path baseOutPath;
+   private long startTime;
+   private long endTime;
+   private int numReducers;
+   private int numRecordsPerFile;
+   private Path finalOutputPath;
+   private Map<String, String> fixedFields;
+   private PcapJob<Map<String, String>> testJob;
+
+   /**
+    * Initializes the mocks, fills a {@link FixedPcapConfig} with every option the tests rely on,
+    * and builds the job under test with 1 ms status/completion intervals and the manual timer.
+    *
+    * @throws IOException if the Hadoop file system cannot be obtained
+    */
+   @Before
+   public void setup() throws IOException {
+     MockitoAnnotations.initMocks(this);
+     basePath = new Path("basepath");
+     baseOutPath = new Path("outpath");
+     startTime = 100;
+     endTime = 200;
+     numReducers = 5;
+     numRecordsPerFile = 5;
+     fixedFields = new HashMap<>();
+     fixedFields.put("ip_src_addr", "192.168.1.1");
+     hadoopConfig = new Configuration();
+     fileSystem = FileSystem.get(hadoopConfig);
+     finalOutputPath = new Path("finaloutpath");
+     when(jobId.toString()).thenReturn(jobIdVal);
+     when(mrStatus.getJobID()).thenReturn(jobId);
+     when(mrJob.getJobID()).thenReturn(jobId);
+     pageableResult = new PcapPages();
+     timer = new TestTimer();
+     // handles setting the file name prefix under the hood
+     config = new FixedPcapConfig(clock -> "clockprefix");
+     PcapOptions.HADOOP_CONF.put(config, hadoopConfig);
+     // reuse the file system obtained above instead of looking it up a second time
+     PcapOptions.FILESYSTEM.put(config, fileSystem);
+     PcapOptions.BASE_PATH.put(config, basePath);
+     PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath);
+     PcapOptions.START_TIME_NS.put(config, startTime);
+     PcapOptions.END_TIME_NS.put(config, endTime);
+     PcapOptions.NUM_REDUCERS.put(config, numReducers);
+     PcapOptions.FIELDS.put(config, fixedFields);
+     PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
+     PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile);
+     PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath);
+     testJob = new TestJob<>(mrJob);
+     testJob.setStatusInterval(1);
+     testJob.setCompleteCheckInterval(1);
+     testJob.setTimer(timer);
+   }
+
+   /**
+    * {@link PcapJob} whose {@code createJob} ignores all of its arguments and returns the job
+    * supplied at construction time. Static because it needs nothing from the enclosing test.
+    */
+   private static class TestJob<T> extends PcapJob<T> {
+
+     private final Job mrJob;
+
+     public TestJob(Job mrJob) {
+       this.mrJob = mrJob;
+     }
+
+     @Override
+     public Job createJob(Optional<String> jobName,
+         Path basePath,
+         Path outputPath,
+         long beginNS,
+         long endNS,
+         int numReducers,
+         T fields,
+         Configuration conf,
+         FileSystem fs,
+         PcapFilterConfigurator<T> filterImpl) throws IOException {
+       return mrJob;
+     }
+   }
+
+   /**
+    * {@link Timer} that never schedules anything: it only remembers the task passed to
+    * {@link #scheduleAtFixedRate(TimerTask, long, long)} so a test can run it on demand.
+    */
+   private static class TestTimer extends Timer {
+
+     private TimerTask task;
+
+     TestTimer() {
+       // The Timer constructor starts a background thread; mark it as a daemon so the
+       // never-cancelled timers created by each test cannot keep the JVM alive.
+       super(true);
+     }
+
+     @Override
+     public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
+       this.task = task;
+     }
+
+     /**
+      * Runs the captured task once on the calling thread.
+      *
+      * @throws IllegalStateException if no task has been handed to this timer yet
+      */
+     public void updateJobStatus() {
+       if (task == null) {
+         throw new IllegalStateException("No status task has been scheduled on this timer");
+       }
+       task.run();
+     }
+
+   }
+
+   /**
+    * Configures a {@link PcapJob.PcapPartitioner} for a fixed time range split 10 ways and
+    * expects the sample timestamp key to be assigned to partition 8.
+    */
+   @Test
+   public void partition_gives_value_in_range() throws Exception {
+     long start = 1473897600000000000L;
+     long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
+     Configuration conf = new Configuration();
+     conf.set(PcapJob.START_TS_CONF, toUnsignedString(start));
+     conf.set(PcapJob.END_TS_CONF, toUnsignedString(end));
+     conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
+     PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
+     partitioner.setConf(conf);
+     Assert.assertThat("Partition not in range",
+         partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10),
+         equalTo(8));
+   }
+
+   /**
+    * A completed, SUCCEEDED job yields the finalizer's three result paths from {@code get()} and
+    * reports SUCCEEDED, 100 percent complete, and the mocked job id.
+    */
+   @Test
+   public void job_succeeds_synchronously() throws Exception {
+     pageableResult = new PcapPages(
+         Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
+     when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+     when(mrJob.isComplete()).thenReturn(true);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     timer.updateJobStatus();
+     Pageable<Path> results = statusable.get();
+     Assert.assertThat(results.getSize(), equalTo(3));
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+     Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
+   }
+
+   /**
+    * A completed job in the FAILED state reports FAILED at 100 percent and returns no results.
+    */
+   @Test
+   public void job_fails_synchronously() throws Exception {
+     when(mrJob.isComplete()).thenReturn(true);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     timer.updateJobStatus();
+     Pageable<Path> results = statusable.get();
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.FAILED));
+     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+     Assert.assertThat(results.getSize(), equalTo(0));
+   }
+
+   /**
+    * A completed job in the KILLED state reports KILLED at 100 percent and returns no results.
+    */
+   @Test
+   public void job_fails_with_killed_status_synchronously() throws Exception {
+     when(mrJob.isComplete()).thenReturn(true);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     timer.updateJobStatus();
+     Pageable<Path> results = statusable.get();
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.KILLED));
+     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+     Assert.assertThat(results.getSize(), equalTo(0));
+   }
+
+   /**
+    * Without calling {@code get()}, the status alone reports SUCCEEDED at 100 percent once a
+    * status update has observed the completed job.
+    */
+   @Test
+   public void job_succeeds_asynchronously() throws Exception {
+     when(mrJob.isComplete()).thenReturn(true);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     timer.updateJobStatus();
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+   }
+
+   /**
+    * While the job is RUNNING, the description and percent complete follow the mocked map and
+    * reduce progress: 50%/0% is reported as 25.0, then 100%/50% as 75.0.
+    */
+   @Test
+   public void job_reports_percent_complete() throws Exception {
+     when(mrJob.isComplete()).thenReturn(false);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     when(mrJob.mapProgress()).thenReturn(0.5f);
+     when(mrJob.reduceProgress()).thenReturn(0f);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     timer.updateJobStatus();
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.RUNNING));
+     Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%"));
+     Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
+     when(mrJob.mapProgress()).thenReturn(1.0f);
+     when(mrJob.reduceProgress()).thenReturn(0.5f);
+     timer.updateJobStatus();
+     status = statusable.getStatus();
+     Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%"));
+     Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
+   }
+
+   /**
+    * After {@code kill()} is called and the mocked job then reports complete/KILLED, the next
+    * status update surfaces the KILLED state.
+    */
+   @Test
+   public void killing_job_causes_status_to_return_KILLED_state() throws Exception {
+     when(mrJob.isComplete()).thenReturn(false);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     statusable.kill();
+     when(mrJob.isComplete()).thenReturn(true);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
+     timer.updateJobStatus();
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.KILLED));
+   }
+
+   /**
+    * Submitting with the start time, end time, reducer count and records-per-file options set
+    * to {@code null} still runs to a SUCCEEDED result with the finalizer's three paths.
+    */
+   @Test
+   public void handles_null_values_with_defaults() throws Exception {
+     PcapOptions.START_TIME_NS.put(config, null);
+     PcapOptions.END_TIME_NS.put(config, null);
+     PcapOptions.NUM_REDUCERS.put(config, null);
+     PcapOptions.NUM_RECORDS_PER_FILE.put(config, null);
+
+     pageableResult = new PcapPages(
+         Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
+     when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+     when(mrJob.isComplete()).thenReturn(true);
+     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+     when(mrJob.getStatus()).thenReturn(mrStatus);
+     Statusable<Path> statusable = testJob.submit(finalizer, config);
+     timer.updateJobStatus();
+     Pageable<Path> results = statusable.get();
+     Assert.assertThat(results.getSize(), equalTo(3));
+     JobStatus status = statusable.getStatus();
+     Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+     Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
+   }
+
+ }