You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/19 13:04:44 UTC
incubator-metron git commit: METRON-257 Enable pcap result pagination
from the Pcap CLI (mmiklavc via cestella) closes apache/incubator-metron#256
Repository: incubator-metron
Updated Branches:
refs/heads/master 095313255 -> e0c9970b1
METRON-257 Enable pcap result pagination from the Pcap CLI (mmiklavc via cestella) closes apache/incubator-metron#256
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/e0c9970b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/e0c9970b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/e0c9970b
Branch: refs/heads/master
Commit: e0c9970b1f467135f01ee212fb2a8d7f2d61de8b
Parents: 0953132
Author: mmiklavc <mi...@gmail.com>
Authored: Mon Sep 19 09:04:32 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Sep 19 09:04:32 2016 -0400
----------------------------------------------------------------------
.../pcapservice/PcapReceiverImplRestEasy.java | 38 +++--
.../PcapReceiverImplRestEasyTest.java | 4 +-
.../common/hadoop/SequenceFileIterable.java | 139 +++++++++++++++++++
.../org/apache/metron/pcap/query/CliConfig.java | 18 ++-
.../org/apache/metron/pcap/query/CliParser.java | 24 +++-
.../org/apache/metron/pcap/query/PcapCli.java | 31 +++--
.../org/apache/metron/pcap/PcapJobTest.java | 34 ++++-
.../PcapTopologyIntegrationTest.java | 48 +++----
.../apache/metron/pcap/query/PcapCliTest.java | 139 ++++++++++++-------
.../java/org/apache/metron/pcap/mr/PcapJob.java | 43 +++---
pom.xml | 2 +-
11 files changed, 383 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 18b5dc9..5a2a0ae 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -19,11 +19,13 @@ package org.apache.metron.pcapservice;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.pcap.filter.query.QueryPcapFilter;
@@ -120,6 +122,7 @@ public class PcapReceiverImplRestEasy {
throws IOException {
PcapsResponse response = new PcapsResponse();
+ SequenceFileIterable results = null;
try {
if (startTime < 0) {
startTime = 0L;
@@ -137,7 +140,7 @@ public class PcapReceiverImplRestEasy {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Query received: " + query);
}
- response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+ results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
@@ -146,13 +149,17 @@ public class PcapReceiverImplRestEasy {
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
, new QueryPcapFilter.Configurator()
- )
);
+ response.setPcaps(results != null ? Lists.newArrayList(results) : null);
} catch (Exception e) {
LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
e);
throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+ } finally {
+ if (null != results) {
+ results.cleanup();
+ }
}
// return http status '200 OK' along with the complete pcaps response file,
@@ -205,6 +212,7 @@ public class PcapReceiverImplRestEasy {
final boolean includeReverseTrafficF = includeReverseTraffic;
PcapsResponse response = new PcapsResponse();
+ SequenceFileIterable results = null;
try {
if(startTime < 0) {
startTime = 0L;
@@ -237,22 +245,26 @@ public class PcapReceiverImplRestEasy {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet()));
}
- response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
- , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
- , startTime
- , endTime
- , numReducers
- , query
- , CONFIGURATION.get()
- , FileSystem.get(CONFIGURATION.get())
- , new FixedPcapFilter.Configurator()
- )
- );
+ results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+ , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
+ , startTime
+ , endTime
+ , numReducers
+ , query
+ , CONFIGURATION.get()
+ , FileSystem.get(CONFIGURATION.get())
+ , new FixedPcapFilter.Configurator()
+ );
+ response.setPcaps(results != null ? Lists.newArrayList(results) : null);
} catch (Exception e) {
LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
e);
throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+ } finally {
+ if (null != results) {
+ results.cleanup();
+ }
}
// return http status '200 OK' along with the complete pcaps response file,
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index 1c1c236..dba87cf 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.mr.PcapJob;
@@ -31,7 +32,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.EnumMap;
-import java.util.List;
public class PcapReceiverImplRestEasyTest {
@@ -44,7 +44,7 @@ public class PcapReceiverImplRestEasyTest {
PcapFilterConfigurator<R> filterImpl;
@Override
- public <T> List<byte[]> query( Path basePath
+ public <T> SequenceFileIterable query(Path basePath
, Path baseOutputPath
, long beginNS
, long endNS
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java
new file mode 100644
index 0000000..a57cd35
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.hadoop;
+
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static java.lang.String.format;
+
+public class SequenceFileIterable implements Iterable<byte[]> {
+ private static final Logger LOGGER = Logger.getLogger(SequenceFileIterable.class);
+ private List<Path> files;
+ private Configuration config;
+
+ public SequenceFileIterable(List<Path> files, Configuration config) {
+ this.files = files;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<byte[]> iterator() {
+ return Iterators.concat(getIterators(files, config));
+ }
+
+ private Iterator<byte[]>[] getIterators(List<Path> files, Configuration config) {
+ return files.stream().map(f -> new SequenceFileIterator(f, config)).toArray(Iterator[]::new);
+ }
+
+ /**
+ * Cleans up all files read by this Iterable
+ *
+ * @return true if success, false if any files were not deleted
+ * @throws IOException
+ */
+ public boolean cleanup() throws IOException {
+ FileSystem fileSystem = FileSystem.get(config);
+ boolean success = true;
+ for (Path file : files) {
+ success &= fileSystem.delete(file, false);
+ }
+ return success;
+ }
+
+ private static class SequenceFileIterator implements Iterator<byte[]> {
+ private Path path;
+ private Configuration config;
+ private SequenceFile.Reader reader;
+ private LongWritable key = new LongWritable();
+ private BytesWritable value = new BytesWritable();
+ private byte[] next;
+ private boolean finished = false;
+
+ public SequenceFileIterator(Path path, Configuration config) {
+ this.path = path;
+ this.config = config;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!finished && null == reader) {
+ try {
+ reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(path));
+ LOGGER.debug("Writing file: " + path.toString());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to get reader", e);
+ }
+ } else {
+ LOGGER.debug(format("finished=%s, reader=%s, next=%s", finished, reader, next));
+ }
+ try {
+ //ensure hasNext is idempotent
+ if (!finished) {
+ if (null == next && reader.next(key, value)) {
+ next = value.copyBytes();
+ } else if (null == next) {
+ close();
+ }
+ }
+ } catch (IOException e) {
+ close();
+ throw new RuntimeException("Failed to get next record", e);
+ }
+ return (null != next);
+ }
+
+ private void close() {
+ LOGGER.debug("Closing file: " + path.toString());
+ finished = true;
+ try {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ } catch (IOException e) {
+ // ah well, we tried...
+ LOGGER.warn("Error closing file", e);
+ }
+ }
+
+ @Override
+ public byte[] next() {
+ byte[] ret = null;
+ if (hasNext()) {
+ ret = next;
+ next = null; //don't want same record more than once
+ } else {
+ throw new NoSuchElementException("No more records");
+ }
+ return ret;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
index f8ab0ac..294844f 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
@@ -23,22 +23,22 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
public class CliConfig {
- public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
- public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
private boolean showHelp;
private String basePath;
private String baseOutputPath;
private long startTime;
private long endTime;
- private int numReducers = 0;
+ private int numReducers;
+ private int numRecordsPerFile;
private DateFormat dateFormat;
public CliConfig() {
showHelp = false;
- basePath = BASE_PATH_DEFAULT;
- baseOutputPath = BASE_OUTPUT_PATH_DEFAULT;
+ basePath = "";
+ baseOutputPath = "";
startTime = -1L;
endTime = -1L;
+ numReducers = 0;
}
public int getNumReducers() {
@@ -100,4 +100,12 @@ public class CliConfig {
public void setNumReducers(int numReducers) {
this.numReducers = numReducers;
}
+
+ public int getNumRecordsPerFile() {
+ return numRecordsPerFile;
+ }
+
+ public void setNumRecordsPerFile(int numRecordsPerFile) {
+ this.numRecordsPerFile = numRecordsPerFile;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index ea6f8e7..83e9fcf 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -24,6 +24,10 @@ import org.apache.commons.cli.*;
* Provides common required fields for the PCAP filter jobs
*/
public class CliParser {
+ public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
+ public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
+ public static final int NUM_REDUCERS_DEFAULT = 10;
+ public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
private CommandLineParser parser;
public CliParser() {
@@ -33,10 +37,11 @@ public class CliParser {
public Options buildOptions() {
Options options = new Options();
options.addOption(newOption("h", "help", false, "Display help"));
- options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
- options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
+ options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
+ options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT)));
options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
- options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true));
+ options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
+ options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
return options;
@@ -61,9 +66,13 @@ public class CliParser {
}
if (commandLine.hasOption("base_path")) {
config.setBasePath(commandLine.getOptionValue("base_path"));
+ } else {
+ config.setBasePath(BASE_PATH_DEFAULT);
}
if (commandLine.hasOption("base_output_path")) {
config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
+ } else {
+ config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT);
}
if (commandLine.hasOption("start_time")) {
try {
@@ -83,7 +92,14 @@ public class CliParser {
config.setNumReducers(numReducers);
}
else {
- config.setNumReducers(10);
+ config.setNumReducers(NUM_REDUCERS_DEFAULT);
+ }
+ if (commandLine.hasOption("records_per_file")) {
+ int numRecordsPerFile = Integer.parseInt(commandLine.getOptionValue("records_per_file"));
+ config.setNumRecordsPerFile(numRecordsPerFile);
+ }
+ else {
+ config.setNumRecordsPerFile(NUM_RECORDS_PER_FILE_DEFAULT);
}
if (commandLine.hasOption("end_time")) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index d96e166..d2e6807 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -17,12 +17,14 @@
*/
package org.apache.metron.pcap.query;
+import com.google.common.collect.Iterables;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
@@ -32,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -59,7 +60,7 @@ public class PcapCli {
return -1;
}
String jobType = args[0];
- List<byte[]> results = new ArrayList<>();
+ SequenceFileIterable results = null;
String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
Configuration hadoopConf = new Configuration();
String[] otherArgs = null;
@@ -69,13 +70,16 @@ public class PcapCli {
LOGGER.error("Failed to configure hadoop with provided options: " + e.getMessage(), e);
return -1;
}
+ CliConfig commonConfig = null;
if ("fixed".equals(jobType)) {
FixedCliParser fixedParser = new FixedCliParser();
FixedCliConfig config = null;
try {
config = fixedParser.parse(otherArgs);
+ commonConfig = config;
} catch (ParseException | java.text.ParseException e) {
System.err.println(e.getMessage());
+ System.err.flush();
fixedParser.printHelp();
return -1;
}
@@ -110,6 +114,7 @@ public class PcapCli {
QueryCliConfig config = null;
try {
config = queryParser.parse(otherArgs);
+ commonConfig = config;
} catch (ParseException | java.text.ParseException e) {
System.err.println(e.getMessage());
queryParser.printHelp();
@@ -145,18 +150,28 @@ public class PcapCli {
printBasicHelp();
return -1;
}
- String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
- String outFileName = String.format("pcap-data-%s.pcap", timestamp);
try {
- if(results.size() > 0) {
- resultsWriter.write(results, outFileName);
- }
- else {
+ Iterable<List<byte[]>> partitions = Iterables.partition(results, commonConfig.getNumRecordsPerFile());
+ if (partitions.iterator().hasNext()) {
+ for (List<byte[]> data : partitions) {
+ String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
+ String outFileName = String.format("pcap-data-%s.pcap", timestamp);
+ if(data.size() > 0) {
+ resultsWriter.write(data, outFileName);
+ }
+ }
+ } else {
System.out.println("No results returned.");
}
} catch (IOException e) {
LOGGER.error("Unable to write file", e);
return -1;
+ } finally {
+ try {
+ results.cleanup();
+ } catch(IOException e) {
+ LOGGER.warn("Unable to cleanup files in HDFS", e);
+ }
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 17c9325..81725d8 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -19,8 +19,11 @@
package org.apache.metron.pcap;
import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.mr.PcapJob;
import org.junit.Assert;
@@ -30,6 +33,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static java.lang.Long.toUnsignedString;
+import static org.hamcrest.CoreMatchers.equalTo;
+
public class PcapJobTest {
@Test
@@ -48,6 +54,7 @@ public class PcapJobTest {
Assert.assertTrue(Iterables.isEmpty(paths));
}
}
+
@Test
public void test_getPaths_leftEdge() throws Exception {
PcapJob job;
@@ -63,9 +70,10 @@ public class PcapJobTest {
}
};
Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
- Assert.assertEquals(1,Iterables.size(paths));
+ Assert.assertEquals(1, Iterables.size(paths));
}
}
+
@Test
public void test_getPaths_rightEdge() throws Exception {
PcapJob job;
@@ -80,8 +88,8 @@ public class PcapJobTest {
return inputFiles;
}
};
- Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L-1L, 1461589333993573000L + 1L);
- Assert.assertEquals(2,Iterables.size(paths));
+ Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L);
+ Assert.assertEquals(2, Iterables.size(paths));
}
{
final List<Path> inputFiles = new ArrayList<Path>() {{
@@ -95,10 +103,11 @@ public class PcapJobTest {
return inputFiles;
}
};
- Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L-1L, 1461589334993573000L + 1L);
- Assert.assertEquals(2,Iterables.size(paths));
+ Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L);
+ Assert.assertEquals(2, Iterables.size(paths));
}
}
+
@Test
public void test_getPaths_bothEdges() throws Exception {
PcapJob job;
@@ -115,7 +124,20 @@ public class PcapJobTest {
}
};
Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
- Assert.assertEquals(3,Iterables.size(paths));
+ Assert.assertEquals(3, Iterables.size(paths));
}
}
+
+ @Test
+ public void partition_gives_value_in_range() throws Exception {
+ long start = 1473897600000000000L;
+ long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
+ Configuration conf = new Configuration();
+ conf.set(PcapJob.START_TS_CONF, toUnsignedString(start));
+ conf.set(PcapJob.END_TS_CONF, toUnsignedString(end));
+ conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
+ PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
+ partitioner.setConf(conf);
+ Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index d4367ea..0dd07aa 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -271,7 +271,7 @@ public class PcapTopologyIntegrationTest {
PcapJob job = new PcapJob();
{
//Ensure that only two pcaps are returned when we look at 4 and 5
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(4, pcapEntries)
@@ -283,12 +283,12 @@ public class PcapTopologyIntegrationTest {
, new FixedPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), 2);
+ Assert.assertEquals(Iterables.size(results), 2);
}
{
// Ensure that only two pcaps are returned when we look at 4 and 5
// test with empty query filter
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(4, pcapEntries)
@@ -300,11 +300,11 @@ public class PcapTopologyIntegrationTest {
, new QueryPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), 2);
+ Assert.assertEquals(Iterables.size(results), 2);
}
{
//ensure that none get returned since that destination IP address isn't in the dataset
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -318,12 +318,12 @@ public class PcapTopologyIntegrationTest {
, new FixedPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), 0);
+ Assert.assertEquals(Iterables.size(results), 0);
}
{
// ensure that none get returned since that destination IP address isn't in the dataset
// test with query filter
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -335,11 +335,11 @@ public class PcapTopologyIntegrationTest {
, new QueryPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), 0);
+ Assert.assertEquals(Iterables.size(results), 0);
}
{
//same with protocol as before with the destination addr
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -353,12 +353,12 @@ public class PcapTopologyIntegrationTest {
, new FixedPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), 0);
+ Assert.assertEquals(Iterables.size(results), 0);
}
{
//same with protocol as before with the destination addr
//test with query filter
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -370,11 +370,11 @@ public class PcapTopologyIntegrationTest {
, new QueryPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), 0);
+ Assert.assertEquals(Iterables.size(results), 0);
}
{
//make sure I get them all.
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -386,12 +386,12 @@ public class PcapTopologyIntegrationTest {
, new FixedPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), pcapEntries.size());
+ Assert.assertEquals(Iterables.size(results), pcapEntries.size());
}
{
//make sure I get them all.
//with query filter
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -403,10 +403,10 @@ public class PcapTopologyIntegrationTest {
, new QueryPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertEquals(results.size(), pcapEntries.size());
+ Assert.assertEquals(Iterables.size(results), pcapEntries.size());
}
{
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -420,8 +420,8 @@ public class PcapTopologyIntegrationTest {
, new FixedPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertTrue(results.size() > 0);
- Assert.assertEquals(results.size()
+ Assert.assertTrue(Iterables.size(results) > 0);
+ Assert.assertEquals(Iterables.size(results)
, Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
@Override
public boolean apply(@Nullable JSONObject input) {
@@ -432,12 +432,12 @@ public class PcapTopologyIntegrationTest {
)
);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, results);
+ PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
Assert.assertTrue(baos.toByteArray().length > 0);
}
{
//test with query filter
- List<byte[]> results =
+ Iterable<byte[]> results =
job.query(new Path(outDir.getAbsolutePath())
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
@@ -449,8 +449,8 @@ public class PcapTopologyIntegrationTest {
, new QueryPcapFilter.Configurator()
);
assertInOrder(results);
- Assert.assertTrue(results.size() > 0);
- Assert.assertEquals(results.size()
+ Assert.assertTrue(Iterables.size(results) > 0);
+ Assert.assertEquals(Iterables.size(results)
, Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
@Override
public boolean apply(@Nullable JSONObject input) {
@@ -462,7 +462,7 @@ public class PcapTopologyIntegrationTest {
);
assertInOrder(results);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, results);
+ PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
Assert.assertTrue(baos.toByteArray().length > 0);
}
System.out.println("Ended");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 4d6432e..bad22e4 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
@@ -38,14 +39,12 @@ import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.List;
+import java.util.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PcapCliTest {
@@ -71,13 +70,15 @@ public class PcapCliTest {
"-ip_dst_addr", "192.168.1.2",
"-ip_src_port", "8081",
"-ip_dst_port", "8082",
- "-protocol", "6",
- "-num_reducers", "10"
+ "-protocol", "6"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+ Iterator iterator = pcaps.iterator();
+ SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+ when(iterable.iterator()).thenReturn(iterator);
- Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
- Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+ Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
+ Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.SRC_ADDR, "192.168.1.1");
put(Constants.Fields.DST_ADDR, "192.168.1.2");
@@ -87,7 +88,7 @@ public class PcapCliTest {
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
}};
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -109,9 +110,13 @@ public class PcapCliTest {
"-ip_dst_port", "8082",
"-protocol", "6",
"-include_reverse",
- "-num_reducers", "10"
+ "-num_reducers", "10",
+ "-records_per_file", "1000"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+ Iterator iterator = pcaps.iterator();
+ SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+ when(iterable.iterator()).thenReturn(iterator);
Path base_path = new Path("/base/path");
Path base_output_path = new Path("/base/output/path");
@@ -124,7 +129,7 @@ public class PcapCliTest {
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
}};
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -147,9 +152,13 @@ public class PcapCliTest {
"-ip_dst_port", "8082",
"-protocol", "6",
"-include_reverse",
- "-num_reducers", "10"
+ "-num_reducers", "10",
+ "-records_per_file", "1000"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+ Iterator iterator = pcaps.iterator();
+ SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+ when(iterable.iterator()).thenReturn(iterator);
Path base_path = new Path("/base/path");
Path base_output_path = new Path("/base/output/path");
@@ -164,7 +173,7 @@ public class PcapCliTest {
long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
- when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -187,16 +196,18 @@ public class PcapCliTest {
String[] args = {
"query",
"-start_time", "500",
- "-num_reducers", "10",
"-query", "some query string"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+ Iterator iterator = pcaps.iterator();
+ SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+ when(iterable.iterator()).thenReturn(iterator);
- Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT);
- Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT);
+ Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
+ Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
String query = "some query string";
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -213,15 +224,19 @@ public class PcapCliTest {
"-num_reducers", "10",
"-base_path", "/base/path",
"-base_output_path", "/base/output/path",
- "-query", "some query string"
+ "-query", "some query string",
+ "-records_per_file", "1000"
};
List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
+ Iterator iterator = pcaps.iterator();
+ SequenceFileIterable iterable = mock(SequenceFileIterable.class);
+ when(iterable.iterator()).thenReturn(iterator);
Path base_path = new Path("/base/path");
Path base_output_path = new Path("/base/output/path");
String query = "some query string";
- when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps);
+ when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000");
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
@@ -229,54 +244,76 @@ public class PcapCliTest {
Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap");
}
+ // INVALID OPTION CHECKS
+
@Test
public void invalid_fixed_filter_arg_prints_help() throws Exception {
+ String[] args = {
+ "fixed",
+ "-start_time", "500",
+ "-end_time", "1000",
+ "-num_reducers", "10",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-query", "THIS IS AN ERROR"
+ };
+ assertCliError(args, "Fixed", "Unrecognized option: -query");
+ }
+
+ /**
+ *
+ * @param args PcapJob args
+ * @param type Fixed|Query
+ * @param optMsg Expected error message
+ */
+ public void assertCliError(String[] args, String type, String optMsg) {
PrintStream originalOutStream = System.out;
+ PrintStream originalErrOutStream = System.err;
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- PrintStream testStream = new PrintStream(new BufferedOutputStream(bos));
- System.setOut(testStream);
- String[] args = {
- "fixed",
- "-start_time", "500",
- "-end_time", "1000",
- "-num_reducers", "10",
- "-base_path", "/base/path",
- "-base_output_path", "/base/output/path",
- "-query", "THIS IS AN ERROR"
- };
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(bos));
+ System.setOut(outStream);
+
+ ByteArrayOutputStream ebos = new ByteArrayOutputStream();
+ PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos));
+ System.setErr(errOutStream);
PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
assertThat("Expect errors on run", cli.run(args), equalTo(-1));
- assertThat(bos.toString(), bos.toString().contains("usage: Fixed filter options"), equalTo(true));
+ assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true));
+ assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true));
} finally {
System.setOut(originalOutStream);
+ System.setErr(originalErrOutStream);
}
}
@Test
public void invalid_query_filter_arg_prints_help() throws Exception {
- PrintStream originalOutStream = System.out;
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- PrintStream outStream = new PrintStream(new BufferedOutputStream(bos));
- System.setOut(outStream);
- String[] args = {
- "query",
- "-start_time", "500",
- "-end_time", "1000",
- "-num_reducers", "10",
- "-base_path", "/base/path",
- "-base_output_path", "/base/output/path",
- "-ip_src_addr", "THIS IS AN ERROR"
- };
+ String[] args = {
+ "query",
+ "-start_time", "500",
+ "-end_time", "1000",
+ "-num_reducers", "10",
+ "-base_path", "/base/path",
+ "-base_output_path", "/base/output/path",
+ "-ip_src_addr", "THIS IS AN ERROR"
+ };
+ assertCliError(args, "Query", "");
+ }
- PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock);
- assertThat("Expect errors on run", cli.run(args), equalTo(-1));
- assertThat(bos.toString(), bos.toString().contains("usage: Query filter options"), equalTo(true));
- } finally {
- System.setOut(originalOutStream);
- }
+ @Test
+ public void missing_start_time_arg_prints_error_and_help() throws Exception {
+ String[] args = {
+ "fixed",
+ "-ip_src_addr", "192.168.1.1",
+ "-ip_dst_addr", "192.168.1.2",
+ "-ip_src_port", "8081",
+ "-ip_dst_port", "8082",
+ "-protocol", "6",
+ "-num_reducers", "10"
+ };
+ assertCliError(args, "Fixed", "Missing required option: st");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index cce4074..f874620 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
@@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.Logger;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.filter.PcapFilter;
@@ -64,6 +64,11 @@ public class PcapJob {
}
long x = longWritable.get();
int ret = (int)Long.divideUnsigned(x - start, width);
+ if(ret > numPartitions) {
+ throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d"
+ , Long.toUnsignedString(x), width, ret, numPartitions)
+ );
+ }
return ret;
}
@@ -176,32 +181,26 @@ public class PcapJob {
return ret;
}
- private List<byte[]> readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
- List<byte[]> ret = new ArrayList<>();
- for(RemoteIterator<LocatedFileStatus> it= fs.listFiles(outputPath, false);it.hasNext();) {
+ /**
+ * Returns a lazily-read Iterable over a set of sequence files
+ */
+ private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
+ List<Path> files = new ArrayList<>();
+ for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) {
Path p = it.next().getPath();
- if(p.getName().equals("_SUCCESS")) {
+ if (p.getName().equals("_SUCCESS")) {
fs.delete(p, false);
continue;
}
- SequenceFile.Reader reader = new SequenceFile.Reader(config,
- SequenceFile.Reader.file(p));
- LongWritable key = new LongWritable();
- BytesWritable value = new BytesWritable();
- while(reader.next(key, value)) {
- ret.add(value.copyBytes());
- }
- reader.close();
- fs.delete(p, false);
+ files.add(p);
}
- fs.delete(outputPath, false);
- if(LOG.isDebugEnabled()) {
- LOG.debug(outputPath + ": Returning " + ret.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(outputPath);
}
- return ret;
+ return new SequenceFileIterable(files, config);
}
- public <T> List<byte[]> query(Path basePath
+ public <T> SequenceFileIterable query(Path basePath
, Path baseOutputPath
, long beginNS
, long endNS
@@ -240,12 +239,10 @@ public class PcapJob {
}
}
-
- public static int findWidth(long start, long end, int numReducers) {
- return (int)Long.divideUnsigned(end - start, numReducers) + 1;
+ public static long findWidth(long start, long end, int numReducers) {
+ return Long.divideUnsigned(end - start, numReducers) + 1;
}
-
public <T> Job createJob( Path basePath
, Path outputPath
, long beginNS
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e0c9970b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7e373d..659a467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,7 @@
<exclude>metron-ui/lib/public/font/**</exclude>
<exclude>metron-ui/node_modules/**</exclude>
<!-- pickle file - binary format -->
- <exclude>metron-deployment/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p</exclude>
+ <exclude>**/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p</exclude>
</excludes>
</configuration>
</plugin>