You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/08/07 21:03:08 UTC
metron git commit: METRON-1726: Refactor PcapTopologyIntegrationTest
(mmiklavc via mmiklavc) closes apache/metron#1140
Repository: metron
Updated Branches:
refs/heads/feature/METRON-1554-pcap-query-panel 3e7785920 -> 7967f358c
METRON-1726: Refactor PcapTopologyIntegrationTest (mmiklavc via mmiklavc) closes apache/metron#1140
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7967f358
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7967f358
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7967f358
Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 7967f358c6c4b8437935c43e54179126e48e248f
Parents: 3e77859
Author: mmiklavc <mi...@gmail.com>
Authored: Tue Aug 7 15:02:20 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Tue Aug 7 15:02:20 2018 -0600
----------------------------------------------------------------------
.../org/apache/metron/pcap/query/PcapCli.java | 2 +
.../PcapTopologyIntegrationTest.java | 892 ++++++++++---------
.../metron/pcap/finalizer/PcapCliFinalizer.java | 5 +-
3 files changed, 473 insertions(+), 426 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index 0b06b0c..c23f037 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -87,6 +87,7 @@ public class PcapCli {
try {
config = fixedParser.parse(otherArgs);
commonConfig = config;
+ PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path(execDir));
} catch (ParseException | java.text.ParseException e) {
System.err.println(e.getMessage());
System.err.flush();
@@ -112,6 +113,7 @@ public class PcapCli {
try {
config = queryParser.parse(otherArgs);
commonConfig = config;
+ PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path(execDir));
} catch (ParseException | java.text.ParseException e) {
System.err.println(e.getMessage());
queryParser.printHelp();
http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 108fd2b..c30267d 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -58,6 +58,7 @@ import org.apache.metron.integration.components.MRComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.integration.utils.KafkaUtil;
import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
@@ -73,7 +74,10 @@ import org.apache.metron.spout.pcap.Endianness;
import org.apache.metron.spout.pcap.deserializer.Deserializers;
import org.apache.metron.test.utils.UnitTestHelper;
import org.json.simple.JSONObject;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -85,12 +89,19 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
private static String OUTPUT_DIR = BASE_DIR + "/output";
private static final int MAX_RETRIES = 30;
private static final int SLEEP_MS = 500;
- private String topologiesDir = "src/main/flux";
- private String targetDir = "target";
+ private static String topologiesDir = "src/main/flux";
+ private static String targetDir = "target";
+ private static ComponentRunner runner;
+ private static File inputDir;
+ private static File interimResultDir;
+ private static File outputDir;
+ private static List<Map.Entry<byte[], byte[]>> pcapEntries;
+ private static boolean withHeaders;
+ private FixedPcapConfig configuration;
private static void clearOutDirs(File... dirs) throws IOException {
- for(File dir: dirs) {
- for(File f : dir.listFiles()) {
+ for (File dir : dirs) {
+ for (File f : dir.listFiles()) {
if (f.isDirectory()) {
FileUtils.deleteDirectory(f);
} else {
@@ -99,8 +110,8 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
}
}
}
- private static int numFiles(File outDir, Configuration config) {
+ private static int numFiles(File outDir, Configuration config) {
return outDir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
@@ -109,11 +120,12 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
}).length;
}
- // This will eventually be completely deprecated. As it takes a significant amount of testing, the test is being disabled.
+ // This will eventually be completely deprecated.
+ // As it takes a significant amount of testing, the test is being disabled.
@Ignore
@Test
public void testTimestampInPacket() throws Exception {
- testTopology(new Function<Properties, Void>() {
+ setupTopology(new Function<Properties, Void>() {
@Nullable
@Override
public Void apply(@Nullable Properties input) {
@@ -129,9 +141,14 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
);
}
- @Test
- public void testTimestampInKey() throws Exception {
- testTopology(new Function<Properties, Void>() {
+ /**
+ * Sets up component infrastructure once for all tests.
+ */
+ @BeforeClass
+ public static void setupAll() throws Exception {
+ System.out.println("Setting up test components");
+ withHeaders = false;
+ setupTopology(new Function<Properties, Void>() {
@Nullable
@Override
public Void apply(@Nullable Properties input) {
@@ -154,7 +171,30 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
System.out.println("Wrote " + pcapEntries.size() + " to kafka");
}
}
- }, false);
+ }, withHeaders);
+ System.out.println("Done with setup.");
+ }
+
+ private static File getDir(String targetDir, String childDir) {
+ File directory = new File(new File(targetDir), childDir);
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+ return directory;
+ }
+
+ /**
+ * Cleans up component infrastructure after all tests finish running.
+ */
+ @AfterClass
+ public static void teardownAll() throws Exception {
+ System.out.println("Tearing down test infrastructure");
+ System.out.println("Stopping runner");
+ runner.stop();
+ System.out.println("Done stopping runner");
+ System.out.println("Clearing output directories");
+ clearOutDirs(inputDir, interimResultDir, outputDir);
+ System.out.println("Finished");
}
private static long getTimestamp(int offset, List<Map.Entry<byte[], byte[]>> entries) {
@@ -165,27 +205,27 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
public void send(KafkaComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> entries) throws Exception;
}
- public void testTopology(Function<Properties, Void> updatePropertiesCallback
+ public static void setupTopology(Function<Properties, Void> updatePropertiesCallback
,SendEntries sendPcapEntriesCallback
,boolean withHeaders
)
- throws Exception
- {
+ throws Exception {
if (!new File(topologiesDir).exists()) {
topologiesDir = UnitTestHelper.findDir("topologies");
}
targetDir = UnitTestHelper.findDir("target");
- final File inputDir = getDir(targetDir, DATA_DIR);
- final File interimResultDir = getDir(targetDir, INTERIM_RESULT);
- final File outputDir = getDir(targetDir, OUTPUT_DIR);
+ inputDir = getDir(targetDir, DATA_DIR);
+ interimResultDir = getDir(targetDir, INTERIM_RESULT);
+ outputDir = getDir(targetDir, OUTPUT_DIR);
clearOutDirs(inputDir, interimResultDir, outputDir);
File baseDir = new File(new File(targetDir), BASE_DIR);
//Assert.assertEquals(0, numFiles(outDir));
Assert.assertNotNull(topologiesDir);
Assert.assertNotNull(targetDir);
- Path pcapFile = new Path("../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput");
- final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
+ Path pcapFile = new Path(
+ "../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput");
+ pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
Assert.assertTrue(Iterables.size(pcapEntries) > 0);
final Properties topologyProperties = new Properties() {{
setProperty("topology.workers", "1");
@@ -206,415 +246,428 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
- final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, Collections.singletonList(
+ final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties,
+ Collections.singletonList(
new KafkaComponent.Topic(KAFKA_TOPIC, 1)));
-
final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath());
FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
- .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml"))
- .withTopologyName("pcap")
- .withTopologyProperties(topologyProperties)
- .build();
+ .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml"))
+ .withTopologyName("pcap")
+ .withTopologyProperties(topologyProperties)
+ .build();
//UnitTestHelper.verboseLogging();
- ComponentRunner runner = new ComponentRunner.Builder()
- .withComponent("mr", mr)
- .withComponent("zk",zkServerComponent)
- .withComponent("kafka", kafkaComponent)
- .withComponent("storm", fluxComponent)
- .withMaxTimeMS(-1)
- .withMillisecondsBetweenAttempts(2000)
- .withNumRetries(10)
- .withCustomShutdownOrder(new String[]{"storm","kafka","zk","mr"})
- .build();
- try {
- runner.start();
-
- fluxComponent.submitTopology();
- sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
- runner.process(new Processor<Void>() {
- @Override
- public ReadinessState process(ComponentRunner runner) {
- int numFiles = numFiles(inputDir, mr.getConfiguration());
- int expectedNumFiles = pcapEntries.size() / 2;
- if (numFiles == expectedNumFiles) {
- return ReadinessState.READY;
- } else {
- return ReadinessState.NOT_READY;
- }
- }
-
- @Override
- public ProcessorResult<Void> getResult() {
- return null;
+ runner = new ComponentRunner.Builder()
+ .withComponent("mr", mr)
+ .withComponent("zk", zkServerComponent)
+ .withComponent("kafka", kafkaComponent)
+ .withComponent("storm", fluxComponent)
+ .withMaxTimeMS(-1)
+ .withMillisecondsBetweenAttempts(2000)
+ .withNumRetries(10)
+ .withCustomShutdownOrder(new String[]{"storm", "kafka", "zk", "mr"})
+ .build();
+ runner.start();
+
+ fluxComponent.submitTopology();
+ sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
+ runner.process(new Processor<Void>() {
+ @Override
+ public ReadinessState process(ComponentRunner runner) {
+ int numFiles = numFiles(inputDir, mr.getConfiguration());
+ int expectedNumFiles = pcapEntries.size() / 2;
+ if (numFiles == expectedNumFiles) {
+ return ReadinessState.READY;
+ } else {
+ return ReadinessState.NOT_READY;
}
- });
-
- FixedPcapConfig configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
- Configuration hadoopConf = new Configuration();
- PcapOptions.JOB_NAME.put(configuration, "jobName");
- PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
- PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
- PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath()));
- PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath()));
- PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
- PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
- PcapOptions.NUM_REDUCERS.put(configuration, 10);
- PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 2);
- PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath()));
- {
- //Ensure that only two pcaps are returned when we look at 4 and 5
- PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, new HashMap());
- PcapJob<Map<String, String>> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(results.get().getSize(), 1);
- }
- {
- // Ensure that only two pcaps are returned when we look at 4 and 5
- // test with empty query filter
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "");
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(results.get().getSize(), 1);
- }
- {
- //ensure that none get returned since date range has no results
- PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, new HashMap<>());
- PcapOptions.START_TIME_NS.put(configuration, 0);
- PcapOptions.END_TIME_NS.put(configuration, 1);
- PcapJob<Map<String, String>> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0);
- Assert.assertEquals("No results in specified date range.",
- results.getStatus().getDescription());
- Assert.assertEquals(results.get().getSize(), 0);
- }
- {
- //ensure that none get returned since that destination IP address isn't in the dataset
- PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
- put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
- }});
- PcapJob<Map<String, String>> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(results.get().getSize(), 0);
- }
- {
- // ensure that none get returned since that destination IP address isn't in the dataset
- // test with query filter
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(results.get().getSize(), 0);
}
- {
- //same with protocol as before with the destination addr
- PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
- put(Constants.Fields.PROTOCOL.getName(), "foo");
- }});
- PcapJob<Map<String, String>> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(results.get().getSize(), 0);
+
+ @Override
+ public ProcessorResult<Void> getResult() {
+ return null;
}
- {
- //same with protocol as before with the destination addr
- //test with query filter
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(results.get().getSize(), 0);
+ });
+ }
+
+ /**
+ * This is executed before each individual test.
+ */
+ @Before
+ public void setup() throws IOException {
+ configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
+ Configuration hadoopConf = new Configuration();
+ PcapOptions.JOB_NAME.put(configuration, "jobName");
+ PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
+ PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
+ PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath()));
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath()));
+ PcapOptions.NUM_REDUCERS.put(configuration, 10);
+ PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+ PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath()));
+ }
+
+ @Test
+ public void filters_pcaps_by_start_end_ns_with_fixed_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+ PcapOptions.FIELDS.put(configuration, new HashMap());
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- //make sure I get them all.
- PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, new HashMap<>());
- PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
- PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
- PcapJob<Map<String, String>> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(10, results.get().getSize());
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals("Expected 2 records returned.", 2, resultPages.getSize());
+ Assert.assertEquals("Expected 1 record in first file.", 1,
+ PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size());
+ Assert.assertEquals("Expected 1 record in second file.", 1,
+ PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size());
+ }
+
+ @Test
+ public void filters_pcaps_by_start_end_ns_with_empty_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+ PcapOptions.FIELDS.put(configuration, "");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- //make sure I get them all.
- //with query filter
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "");
- PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
- PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(10, results.get().getSize());
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals("Expected 2 records returned.", 2, resultPages.getSize());
+ Assert.assertEquals("Expected 1 record in first file.", 1,
+ PcapHelper.toPacketInfo(Iterables.get(bytes, 0)).size());
+ Assert.assertEquals("Expected 1 record in second file.", 1,
+ PcapHelper.toPacketInfo(Iterables.get(bytes, 1)).size());
+ }
+
+ @Test
+ public void date_range_filters_out_all_results() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, new HashMap<>());
+ PcapOptions.START_TIME_NS.put(configuration, 0);
+ PcapOptions.END_TIME_NS.put(configuration, 1);
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0);
+ Assert.assertEquals("No results in specified date range.",
+ results.getStatus().getDescription());
+ Assert.assertEquals(results.get().getSize(), 0);
+ }
+
+ @Test
+ public void ip_address_filters_out_all_results_with_fixed_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+ PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+ put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
+ }});
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Assert.assertEquals(results.get().getSize(), 0);
+ }
+
+ @Test
+ public void ip_address_filters_out_all_results_with_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+ PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Assert.assertEquals(results.get().getSize(), 0);
+ }
+
+ @Test
+ public void protocol_filters_out_all_results_with_fixed_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+ PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+ put(Constants.Fields.PROTOCOL.getName(), "foo");
+ }});
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Assert.assertEquals(results.get().getSize(), 0);
+ }
+
+ @Test
+ public void protocol_filters_out_all_results_with_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(1, pcapEntries));
+ PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Assert.assertEquals(results.get().getSize(), 0);
+ }
+
+ @Test
+ public void fixed_filter_returns_all_results_for_full_date_range() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapOptions.FIELDS.put(configuration, new HashMap<>());
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
- put(Constants.Fields.DST_PORT.getName(), "22");
- }});
- PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
- PcapJob<Map<String, String>> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertTrue(results.get().getSize() > 0);
- Assert.assertEquals(Iterables.size(bytes)
- , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
- @Override
- public boolean apply(@Nullable JSONObject input) {
- Object prt = input.get(Constants.Fields.DST_PORT.getName());
- return prt != null && prt.toString().equals("22");
- }
- }, withHeaders)
- )
- );
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
- Assert.assertTrue(baos.toByteArray().length > 0);
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(pcapEntries.size(), resultPages.getSize());
+ }
+
+ /**
+ * Runs a query-filter job with an empty query over the timestamp range spanned by
+ * {@code pcapEntries} and expects the result size to equal the number of entries.
+ */
+ @Test
+ public void query_filter_returns_all_results_for_full_date_range() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ // End bound is the last entry's timestamp + 1 (presumably so the last packet is included).
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapOptions.FIELDS.put(configuration, "");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ // Fail fast if the job did not succeed, matching the other filter tests in this class.
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- //same with protocol as before with the destination addr
- //test with query filter
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(Iterables.size(bytes)
- , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
- @Override
- public boolean apply(@Nullable JSONObject input) {
- Object prt = input.get(Constants.Fields.DST_PORT.getName());
- return prt != null && prt.toString().equals("22");
- }
- }, withHeaders)
- )
- );
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
- Assert.assertTrue(baos.toByteArray().length > 0);
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(pcapEntries.size(), resultPages.getSize());
+ }
+
+ @Test
+ public void filters_results_by_dst_port_with_fixed_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+ put(Constants.Fields.DST_PORT.getName(), "22");
+ }});
+ PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- // test with query filter ip_dst_port > 20 and ip_dst_port < 55792
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792");
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
+ });
+ assertInOrder(bytes);
+ Assert.assertTrue(resultPages.getSize() > 0);
+ Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+ @Override
+ public boolean apply(@Nullable JSONObject input) {
+ Object prt = input.get(Constants.Fields.DST_PORT.getName());
+ return prt != null && prt.toString().equals("22");
}
- });
- assertInOrder(bytes);
- Assert.assertEquals(Iterables.size(bytes)
- , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
- @Override
- public boolean apply(@Nullable JSONObject input) {
- Object prt = input.get(Constants.Fields.DST_PORT.getName());
- return prt != null && ((Long) prt > 20 && (Long) prt < 55792);
- }
- }, withHeaders)
- )
- );
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
- Assert.assertTrue(baos.toByteArray().length > 0);
+ }, withHeaders)
+ ), resultPages.getSize()
+ );
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+ Assert.assertTrue(baos.toByteArray().length > 0);
+ }
+
+ @Test
+ public void filters_results_by_dst_port_with_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- //test with query filter ip_dst_port > 55790
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(Iterables.size(bytes)
- , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
- @Override
- public boolean apply(@Nullable JSONObject input) {
- Object prt = input.get(Constants.Fields.DST_PORT.getName());
- return prt != null && (Long) prt > 55790;
- }
- }, withHeaders)
- )
- );
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
- Assert.assertTrue(baos.toByteArray().length > 0);
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+ @Override
+ public boolean apply(@Nullable JSONObject input) {
+ Object prt = input.get(Constants.Fields.DST_PORT.getName());
+ return prt != null && prt.toString().equals("22");
+ }
+ }, withHeaders)
+ ), resultPages.getSize()
+ );
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+ Assert.assertTrue(baos.toByteArray().length > 0);
+ }
+
+ @Test
+ public void filters_results_by_dst_port_range_with_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
- {
- //test with query filter and byte array matching
- PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
- PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
- PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
- PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
- PcapJob<String> job = new PcapJob<>();
- Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
- Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
- waitForJob(results);
-
- Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
- Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
- try {
- return HDFSUtils.readBytes(path);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- });
- assertInOrder(bytes);
- Assert.assertEquals(1, results.get().getSize());
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
- Assert.assertTrue(baos.toByteArray().length > 0);
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+ @Override
+ public boolean apply(@Nullable JSONObject input) {
+ Object prt = input.get(Constants.Fields.DST_PORT.getName());
+ return prt != null && ((Long) prt > 20 && (Long) prt < 55792);
+ }
+ }, withHeaders)
+ ), resultPages.getSize()
+ );
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+ Assert.assertTrue(baos.toByteArray().length > 0);
+ }
+
+ @Test
+ public void filters_results_by_dst_port_greater_than_value_with_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+ @Override
+ public boolean apply(@Nullable JSONObject input) {
+ Object prt = input.get(Constants.Fields.DST_PORT.getName());
+ return prt != null && (Long) prt > 55790;
+ }
+ }, withHeaders)
+ ), resultPages.getSize()
+ );
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+ Assert.assertTrue(baos.toByteArray().length > 0);
+ }
- System.out.println("Ended");
- } finally {
- runner.stop();
- clearOutDirs(inputDir, interimResultDir, outputDir);
- }
+ /**
+ * Runs a query-filter job using {@code BYTEARRAY_MATCHER} against the raw packet and
+ * expects exactly one result whose merged output is non-empty.
+ */
+ @Test
+ public void filters_results_by_BYTEARRAY_MATCHER_with_query_filter() throws Exception {
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS
+ .put(configuration, getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1);
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ // Fetch the pages once and reuse them, consistent with the other filter tests.
+ Pageable<Path> resultPages = results.get();
+ Iterable<byte[]> bytes = Iterables.transform(resultPages, path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(1, resultPages.getSize());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, HDFSUtils.readBytes(resultPages.getPage(0)));
+ Assert.assertTrue(baos.toByteArray().length > 0);
}
private void waitForJob(Statusable statusable) throws Exception {
@@ -628,14 +681,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " seconds");
}
- private File getDir(String targetDir, String childDir) {
- File directory = new File(new File(targetDir), childDir);
- if (!directory.exists()) {
- directory.mkdirs();
- }
- return directory;
- }
-
private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
SequenceFile.Reader.file(pcapFile)
@@ -655,28 +700,27 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
{
List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
- for(PacketInfo pi : info) {
+ for (PacketInfo pi : info) {
Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
//IF you are debugging and want to see the packets, uncomment the following.
//System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
}
}
- if(withHeaders) {
+ if (withHeaders) {
ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader));
- }
- else {
+ } else {
byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
}
}
- return Iterables.limit(ret, 2*(ret.size()/2));
+ return Iterables.limit(ret, 2 * (ret.size() / 2));
}
public static void assertInOrder(Iterable<byte[]> packets) {
long previous = 0;
- for(byte[] packet : packets) {
- for(JSONObject json : TO_JSONS.apply(packet)) {
+ for (byte[] packet : packets) {
+ for (JSONObject json : TO_JSONS.apply(packet)) {
Long current = Long.parseLong(json.get("ts_micro").toString());
Assert.assertNotNull(current);
Assert.assertTrue(Long.compareUnsigned(current, previous) >= 0);
http://git-wip-us.apache.org/repos/asf/metron/blob/7967f358/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
index c912e58..e4e9b95 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
@@ -37,7 +37,7 @@ public class PcapCliFinalizer extends PcapFinalizer {
* as a formatted timestamp + uuid. A final sample format will look as follows:
* /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
*/
- private static final String PCAP_CLI_FILENAME_FORMAT = "pcap-data-%s+%04d.pcap";
+ private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap";
@Override
protected void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig,
@@ -47,8 +47,9 @@ public class PcapCliFinalizer extends PcapFinalizer {
@Override
protected Path getOutputPath(Map<String, Object> config, int partition) {
+ Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class);
- return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, prefix, partition));
+ return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition));
}
}