Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/18 21:48:42 UTC
[2/5] metron git commit: METRON-1614: Create job status abstraction
(mmiklavc via mmiklavc) closes apache/metron#1108
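[Editor's note] For readers skimming the patch: the changes below replace the old blocking PcapJob.query(...) call with a submit/poll/get lifecycle built on the new Statusable and JobStatus abstractions. The following is a minimal usage sketch assembled only from calls exercised in the updated integration test further down; the paths, times, and class name PcapJobStatusSketch are illustrative placeholders, not part of the patch.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.job.JobStatus;
import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.config.FixedPcapConfig;
import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
import org.apache.metron.pcap.mr.PcapJob;

public class PcapJobStatusSketch {
  public static void main(String[] args) throws Exception {
    // Assemble the job configuration; all paths and times below are placeholders.
    FixedPcapConfig config = new FixedPcapConfig(clock -> "example_prefix");
    Configuration hadoopConf = new Configuration();
    PcapOptions.JOB_NAME.put(config, "exampleJob");
    PcapOptions.HADOOP_CONF.put(config, hadoopConf);
    PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConf));
    PcapOptions.BASE_PATH.put(config, new Path("/apps/metron/pcap/input"));
    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, new Path("/apps/metron/pcap/interim"));
    PcapOptions.FINAL_OUTPUT_PATH.put(config, new Path("/apps/metron/pcap/output"));
    PcapOptions.START_TIME_NS.put(config, 0L);
    PcapOptions.END_TIME_NS.put(config, Long.MAX_VALUE);
    PcapOptions.NUM_REDUCERS.put(config, 10);
    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 10000);
    PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
    PcapOptions.FIELDS.put(config, new HashMap<String, String>());

    // submit() returns a handle immediately instead of blocking on the MapReduce job.
    PcapJob<Map<String, String>> job = new PcapJob<>();
    Statusable<Path> handle = job.submit(PcapFinalizerStrategies.CLI, config);

    // Poll for completion, then page through the finalized output files.
    while (!handle.isDone()) {
      Thread.sleep(500);
    }
    if (handle.getStatus().getState() == JobStatus.State.SUCCEEDED) {
      System.out.println("result pages: " + handle.get().getSize());
      for (Path resultFile : handle.get()) {
        System.out.println(resultFile);
      }
    }
  }
}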
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index c7292ab..9ea7912 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import kafka.consumer.ConsumerIterator;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.kafka.clients.producer.Producer;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.HDFSUtils;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.Processor;
@@ -55,12 +57,18 @@ import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.MRComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.integration.utils.KafkaUtil;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.pcap.query.PcapCli;
import org.apache.metron.spout.pcap.Endianness;
import org.apache.metron.spout.pcap.deserializer.Deserializers;
import org.apache.metron.test.utils.UnitTestHelper;
@@ -73,13 +81,22 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
final static String KAFKA_TOPIC = "pcap";
private static String BASE_DIR = "pcap";
private static String DATA_DIR = BASE_DIR + "/data_dir";
- private static String QUERY_DIR = BASE_DIR + "/query";
+ private static String INTERIM_RESULT = BASE_DIR + "/query";
+ private static String OUTPUT_DIR = BASE_DIR + "/output";
+ private static final int MAX_RETRIES = 30;
+ private static final int SLEEP_MS = 500;
private String topologiesDir = "src/main/flux";
private String targetDir = "target";
- private static void clearOutDir(File outDir) {
- for(File f : outDir.listFiles()) {
- f.delete();
+ private static void clearOutDirs(File... dirs) throws IOException {
+ for(File dir: dirs) {
+ for(File f : dir.listFiles()) {
+ if (f.isDirectory()) {
+ FileUtils.deleteDirectory(f);
+ } else {
+ f.delete();
+ }
+ }
}
}
private static int numFiles(File outDir, Configuration config) {
@@ -158,10 +175,10 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
topologiesDir = UnitTestHelper.findDir("topologies");
}
targetDir = UnitTestHelper.findDir("target");
- final File outDir = getOutDir(targetDir);
- final File queryDir = getQueryDir(targetDir);
- clearOutDir(outDir);
- clearOutDir(queryDir);
+ final File inputDir = getDir(targetDir, DATA_DIR);
+ final File interimResultDir = getDir(targetDir, INTERIM_RESULT);
+ final File outputDir = getDir(targetDir, OUTPUT_DIR);
+ clearOutDirs(inputDir, interimResultDir, outputDir);
File baseDir = new File(new File(targetDir), BASE_DIR);
//Assert.assertEquals(0, numFiles(outDir));
@@ -175,7 +192,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
setProperty("topology.worker.childopts", "");
setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
setProperty("kafka.pcap.start", "EARLIEST");
- setProperty("kafka.pcap.out", outDir.getAbsolutePath());
+ setProperty("kafka.pcap.out", inputDir.getAbsolutePath());
setProperty("kafka.pcap.numPackets", "2");
setProperty("kafka.pcap.maxTimeMS", "200000000");
setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
@@ -219,7 +236,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
runner.process(new Processor<Void>() {
@Override
public ReadinessState process(ComponentRunner runner) {
- int numFiles = numFiles(outDir, mr.getConfiguration());
+ int numFiles = numFiles(inputDir, mr.getConfiguration());
int expectedNumFiles = pcapEntries.size() / 2;
if (numFiles == expectedNumFiles) {
return ReadinessState.READY;
@@ -233,160 +250,222 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
return null;
}
});
- PcapJob job = new PcapJob();
+
+ FixedPcapConfig configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
+ Configuration hadoopConf = new Configuration();
+ PcapOptions.JOB_NAME.put(configuration, "jobName");
+ PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
+ PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
+ PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath()));
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath()));
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+ PcapOptions.NUM_REDUCERS.put(configuration, 10);
+ PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 2);
+ PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath()));
{
//Ensure that only two pcaps are returned when we look at 4 and 5
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(4, pcapEntries)
- , getTimestamp(5, pcapEntries)
- , 10
- , new HashMap<>()
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new FixedPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), 2);
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, new HashMap());
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(results.get().getSize(), 1);
}
{
// Ensure that only two pcaps are returned when we look at 4 and 5
// test with empty query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(4, pcapEntries)
- , getTimestamp(5, pcapEntries)
- , 10
- , ""
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), 2);
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(results.get().getSize(), 1);
}
{
//ensure that none get returned since that destination IP address isn't in the dataset
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(1, pcapEntries)
- , 10
- , new HashMap<String, String>() {{
- put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
- }}
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new FixedPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), 0);
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+ put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
+ }});
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(results.get().getSize(), 0);
}
{
// ensure that none get returned since that destination IP address isn't in the dataset
// test with query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(1, pcapEntries)
- , 10
- , "ip_dst_addr == '207.28.210.1'"
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), 0);
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(results.get().getSize(), 0);
}
{
//same with protocol as before with the destination addr
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(1, pcapEntries)
- , 10
- , new HashMap<String, String>() {{
- put(Constants.Fields.PROTOCOL.getName(), "foo");
- }}
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new FixedPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), 0);
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+ put(Constants.Fields.PROTOCOL.getName(), "foo");
+ }});
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(results.get().getSize(), 0);
}
{
//same with protocol as before with the destination addr
//test with query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(1, pcapEntries)
- , 10
- , "protocol == 'foo'"
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), 0);
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(results.get().getSize(), 0);
}
{
//make sure I get them all.
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , new HashMap<>()
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new FixedPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), pcapEntries.size());
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, new HashMap<>());
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(10, results.get().getSize());
}
{
//make sure I get them all.
//with query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , ""
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(Iterables.size(results), pcapEntries.size());
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "");
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(10, results.get().getSize());
}
{
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , new HashMap<String, String>() {{
- put(Constants.Fields.DST_PORT.getName(), "22");
- }}
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new FixedPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertTrue(Iterables.size(results) > 0);
- Assert.assertEquals(Iterables.size(results)
+ PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+ put(Constants.Fields.DST_PORT.getName(), "22");
+ }});
+ PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+ PcapJob<Map<String, String>> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertTrue(results.get().getSize() > 0);
+ Assert.assertEquals(Iterables.size(bytes)
, Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
@Override
public boolean apply(@Nullable JSONObject input) {
@@ -397,74 +476,63 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
)
);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
- Assert.assertTrue(baos.toByteArray().length > 0);
- }
- {
- //test with query filter and byte array matching
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertEquals(1, Iterables.size(results));
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+ PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
Assert.assertTrue(baos.toByteArray().length > 0);
}
{
+      //same with dst port as in the previous block
//test with query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , "ip_dst_port == 22"
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertTrue(Iterables.size(results) > 0);
- Assert.assertEquals(Iterables.size(results)
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(Iterables.size(bytes)
, Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
@Override
public boolean apply(@Nullable JSONObject input) {
Object prt = input.get(Constants.Fields.DST_PORT.getName());
- return prt != null && (Long) prt == 22;
+ return prt != null && prt.toString().equals("22");
}
}, withHeaders)
)
);
- assertInOrder(results);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+ PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
Assert.assertTrue(baos.toByteArray().length > 0);
}
{
- //test with query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , "ip_dst_port > 20 and ip_dst_port < 55792"
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertTrue(Iterables.size(results) > 0);
- Assert.assertEquals(Iterables.size(results)
+ // test with query filter ip_dst_port > 20 and ip_dst_port < 55792
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(Iterables.size(bytes)
, Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
@Override
public boolean apply(@Nullable JSONObject input) {
@@ -474,63 +542,92 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
}, withHeaders)
)
);
- assertInOrder(results);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+ PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
Assert.assertTrue(baos.toByteArray().length > 0);
}
{
- //test with query filter
- Iterable<byte[]> results =
- job.query(new Path(outDir.getAbsolutePath())
- , new Path(queryDir.getAbsolutePath())
- , getTimestamp(0, pcapEntries)
- , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
- , 10
- , "ip_dst_port > 55790"
- , new Configuration()
- , FileSystem.get(new Configuration())
- , new QueryPcapFilter.Configurator()
- );
- assertInOrder(results);
- Assert.assertTrue(Iterables.size(results) > 0);
- Assert.assertEquals(Iterables.size(results)
+ //test with query filter ip_dst_port > 55790
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(Iterables.size(bytes)
, Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
- @Override
- public boolean apply(@Nullable JSONObject input) {
- Object prt = input.get(Constants.Fields.DST_PORT.getName());
- return prt != null && (Long) prt > 55790;
- }
- }, withHeaders)
+ @Override
+ public boolean apply(@Nullable JSONObject input) {
+ Object prt = input.get(Constants.Fields.DST_PORT.getName());
+ return prt != null && (Long) prt > 55790;
+ }
+ }, withHeaders)
)
);
- assertInOrder(results);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+ PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
+ Assert.assertTrue(baos.toByteArray().length > 0);
+ }
+ {
+ //test with query filter and byte array matching
+ PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+ PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
+ PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+ PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+ PcapJob<String> job = new PcapJob<>();
+ Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+ Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+ Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+ waitForJob(results);
+
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+ Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+ try {
+ return HDFSUtils.readBytes(path);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ });
+ assertInOrder(bytes);
+ Assert.assertEquals(1, results.get().getSize());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
Assert.assertTrue(baos.toByteArray().length > 0);
}
+
System.out.println("Ended");
} finally {
runner.stop();
- clearOutDir(outDir);
- clearOutDir(queryDir);
+ clearOutDirs(inputDir, interimResultDir, outputDir);
}
}
- private File getOutDir(String targetDir) {
- File outDir = new File(new File(targetDir), DATA_DIR);
- if (!outDir.exists()) {
- outDir.mkdirs();
+ private void waitForJob(Statusable statusable) throws Exception {
+ for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
+ if (statusable.isDone()) {
+ return;
+ }
}
- return outDir;
+    throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " milliseconds");
}
- private File getQueryDir(String targetDir) {
- File outDir = new File(new File(targetDir), QUERY_DIR);
- if (!outDir.exists()) {
- outDir.mkdirs();
+ private File getDir(String targetDir, String childDir) {
+ File directory = new File(new File(targetDir), childDir);
+ if (!directory.exists()) {
+ directory.mkdirs();
}
- return outDir;
+ return directory;
}
private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
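[Editor's note] One observation on the test changes above: the Pageable<Path>-to-bytes transform is repeated verbatim in every assertion block. A small hypothetical helper along these lines (not part of the patch; the class name PcapResultBytes is invented) would remove that duplication:

import com.google.common.collect.Iterables;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.utils.HDFSUtils;
import org.apache.metron.job.Statusable;

public class PcapResultBytes {

  // Hypothetical helper: reads each finalized result file referenced by the job handle into a byte[].
  public static Iterable<byte[]> readResults(Statusable<Path> results) throws Exception {
    return Iterables.transform(results.get(), path -> {
      try {
        return HDFSUtils.readBytes(path);
      } catch (IOException e) {
        throw new IllegalStateException(e);
      }
    });
  }
}

Each assertion block could then call assertInOrder(PcapResultBytes.readResults(results)) instead of repeating the lambda.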
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 3468a7c..763f0c6 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -19,13 +19,9 @@ package org.apache.metron.pcap.query;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedOutputStream;
@@ -35,28 +31,25 @@ import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import java.util.Map.Entry;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
import org.apache.metron.pcap.PcapHelper;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig.PrefixStrategy;
+import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.mr.PcapJob;
-import org.apache.metron.pcap.writer.ResultsWriter;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
public class PcapCliTest {
@@ -64,16 +57,15 @@ public class PcapCliTest {
@Mock
private PcapJob jobRunner;
@Mock
- private ResultsWriter resultsWriter;
- @Mock
private Clock clock;
private String execDir;
+ private PrefixStrategy prefixStrategy;
@Before
public void setup() throws IOException {
MockitoAnnotations.initMocks(this);
- doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), anyObject(), anyInt(), anyObject());
execDir = System.getProperty("user.dir");
+ prefixStrategy = clock -> "random_prefix";
}
@Test
@@ -88,13 +80,7 @@ public class PcapCliTest {
"-protocol", "6",
"-packet_filter", "`casey`"
};
- List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
- Iterator iterator = pcaps.iterator();
- SequenceFileIterable iterable = mock(SequenceFileIterable.class);
- when(iterable.iterator()).thenReturn(iterator);
- Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
- Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
HashMap<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -104,12 +90,44 @@ public class PcapCliTest {
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
}};
+ FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+ PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+ PcapOptions.FIELDS.put(config, query);
+ PcapOptions.NUM_REDUCERS.put(config, 10);
+ PcapOptions.START_TIME_MS.put(config, 500L);
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
+ when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+ PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
- Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+ verify(jobRunner).get();
+ }
+
+ /**
+ * Check that "map" entries exist in the tested map "item". Note, will not work for complex
+ * Objects where equals() does not compare contents favorably. e.g. Configurator() did not work.
+ */
+ private <K, V> Matcher<Map<K, V>> mapContaining(Map<K, V> map) {
+ return new TypeSafeMatcher<Map<K, V>>() {
+ @Override
+ protected boolean matchesSafely(Map<K, V> item) {
+ return item.entrySet().containsAll(map.entrySet());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Should contain items: ");
+ for (Entry<K, V> entry : map.entrySet()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("key=");
+ sb.append(entry.getKey());
+ sb.append(",value=");
+ sb.append(entry.getValue());
+ description.appendText(sb.toString());
+ }
+ }
+ };
}
@Test
@@ -129,13 +147,6 @@ public class PcapCliTest {
"-num_reducers", "10",
"-records_per_file", "1000"
};
- List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
- Iterator iterator = pcaps.iterator();
- SequenceFileIterable iterable = mock(SequenceFileIterable.class);
- when(iterable.iterator()).thenReturn(iterator);
-
- Path base_path = new Path("/base/path");
- Path base_output_path = new Path("/base/output/path");
Map<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -144,12 +155,20 @@ public class PcapCliTest {
put(Constants.Fields.PROTOCOL.getName(), "6");
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
}};
-
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
-
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+ FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+ PcapOptions.BASE_PATH.put(config, "/base/path");
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+ PcapOptions.FIELDS.put(config, query);
+ PcapOptions.NUM_REDUCERS.put(config, 10);
+ PcapOptions.START_TIME_MS.put(config, 500L);
+ PcapOptions.END_TIME_MS.put(config, 1000L);
+ PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+ when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+ PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
- Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+ verify(jobRunner).get();
}
@Test
@@ -170,13 +189,6 @@ public class PcapCliTest {
"-num_reducers", "10",
"-records_per_file", "1000"
};
- List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
- Iterator iterator = pcaps.iterator();
- SequenceFileIterable iterable = mock(SequenceFileIterable.class);
- when(iterable.iterator()).thenReturn(iterator);
-
- Path base_path = new Path("/base/path");
- Path base_output_path = new Path("/base/output/path");
Map<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -188,11 +200,23 @@ public class PcapCliTest {
long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
- when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+ FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+ PcapOptions.BASE_PATH.put(config, "/base/path");
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+ PcapOptions.FIELDS.put(config, query);
+ PcapOptions.NUM_REDUCERS.put(config, 10);
+ PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed bc defaults in config
+ PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config
+ PcapOptions.START_TIME_NS.put(config, startAsNanos);
+ PcapOptions.END_TIME_NS.put(config, endAsNanos);
+ PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+ when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+ PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
- Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+ verify(jobRunner).get();
}
private long asNanos(String inDate, String format) throws ParseException {
@@ -212,20 +236,20 @@ public class PcapCliTest {
"-start_time", "500",
"-query", "some query string"
};
- List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
- Iterator iterator = pcaps.iterator();
- SequenceFileIterable iterable = mock(SequenceFileIterable.class);
- when(iterable.iterator()).thenReturn(iterator);
- Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
- Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
String query = "some query string";
+ FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+ PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+ PcapOptions.FIELDS.put(config, query);
+ PcapOptions.NUM_REDUCERS.put(config, 10);
+ PcapOptions.START_TIME_MS.put(config, 500L);
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
+ when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+ PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
- Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+ verify(jobRunner).get();
}
@Test
@@ -240,20 +264,22 @@ public class PcapCliTest {
"-query", "some query string",
"-records_per_file", "1000"
};
- List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
- Iterator iterator = pcaps.iterator();
- SequenceFileIterable iterable = mock(SequenceFileIterable.class);
- when(iterable.iterator()).thenReturn(iterator);
- Path base_path = new Path("/base/path");
- Path base_output_path = new Path("/base/output/path");
String query = "some query string";
-
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
-
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+ FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+ PcapOptions.BASE_PATH.put(config, "/base/path");
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+ PcapOptions.FIELDS.put(config, query);
+ PcapOptions.NUM_REDUCERS.put(config, 10);
+ PcapOptions.START_TIME_MS.put(config, 500L); // needed bc defaults in config
+ PcapOptions.END_TIME_MS.put(config, 1000L); // needed bc defaults in config
+ PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+ when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+ PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
assertThat("Expect no errors on run", cli.run(args), equalTo(0));
- Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+ verify(jobRunner).get();
}
// INVALID OPTION CHECKS
@@ -290,7 +316,7 @@ public class PcapCliTest {
PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos));
System.setErr(errOutStream);
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+ PcapCli cli = new PcapCli(jobRunner, clock -> "random_prefix");
assertThat("Expect errors on run", cli.run(args), equalTo(-1));
assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true));
assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true));
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
deleted file mode 100644
index 997c5f7..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.job.Pageable;
-
-public class PcapFiles implements Pageable<Path> {
-
- private List<Path> files;
-
- public PcapFiles(List<Path> files) {
- this.files = files;
- }
-
- @Override
- public Iterable<Path> asIterable() {
- return files;
- }
-
- @Override
- public Path getPage(int num) {
- return files.get(num);
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
new file mode 100644
index 0000000..c98e681
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Pageable;
+
+public class PcapPages implements Pageable<Path> {
+
+ private final List<Path> files;
+
+ /**
+ * Copy constructor.
+ */
+ public PcapPages(Pageable<Path> pages) {
+ this.files = new ArrayList<>();
+ for (Path path : pages) {
+ files.add(new Path(path.toString()));
+ }
+ }
+
+ /**
+ * Defaults with empty list.
+ */
+ public PcapPages() {
+ this.files = new ArrayList<>();
+ }
+
+ public PcapPages(List<Path> paths) {
+ files = new ArrayList<>(paths);
+ }
+
+ @Override
+ public Path getPage(int num) {
+ return files.get(num);
+ }
+
+ @Override
+ public int getSize() {
+ return files.size();
+ }
+
+ @Override
+ public Iterator<Path> iterator() {
+ return new PcapIterator(files.iterator());
+ }
+
+ private class PcapIterator implements Iterator<Path> {
+
+ private Iterator<Path> delegateIt;
+
+ public PcapIterator(Iterator<Path> iterator) {
+ this.delegateIt = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return delegateIt.hasNext();
+ }
+
+ @Override
+ public Path next() {
+ return delegateIt.next();
+ }
+ }
+
+}
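[Editor's note] A quick usage sketch for the new Pageable implementation above; the paths and class name PcapPagesExample are placeholders:

import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.metron.job.Pageable;
import org.apache.metron.pcap.PcapPages;

public class PcapPagesExample {
  public static void main(String[] args) {
    // Wrap a list of result files; the constructor copies the list.
    Pageable<Path> pages = new PcapPages(Arrays.asList(
        new Path("/tmp/pcap-data-example+0000.pcap"),
        new Path("/tmp/pcap-data-example+0001.pcap")));

    System.out.println("pages: " + pages.getSize());   // 2
    System.out.println("first: " + pages.getPage(0));  // /tmp/pcap-data-example+0000.pcap

    // Pageable is also Iterable, so results can be streamed in order.
    for (Path p : pages) {
      System.out.println(p);
    }
  }
}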
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
new file mode 100644
index 0000000..c40407b
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class FixedPcapConfig extends PcapConfig {
+
+ public FixedPcapConfig(PrefixStrategy prefixStrategy) {
+ super(prefixStrategy);
+ setFixedFields(new LinkedHashMap<>());
+ }
+
+ public Map<String, String> getFixedFields() {
+ return PcapOptions.FIELDS.get(this, Map.class);
+ }
+
+ public void setFixedFields(Map<String, String> fixedFields) {
+ PcapOptions.FIELDS.put(this, fixedFields);
+ }
+
+ public void putFixedField(String key, String value) {
+ Map<String, String> fixedFields = PcapOptions.FIELDS.get(this, Map.class);
+ String trimmedVal = value != null ? value.trim() : null;
+ if (!isNullOrEmpty(trimmedVal)) {
+ fixedFields.put(key, value);
+ }
+ }
+
+}
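[Editor's note] A brief sketch of the fixed-field helpers above; the PrefixStrategy lambda and the class name FixedPcapConfigExample are illustrative assumptions:

import java.util.Map;
import org.apache.metron.common.Constants;
import org.apache.metron.pcap.config.FixedPcapConfig;

public class FixedPcapConfigExample {
  public static void main(String[] args) {
    FixedPcapConfig config = new FixedPcapConfig(clock -> "example_prefix");

    // Non-empty values are recorded under PcapOptions.FIELDS...
    config.putFixedField(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
    // ...while null or blank values are silently skipped.
    config.putFixedField(Constants.Fields.DST_ADDR.getName(), "   ");

    Map<String, String> fields = config.getFixedFields();
    System.out.println(fields); // expected: {ip_src_addr=192.168.1.1}
  }
}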
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
new file mode 100644
index 0000000..26509be
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.configuration.ConfigOption;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.function.Function;
+
+public class PcapConfig extends AbstractMapDecorator<String, Object>{
+ public interface PrefixStrategy extends Function<Clock, String>{}
+
+ private boolean showHelp;
+ private DateFormat dateFormat;
+
+ public PcapConfig() {
+ super(new HashMap<>());
+ }
+
+ public PcapConfig(PrefixStrategy prefixStrategy) {
+ this();
+ setShowHelp(false);
+ setBasePath("");
+ setBaseInterimResultPath("");
+ setStartTimeMs(-1L);
+ setEndTimeMs(-1L);
+ setNumReducers(0);
+ setFinalFilenamePrefix(prefixStrategy.apply(new Clock()));
+ }
+
+ public Object getOption(ConfigOption option) {
+ Object o = get(option.getKey());
+ return option.transform().apply(option.getKey(), o);
+ }
+
+ public String getFinalFilenamePrefix() {
+ return PcapOptions.FINAL_FILENAME_PREFIX.get(this, String.class);
+ }
+
+ public void setFinalFilenamePrefix(String prefix) {
+ PcapOptions.FINAL_FILENAME_PREFIX.put(this, prefix);
+ }
+
+ public int getNumReducers() {
+ return PcapOptions.NUM_REDUCERS.get(this, Integer.class);
+ }
+
+ public boolean showHelp() {
+ return showHelp;
+ }
+
+ public void setShowHelp(boolean showHelp) {
+ this.showHelp = showHelp;
+ }
+
+ public String getBasePath() {
+ return PcapOptions.BASE_PATH.get(this, String.class);
+ }
+
+ public String getBaseInterimResultPath() {
+ return PcapOptions.BASE_INTERIM_RESULT_PATH.get(this, String.class);
+ }
+
+ public long getStartTimeMs() {
+ return PcapOptions.START_TIME_MS.get(this, Long.class);
+ }
+
+ public long getEndTimeMs() {
+ return PcapOptions.END_TIME_MS.get(this, Long.class);
+ }
+
+ public void setBasePath(String basePath) {
+ PcapOptions.BASE_PATH.put(this, basePath);
+ }
+
+ public void setBaseInterimResultPath(String baseOutputPath) {
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseOutputPath);
+ }
+
+ public void setStartTimeMs(long startTime) {
+ PcapOptions.START_TIME_MS.put(this, startTime);
+ }
+
+ public void setEndTimeMs(long endTime) {
+ PcapOptions.END_TIME_MS.put(this, endTime);
+ }
+
+ public boolean isNullOrEmpty(String val) {
+ return StringUtils.isEmpty(val);
+ }
+
+ public void setDateFormat(String dateFormat) {
+ this.dateFormat = new SimpleDateFormat(dateFormat);
+ }
+
+ public DateFormat getDateFormat() {
+ return dateFormat;
+ }
+
+ public void setNumReducers(int numReducers) {
+ PcapOptions.NUM_REDUCERS.put(this, numReducers);
+ }
+
+ public int getNumRecordsPerFile() {
+ return PcapOptions.NUM_RECORDS_PER_FILE.get(this, Integer.class);
+ }
+
+ public void setNumRecordsPerFile(int numRecordsPerFile) {
+ PcapOptions.NUM_RECORDS_PER_FILE.put(this, numRecordsPerFile);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
new file mode 100644
index 0000000..09effd4
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.config;
+
+import java.util.function.BiFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.configuration.ConfigOption;
+
+public enum PcapOptions implements ConfigOption {
+ JOB_NAME("jobName"),
+ FINAL_FILENAME_PREFIX("finalFilenamePrefix"),
+ BASE_PATH("basePath", (s, o) -> o == null ? null : new Path(o.toString())),
+ INTERIM_RESULT_PATH("interimResultPath", (s, o) -> o == null ? null : new Path(o.toString())),
+ BASE_INTERIM_RESULT_PATH("baseInterimResultPath", (s, o) -> o == null ? null : new Path(o.toString())),
+ FINAL_OUTPUT_PATH("finalOutputPath", (s, o) -> o == null ? null : new Path(o.toString())),
+ NUM_REDUCERS("numReducers"),
+ START_TIME_MS("startTimeMs"),
+ END_TIME_MS("endTimeMs"),
+ START_TIME_NS("startTimeNs"),
+ END_TIME_NS("endTimeNs"),
+ NUM_RECORDS_PER_FILE("numRecordsPerFile"),
+ FIELDS("fields"),
+ FILTER_IMPL("filterImpl"),
+ HADOOP_CONF("hadoopConf"),
+ FILESYSTEM("fileSystem");
+
+ public static final BiFunction<String, Object, Path> STRING_TO_PATH =
+ (s, o) -> o == null ? null : new Path(o.toString());
+ private String key;
+ private BiFunction<String, Object, Object> transform = (s, o) -> o;
+
+ PcapOptions(String key) {
+ this.key = key;
+ }
+
+ PcapOptions(String key, BiFunction<String, Object, Object> transform) {
+ this.key = key;
+ this.transform = transform;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public BiFunction<String, Object, Object> transform() {
+ return transform;
+ }
+}
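[Editor's note] The enum above doubles as a typed accessor over the Map-backed config; options declared with a transform (the path options) convert raw values on read. A short sketch using only accessor signatures that appear elsewhere in this patch; the class name PcapOptionsExample is a placeholder:

import org.apache.hadoop.fs.Path;
import org.apache.metron.pcap.config.PcapConfig;
import org.apache.metron.pcap.config.PcapOptions;

public class PcapOptionsExample {
  public static void main(String[] args) {
    PcapConfig config = new PcapConfig(clock -> "example_prefix");

    // Values are stored under the option's key in the underlying map...
    PcapOptions.BASE_PATH.put(config, "/apps/metron/pcap/input");
    PcapOptions.NUM_REDUCERS.put(config, 10);

    // ...and read back through ConfigOption, applying a transform where one is supplied.
    Path basePath = PcapOptions.BASE_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
    Integer reducers = PcapOptions.NUM_REDUCERS.get(config, Integer.class);

    System.out.println(basePath + " with " + reducers + " reducers");
  }
}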
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
new file mode 100644
index 0000000..ef32839
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+public class QueryPcapConfig extends PcapConfig {
+
+ public QueryPcapConfig(PrefixStrategy prefixStrategy) {
+ super(prefixStrategy);
+ }
+
+ public String getQuery() {
+ return PcapOptions.FIELDS.get(this, String.class);
+ }
+
+ public void setQuery(String query) {
+ PcapOptions.FIELDS.put(this, query);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
new file mode 100644
index 0000000..e032158
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.config.PcapOptions;
+
+/**
+ * Write to local FS.
+ */
+public class PcapCliFinalizer extends PcapFinalizer {
+
+ /**
+ * Output file names have the format <output-path>/pcap-data-<filename-prefix>+<partition-num>.pcap.
+ * The filename prefix is pluggable, but in most cases it will be provided via the PcapConfig
+ * as a formatted timestamp + uuid. A sample final file name looks as follows:
+ * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
+ */
+ private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap";
+
+ @Override
+ protected String getOutputFileName(Map<String, Object> config, int partition) {
+ Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
+ String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class);
+ return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition);
+ }
+
+}
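
To make the format string above concrete, here is a quick standalone check of the generated file name; the prefix value is the made-up timestamp/uuid from the Javadoc sample and the class name is illustrative only.

    public class CliFilenameFormatCheck {
      public static void main(String[] args) {
        String format = "%s/pcap-data-%s+%04d.pcap";
        String fileName = String.format(format, "/base/output/path",
            "201807181911-09855b4ae3204dee8b63760d65198da3", 1);
        // Prints /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
        System.out.println(fileName);
      }
    }
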
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
new file mode 100644
index 0000000..d5ac675
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.writer.PcapResultsWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads interim pcap results from a specified path - for PCAP these results are assumed to be
+ * SequenceFileIterables. The records are partitioned according to the num-records-per-file option,
+ * each partition is written to the final output location as a file with its own PCAP header, and
+ * the interim MapReduce results are cleaned up after the final results are written successfully.
+ */
+public abstract class PcapFinalizer implements Finalizer<Path> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private PcapResultsWriter resultsWriter;
+
+ protected PcapFinalizer() {
+ this.resultsWriter = new PcapResultsWriter();
+ }
+
+ protected PcapResultsWriter getResultsWriter() {
+ return resultsWriter;
+ }
+
+ @Override
+ public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
+ Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class);
+ int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class);
+ Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH
+ .get(config, PcapOptions.STRING_TO_PATH, Path.class);
+ FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class);
+
+ SequenceFileIterable interimResults = null;
+ try {
+ interimResults = readInterimResults(interimResultPath, hadoopConfig, fs);
+ } catch (IOException e) {
+ throw new JobException("Unable to read interim job results while finalizing", e);
+ }
+ List<Path> outFiles = new ArrayList<>();
+ try {
+ Iterable<List<byte[]>> partitions = Iterables.partition(interimResults, recPerFile);
+ int part = 1;
+ if (partitions.iterator().hasNext()) {
+ for (List<byte[]> data : partitions) {
+ String outFileName = getOutputFileName(config, part++);
+ if (data.size() > 0) {
+ getResultsWriter().write(hadoopConfig, data, outFileName);
+ outFiles.add(new Path(outFileName));
+ }
+ }
+ } else {
+ LOG.info("No results returned.");
+ }
+ } catch (IOException e) {
+ throw new JobException("Failed to finalize results", e);
+ } finally {
+ try {
+ interimResults.cleanup();
+ } catch (IOException e) {
+ LOG.warn("Unable to cleanup files in HDFS", e);
+ }
+ }
+ return new PcapPages(outFiles);
+ }
+
+ protected abstract String getOutputFileName(Map<String, Object> config, int partition);
+
+ /**
+ * Returns a lazily-read Iterable over a set of sequence files.
+ */
+ protected SequenceFileIterable readInterimResults(Path interimResultPath, Configuration config,
+ FileSystem fs) throws IOException {
+ List<Path> files = new ArrayList<>();
+ for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(interimResultPath, false);
+ it.hasNext(); ) {
+ Path p = it.next().getPath();
+ if (p.getName().equals("_SUCCESS")) {
+ fs.delete(p, false);
+ continue;
+ }
+ files.add(p);
+ }
+ if (files.size() == 0) {
+ LOG.info("No files to process with specified date range.");
+ } else {
+ LOG.debug("Interim results path={}", interimResultPath);
+ Collections.sort(files, (o1, o2) -> o1.getName().compareTo(o2.getName()));
+ }
+ return new SequenceFileIterable(files, config);
+ }
+}
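
The paging behavior in finalizeJob hinges on Guava's Iterables.partition, which chunks the interim records into fixed-size pages, one output file per non-empty page. A minimal standalone sketch of that behavior, with the byte[] records stubbed out as strings for readability:

    import com.google.common.collect.Iterables;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionSketch {
      public static void main(String[] args) {
        List<String> records = Arrays.asList("pkt1", "pkt2", "pkt3", "pkt4", "pkt5");
        int recordsPerFile = 2;
        int part = 1;
        // Each page would become one finalized pcap file in PcapFinalizer.
        for (List<String> page : Iterables.partition(records, recordsPerFile)) {
          System.out.printf("page %d -> %s%n", part++, page);
        }
        // Prints three pages: [pkt1, pkt2], [pkt3, pkt4], [pkt5]
      }
    }
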
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
new file mode 100644
index 0000000..927d602
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+
+/**
+ * PcapJob runs a MapReduce job that outputs sequence files to HDFS. This Strategy/Factory class
+ * provides options for performing final processing on that raw MapReduce output for the CLI and
+ * REST APIs.
+ */
+public enum PcapFinalizerStrategies implements Finalizer<Path> {
+ CLI(new PcapCliFinalizer()),
+ REST(new PcapRestFinalizer());
+
+ private Finalizer<Path> finalizer;
+
+ PcapFinalizerStrategies(Finalizer<Path> finalizer) {
+ this.finalizer = finalizer;
+ }
+
+ @Override
+ public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
+ return finalizer.finalizeJob(config);
+ }
+
+}
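
Callers pick a strategy by enum constant and hand it the same config map used by the rest of the job. The following is a hedged sketch of such a call site, not taken from this patch; it assumes the PcapOptions keys read by the finalizers above have been populated via put on a plain map, and the paths and counts are illustrative.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.metron.job.Pageable;
    import org.apache.metron.pcap.config.PcapOptions;
    import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;

    public class FinalizerStrategySketch {
      public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        Map<String, Object> config = new HashMap<>();
        PcapOptions.HADOOP_CONF.put(config, hadoopConf);
        PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConf));
        PcapOptions.INTERIM_RESULT_PATH.put(config, "/tmp/pcap/interim");
        PcapOptions.FINAL_OUTPUT_PATH.put(config, "/tmp/pcap/output");
        PcapOptions.NUM_RECORDS_PER_FILE.put(config, 10000);
        PcapOptions.FINAL_FILENAME_PREFIX.put(config, "example-prefix");

        // CLI writes pcap-data-<prefix>+<partition>.pcap to the local FS;
        // REST writes page-<n>.pcap to HDFS.
        Pageable<Path> pages = PcapFinalizerStrategies.CLI.finalizeJob(config);
      }
    }
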
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
new file mode 100644
index 0000000..059bba2
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.config.PcapOptions;
+
+/**
+ * Write to HDFS.
+ */
+public class PcapRestFinalizer extends PcapFinalizer {
+
+ /**
+ * Output file names have the format <output-path>/page-<page-num>.pcap. Unlike the CLI finalizer,
+ * no filename prefix is applied to these pages. A sample final file name, assuming a base output
+ * path of /base/output/path, looks as follows:
+ * /base/output/path/page-1.pcap
+ */
+ private static final String PCAP_REST_FILENAME_FORMAT = "%s/page-%s.pcap";
+
+ @Override
+ protected String getOutputFileName(Map<String, Object> config, int partition) {
+ Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.getTransformed(config, Path.class);
+ return String.format(PCAP_REST_FILENAME_FORMAT, finalOutputPath, partition);
+ }
+
+}