You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:53 UTC
[34/44] metron git commit: METRON-958: PCAP Query job throws
exception when no files returned by time range query closes apache/metron#593
METRON-958: PCAP Query job throws exception when no files returned by time range query closes apache/metron#593
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e2197316
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e2197316
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e2197316
Branch: refs/heads/Metron_0.4.0
Commit: e2197316df5754aba601134ecce19b12092b6f37
Parents: 356881a
Author: mmiklavc <mi...@gmail.com>
Authored: Sun May 21 21:08:45 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sun May 21 21:08:45 2017 -0400
----------------------------------------------------------------------
.../org/apache/metron/pcap/PcapJobTest.java | 110 +-------------
.../PcapTopologyIntegrationTest.java | 123 +++++++--------
.../apache/metron/pcap/PcapFilenameHelper.java | 77 ++++++++++
.../java/org/apache/metron/pcap/PcapHelper.java | 35 ++---
.../java/org/apache/metron/pcap/mr/PcapJob.java | 105 +++++--------
.../metron/pcap/utils/FileFilterUtil.java | 138 +++++++++++++++++
.../metron/pcap/PcapFilenameHelperTest.java | 75 ++++++++++
.../metron/pcap/mr/FileFilterUtilTest.java | 150 +++++++++++++++++++
8 files changed, 568 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 81725d8..3536a7e 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,10 @@
package org.apache.metron.pcap;
-import com.google.common.collect.Iterables;
+import static java.lang.Long.toUnsignedString;
+import static org.hamcrest.CoreMatchers.equalTo;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
@@ -29,106 +29,9 @@ import org.apache.metron.pcap.mr.PcapJob;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.lang.Long.toUnsignedString;
-import static org.hamcrest.CoreMatchers.equalTo;
-
public class PcapJobTest {
@Test
- public void test_getPaths_NoFiles() throws Exception {
- PcapJob job;
- {
- final List<Path> inputFiles = new ArrayList<Path>() {{
- }};
- job = new PcapJob() {
- @Override
- protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
- return inputFiles;
- }
- };
- Iterable<String> paths = job.getPaths(null, null, 0, 1000);
- Assert.assertTrue(Iterables.isEmpty(paths));
- }
- }
-
- @Test
- public void test_getPaths_leftEdge() throws Exception {
- PcapJob job;
- {
- final List<Path> inputFiles = new ArrayList<Path>() {{
- add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- add(new Path("/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- }};
- job = new PcapJob() {
- @Override
- protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
- return inputFiles;
- }
- };
- Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
- Assert.assertEquals(1, Iterables.size(paths));
- }
- }
-
- @Test
- public void test_getPaths_rightEdge() throws Exception {
- PcapJob job;
- {
- final List<Path> inputFiles = new ArrayList<Path>() {{
- add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- }};
- job = new PcapJob() {
- @Override
- protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
- return inputFiles;
- }
- };
- Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L);
- Assert.assertEquals(2, Iterables.size(paths));
- }
- {
- final List<Path> inputFiles = new ArrayList<Path>() {{
- add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- }};
- job = new PcapJob() {
- @Override
- protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
- return inputFiles;
- }
- };
- Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L);
- Assert.assertEquals(2, Iterables.size(paths));
- }
- }
-
- @Test
- public void test_getPaths_bothEdges() throws Exception {
- PcapJob job;
- {
- final List<Path> inputFiles = new ArrayList<Path>() {{
- add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- add(new Path("/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
- }};
- job = new PcapJob() {
- @Override
- protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
- return inputFiles;
- }
- };
- Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
- Assert.assertEquals(3, Iterables.size(paths));
- }
- }
-
- @Test
public void partition_gives_value_in_range() throws Exception {
long start = 1473897600000000000L;
long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
@@ -138,6 +41,9 @@ public class PcapJobTest {
conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
partitioner.setConf(conf);
- Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8));
+ Assert.assertThat("Partition not in range",
+ partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10),
+ equalTo(8));
}
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index d6d54dc..7d1dba8 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -23,6 +23,17 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
import kafka.consumer.ConsumerIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -55,13 +66,6 @@ import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
-import javax.annotation.Nullable;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.*;
-
public class PcapTopologyIntegrationTest {
final static String KAFKA_TOPIC = "pcap";
private static String BASE_DIR = "pcap";
@@ -69,22 +73,7 @@ public class PcapTopologyIntegrationTest {
private static String QUERY_DIR = BASE_DIR + "/query";
private String topologiesDir = "src/main/flux";
private String targetDir = "target";
- private File getOutDir(String targetDir) {
- File outDir = new File(new File(targetDir), DATA_DIR);
- if (!outDir.exists()) {
- outDir.mkdirs();
- }
-
- return outDir;
- }
- private File getQueryDir(String targetDir) {
- File outDir = new File(new File(targetDir), QUERY_DIR);
- if (!outDir.exists()) {
- outDir.mkdirs();
- }
- return outDir;
- }
private static void clearOutDir(File outDir) {
for(File f : outDir.listFiles()) {
f.delete();
@@ -100,43 +89,6 @@ public class PcapTopologyIntegrationTest {
}).length;
}
- private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
- SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
- SequenceFile.Reader.file(pcapFile)
- );
- List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>();
- IntWritable key = new IntWritable();
- BytesWritable value = new BytesWritable();
- while (reader.next(key, value)) {
- byte[] pcapWithHeader = value.copyBytes();
- //if you are debugging and want the hex dump of the packets, uncomment the following:
-
- //for(byte b : pcapWithHeader) {
- // System.out.print(String.format("%02x", b));
- //}
- //System.out.println("");
-
- long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
- {
- List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
- for(PacketInfo pi : info) {
- Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
- //IF you are debugging and want to see the packets, uncomment the following.
- //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
- }
- }
- if(withHeaders) {
- ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader));
- }
- else {
- byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
- System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
- ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
- }
- }
- return Iterables.limit(ret, 2*(ret.size()/2));
- }
-
@Test
public void testTimestampInPacket() throws Exception {
testTopology(new Function<Properties, Void>() {
@@ -561,6 +513,59 @@ public class PcapTopologyIntegrationTest {
}
}
+ private File getOutDir(String targetDir) {
+ File outDir = new File(new File(targetDir), DATA_DIR);
+ if (!outDir.exists()) {
+ outDir.mkdirs();
+ }
+ return outDir;
+ }
+
+ private File getQueryDir(String targetDir) {
+ File outDir = new File(new File(targetDir), QUERY_DIR);
+ if (!outDir.exists()) {
+ outDir.mkdirs();
+ }
+ return outDir;
+ }
+
+ private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
+ SequenceFile.Reader.file(pcapFile)
+ );
+ List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>();
+ IntWritable key = new IntWritable();
+ BytesWritable value = new BytesWritable();
+ while (reader.next(key, value)) {
+ byte[] pcapWithHeader = value.copyBytes();
+ //if you are debugging and want the hex dump of the packets, uncomment the following:
+
+ //for(byte b : pcapWithHeader) {
+ // System.out.print(String.format("%02x", b));
+ //}
+ //System.out.println("");
+
+ long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
+ {
+ List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
+ for(PacketInfo pi : info) {
+ Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
+ //IF you are debugging and want to see the packets, uncomment the following.
+ //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
+ }
+ }
+ if(withHeaders) {
+ ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader));
+ }
+ else {
+ byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
+ System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
+ ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
+ }
+ }
+ return Iterables.limit(ret, 2*(ret.size()/2));
+ }
+
public static void assertInOrder(Iterable<byte[]> packets) {
long previous = 0;
for(byte[] packet : packets) {
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java
new file mode 100644
index 0000000..a6f8546
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import java.util.Arrays;
+
+/**
+ * Expects files in the following format
+ * pcap_$TOPIC_$TS_$PARTITION_$UUID
+ */
+public class PcapFilenameHelper {
+
+ public static final String PREFIX = "pcap_";
+
+ /**
+ * Extract kafka topic from pcap filename. Resilient to underscores and hyphens in the kafka
+ * topic name when splitting the value.
+ */
+ public static String getKafkaTopic(String pcapFilename) {
+ String[] tokens = stripPrefix(pcapFilename).split("_");
+ return String.join("_", Arrays.copyOfRange(tokens, 0, tokens.length - 3));
+ }
+
+ private static String stripPrefix(String s) {
+ return s.substring(PREFIX.length());
+ }
+
+ /**
+ * Gets unsigned long timestamp from the PCAP filename
+ *
+ * @return timestamp, or null if unable to parse
+ */
+ public static Long getTimestamp(String pcapFilename) {
+ String[] tokens = stripPrefix(pcapFilename).split("_");
+ try {
+ return Long.parseUnsignedLong(tokens[tokens.length - 3]);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Gets Kafka partition number from the PCAP filename
+ *
+ * @return partition, or null if unable to parse
+ */
+ public static Integer getKafkaPartition(String pcapFilename) {
+ String[] tokens = stripPrefix(pcapFilename).split("_");
+ try {
+ return Integer.parseInt(tokens[tokens.length - 2]);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ public static String getUUID(String pcapFilename) {
+ String[] tokens = stripPrefix(pcapFilename).split("_");
+ return tokens[tokens.length - 1];
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index e1ad3ca..bb7d9f0 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -19,8 +19,12 @@
package org.apache.metron.pcap;
import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.apache.metron.spout.pcap.Endianness;
@@ -37,13 +41,6 @@ import org.krakenapps.pcap.packet.PcapPacket;
import org.krakenapps.pcap.util.Buffer;
import org.krakenapps.pcap.util.ByteOrderConverter;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class PcapHelper {
public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES;
@@ -72,16 +69,6 @@ public class PcapHelper {
}
};
- public static Long getTimestamp(String filename) {
- try {
- return Long.parseUnsignedLong(Iterables.get(Splitter.on('_').split(filename), 2));
- }
- catch(Exception e) {
- //something went wrong here.
- return null;
- }
- }
-
/**
*
* @param topic
@@ -363,4 +350,14 @@ public class PcapHelper {
}
return messages;
}
+
+ public static boolean greaterThanOrEqualTo(long a, long b) {
+ return Long.compareUnsigned(a, b) >= 0;
+ }
+
+ public static boolean lessThanOrEqualTo(long a, long b) {
+ return Long.compareUnsigned(a, b) <= 0;
+ }
+
}
+
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 8d40e5f..62f9844 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -18,7 +18,20 @@
package org.apache.metron.pcap.mr;
+import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
+import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
+
import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -33,22 +46,19 @@ import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.log4j.Logger;
import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.filter.PcapFilter;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.filter.PcapFilters;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.stream.Stream;
+import org.apache.metron.pcap.utils.FileFilterUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PcapJob {
- private static final Logger LOG = Logger.getLogger(PcapJob.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(PcapJob.class);
public static final String START_TS_CONF = "start_ts";
public static final String END_TS_CONF = "end_ts";
public static final String WIDTH_CONF = "width";
@@ -110,7 +120,7 @@ public class PcapJob {
@Override
protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
- if (Long.compareUnsigned(key.get(), start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) {
+ if (greaterThanOrEqualTo(key.get(), start) && lessThanOrEqualTo(key.get(), end)) {
// It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1
// object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
// objects back to byte arrays, otherwise we could support more than one packet.
@@ -144,56 +154,6 @@ public class PcapJob {
}
}
- protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
- List<Path> ret = new ArrayList<>();
- RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true);
- while(filesIt.hasNext()){
- ret.add(filesIt.next().getPath());
- }
- return ret;
- }
-
- public Iterable<String> getPaths(FileSystem fs, Path basePath, long begin, long end) throws IOException {
- List<String> ret = new ArrayList<>();
- Iterator<Path> files = listFiles(fs, basePath).iterator();
- /*
- The trick here is that we need a trailing left endpoint, because we only capture the start of the
- timeseries kept in the file.
- */
- boolean isFirst = true;
- Path leftEndpoint = files.hasNext()?files.next():null;
- if(leftEndpoint == null) {
- return ret;
- }
- {
- Long ts = PcapHelper.getTimestamp(leftEndpoint.getName());
- if(ts != null && Long.compareUnsigned(ts, begin) >= 0 && Long.compareUnsigned(ts, end) <= 0) {
- ret.add(leftEndpoint.toString());
- isFirst = false;
- }
- }
- while(files.hasNext()) {
- Path p = files.next();
- Long ts = PcapHelper.getTimestamp(p.getName());
- if(ts != null && Long.compareUnsigned(ts, begin) >= 0 && Long.compareUnsigned(ts, end) <= 0) {
- if(isFirst && leftEndpoint != null) {
- ret.add(leftEndpoint.toString());
- }
- if(isFirst) {
- isFirst = false;
- }
- ret.add(p.toString());
- }
- else {
- leftEndpoint = p;
- }
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Including files " + Joiner.on(",").join(ret));
- }
- return ret;
- }
-
/**
* Returns a lazily-read Iterable over a set of sequence files
*/
@@ -207,9 +167,7 @@ public class PcapJob {
}
files.add(p);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(outputPath);
- }
+ LOG.debug("Output path={}", outputPath);
Collections.sort(files, (o1,o2) -> o1.getName().compareTo(o2.getName()));
return new SequenceFileIterable(files, config);
}
@@ -244,11 +202,14 @@ public class PcapJob {
, fs
, filterImpl
);
+ if (job == null) {
+ LOG.info("No files to process with specified date range.");
+ return new SequenceFileIterable(new ArrayList<>(), conf);
+ }
boolean completed = job.waitForCompletion(true);
if(completed) {
return readResults(outputPath, conf, fs);
- }
- else {
+ } else {
throw new RuntimeException("Unable to complete query due to errors. Please check logs for full errors.");
}
}
@@ -282,11 +243,25 @@ public class PcapJob {
job.setPartitionerClass(PcapPartitioner.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(BytesWritable.class);
- SequenceFileInputFormat.addInputPaths(job, Joiner.on(',').join(getPaths(fs, basePath, beginNS, endNS )));
+ Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath));
+ String inputPaths = Joiner.on(',').join(filteredPaths);
+ if (StringUtils.isEmpty(inputPaths)) {
+ return null;
+ }
+ SequenceFileInputFormat.addInputPaths(job, inputPaths);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, outputPath);
return job;
+ }
+ protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+ List<Path> ret = new ArrayList<>();
+ RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true);
+ while (filesIt.hasNext()) {
+ ret.add(filesIt.next().getPath());
+ }
+ return ret;
}
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java
new file mode 100644
index 0000000..bbbc4bb
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.utils;
+
+import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
+import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.PcapFilenameHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileFilterUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileFilterUtil.class);
+
+ private FileFilterUtil() {
+ }
+
+ /*
+ * The trick here is that we need a trailing left endpoint, because we only capture the start of the
+ * timeseries kept in the file.
+ */
+ public static Iterable<String> getPathsInTimeRange(long beginTs, long endTs,
+ Iterable<Path> files) {
+ Map<Integer, List<Path>> filesByPartition = getFilesByPartition(files);
+ List<String> filteredFiles = filterByTimestampLT(beginTs, endTs, filesByPartition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Including files " + Joiner.on(",").join(filteredFiles));
+ }
+ return filteredFiles;
+ }
+
+ public static Map<Integer, List<Path>> getFilesByPartition(Iterable<Path> files) {
+ Iterator<Path> filesIt = files.iterator();
+ Map<Integer, List<Path>> filesByPartition = new HashMap<>();
+ while (filesIt.hasNext()) {
+ Path p = filesIt.next();
+ Integer partition = PcapFilenameHelper.getKafkaPartition(p.getName());
+ if (!filesByPartition.containsKey(partition)) {
+ filesByPartition.put(partition, new ArrayList<>());
+ }
+ filesByPartition.get(partition).add(p);
+ }
+ return filesByPartition;
+ }
+
+ /**
+ * Given a map of partition numbers to files, return a list of files filtered by the supplied
+ * beginning and ending timestamps. Includes a left-trailing file.
+ *
+ * @param filesByPartition list of files mapped to partitions. Incoming files do not need to be
+ * sorted as this method will perform a lexicographical sort in normal ascending order.
+ * @return filtered list of files, unsorted
+ */
+ public static List<String> filterByTimestampLT(long beginTs, long endTs,
+ Map<Integer, List<Path>> filesByPartition) {
+ List<String> filteredFiles = new ArrayList<>();
+ for (Integer key : filesByPartition.keySet()) {
+ List<Path> paths = filesByPartition.get(key);
+ filteredFiles.addAll(filterByTimestampLT(beginTs, endTs, paths));
+ }
+ return filteredFiles;
+ }
+
+ /**
+ * Return a list of files filtered by the supplied beginning and ending timestamps. Includes a
+ * left-trailing file.
+ *
+ * @param paths list of files. Incoming files do not need to be sorted as this method will perform
+ * a lexicographical sort in normal ascending order.
+ * @return filtered list of files
+ */
+ public static List<String> filterByTimestampLT(long beginTs, long endTs, List<Path> paths) {
+ List<String> filteredFiles = new ArrayList<>();
+
+ //noinspection unchecked - hadoop fs uses non-generic Comparable interface
+ Collections.sort(paths);
+ Iterator<Path> filesIt = paths.iterator();
+ Path leftTrailing = filesIt.hasNext() ? filesIt.next() : null;
+ if (leftTrailing == null) {
+ return filteredFiles;
+ }
+ boolean first = true;
+ Long fileTS = PcapFilenameHelper.getTimestamp(leftTrailing.getName());
+ if (fileTS != null
+ && greaterThanOrEqualTo(fileTS, beginTs) && lessThanOrEqualTo(fileTS, endTs)) {
+ filteredFiles.add(leftTrailing.toString());
+ first = false;
+ }
+
+ if (first && !filesIt.hasNext()) {
+ filteredFiles.add(leftTrailing.toString());
+ return filteredFiles;
+ }
+
+ while (filesIt.hasNext()) {
+ Path p = filesIt.next();
+ fileTS = PcapFilenameHelper.getTimestamp(p.getName());
+ if (fileTS != null
+ && greaterThanOrEqualTo(fileTS, beginTs) && lessThanOrEqualTo(fileTS, endTs)) {
+ if (first) {
+ filteredFiles.add(leftTrailing.toString());
+ first = false;
+ }
+ filteredFiles.add(p.toString());
+ } else {
+ leftTrailing = p;
+ }
+ }
+
+ return filteredFiles;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java
new file mode 100644
index 0000000..03778d0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+ /**
+  * Tests for {@link PcapFilenameHelper}: checks the topic, timestamp, partition and UUID it
+  * extracts from PCAP file names, including topics that themselves contain underscores.
+  */
+ public class PcapFilenameHelperTest {
+
+   /** Timestamp embedded in every well-formed file name used by these tests. */
+   private static final long EMBEDDED_TIMESTAMP = 1494962815457986000L;
+
+   @Test
+   public void extracts_info_from_filename() {
+     // Plain topic: every component of the name is checked.
+     String simple = "pcap_pcap128_1494962815457986000_18_pcap-63-1495027314";
+     assertTopicAndTimestamp(simple, "pcap128");
+     assertThat(PcapFilenameHelper.getKafkaPartition(simple), equalTo(18));
+     assertThat(PcapFilenameHelper.getUUID(simple), equalTo("pcap-63-1495027314"));
+
+     // Topic containing a dash.
+     assertTopicAndTimestamp(
+         "pcap_pcap-128_1494962815457986000_18_pcap-63-1495027314", "pcap-128");
+
+     // Topic containing a single underscore.
+     assertTopicAndTimestamp(
+         "pcap_pcap_128_1494962815457986000_18_pcap-63-1495027314", "pcap_128");
+
+     // Topic containing runs of underscores, including trailing ones.
+     assertTopicAndTimestamp(
+         "pcap_pcap___128___1494962815457986000_18_pcap-63-1495027314", "pcap___128__");
+
+     // Topic that additionally starts with underscores.
+     assertTopicAndTimestamp(
+         "pcap___pcap___128___1494962815457986000_18_pcap-63-1495027314", "__pcap___128__");
+   }
+
+   @Test
+   public void extracts_null_info_from_bad_filename_parts() {
+     // Neither the timestamp nor the partition segment is numeric here.
+     String malformed = "pcap_pcap128_AAA4962815457986000_BB_pcap-63-1495027314";
+     assertThat(PcapFilenameHelper.getTimestamp(malformed), equalTo(null));
+     assertThat(PcapFilenameHelper.getKafkaPartition(malformed), equalTo(null));
+   }
+
+   /**
+    * Asserts that the helper extracts the given topic and the shared embedded timestamp
+    * (compared as an unsigned long) from {@code filename}.
+    */
+   private static void assertTopicAndTimestamp(String filename, String expectedTopic) {
+     assertThat(PcapFilenameHelper.getKafkaTopic(filename), equalTo(expectedTopic));
+     assertThat(
+         Long.compareUnsigned(PcapFilenameHelper.getTimestamp(filename), EMBEDDED_TIMESTAMP),
+         equalTo(0));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java
new file mode 100644
index 0000000..cc05a9a
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.utils.FileFilterUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Tests for {@link FileFilterUtil}: grouping PCAP files by the partition number embedded in
+  * their names, and selecting the files relevant to a timestamp range. The range tests expect
+  * the file immediately preceding the first in-range file to be returned as well.
+  */
+ public class FileFilterUtilTest {
+
+   /**
+    * Fixed upper bound for the "from zero up to now" style queries. It is deliberately NOT
+    * derived from the wall clock: {@link #test_getPaths_leftEdge()} relies on one file being
+    * stamped after the bound, which stopped being true for the real clock once it passed that
+    * file's timestamp (1561589332993573000). 1500000000000 ms lies between the two file
+    * timestamps used there.
+    */
+   private static final long FIXED_END_TS =
+       TimestampConverters.MILLISECONDS.toNanoseconds(1500000000000L);
+
+   private List<Path> filesIn;
+
+   @Before
+   public void setup() {
+     filesIn = new ArrayList<>();
+     filesIn.add(new Path("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
+     filesIn.add(new Path("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
+     filesIn.add(new Path("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
+     filesIn.add(new Path("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
+     filesIn.add(new Path("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
+   }
+
+   @Test
+   public void returns_files_by_partition() {
+     Map<Integer, List<Path>> filesByPartition = FileFilterUtil.getFilesByPartition(filesIn);
+     // Explicit type arguments: an anonymous subclass cannot use the diamond on Java 8,
+     // and the raw form silently disabled generic type checking.
+     Map<Integer, List<Path>> expectedFilesPartitioned = new HashMap<Integer, List<Path>>() {{
+       put(0, toList("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
+       put(1, toList("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
+       put(2, toList("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
+       put(3, toList("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
+       put(4, toList("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
+     }};
+     assertThat(filesByPartition, equalTo(expectedFilesPartitioned));
+   }
+
+   /** Converts the given path strings into a list of {@link Path} objects, in order. */
+   private List<Path> toList(String... items) {
+     return Arrays.stream(items).map(Path::new).collect(Collectors.toList());
+   }
+
+   @Test
+   public void returns_left_trailing_filtered_list() {
+     Map<Integer, List<Path>> filesByPartition = new HashMap<Integer, List<Path>>() {{
+       put(0, toList("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
+       put(1, toList("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
+       put(2, toList("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
+       put(3, toList("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
+       put(4, toList("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
+     }};
+     List<String> lt = FileFilterUtil
+         .filterByTimestampLT(1495135377055375000L, 1495135512124943000L, filesByPartition);
+     List<String> expectedFiles = Arrays.asList(
+         "/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910");
+     assertThat(lt, equalTo(expectedFiles));
+   }
+
+   @Test
+   public void returns_left_trailing_filtered_list_from_paths() {
+     Iterable<String> paths = FileFilterUtil
+         .getPathsInTimeRange(1495135377055375000L, 1495135512124943000L, filesIn);
+     List<String> expectedFiles = Arrays.asList(
+         "/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910",
+         "/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910");
+     assertThat(paths, equalTo(expectedFiles));
+   }
+
+   @Test
+   public void test_getPaths_NoFiles() throws Exception {
+     final List<Path> inputFiles = new ArrayList<Path>();
+     Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, 1000, inputFiles);
+     Assert.assertTrue(Iterables.isEmpty(paths));
+   }
+
+   @Test
+   public void test_getPaths_leftEdge() throws Exception {
+     final List<Path> inputFiles = new ArrayList<Path>() {{
+       add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+       add(new Path("/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+     }};
+     // The second file is stamped after FIXED_END_TS, so only the first one is expected.
+     Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, FIXED_END_TS, inputFiles);
+     Assert.assertEquals(1, Iterables.size(paths));
+   }
+
+   @Test
+   public void test_getPaths_rightEdge() throws Exception {
+     {
+       final List<Path> inputFiles = new ArrayList<Path>() {{
+         add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+         add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+       }};
+       // Range brackets the last file's timestamp; the preceding file is expected as well.
+       Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(1461589333993573000L - 1L, 1461589333993573000L + 1L, inputFiles);
+       Assert.assertEquals(2, Iterables.size(paths));
+     }
+     {
+       final List<Path> inputFiles = new ArrayList<Path>() {{
+         add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+         add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+         add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+       }};
+       // With three files, only the last file and its immediate predecessor are expected.
+       Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(1461589334993573000L - 1L, 1461589334993573000L + 1L, inputFiles);
+       Assert.assertEquals(2, Iterables.size(paths));
+     }
+   }
+
+   @Test
+   public void test_getPaths_bothEdges() throws Exception {
+     final List<Path> inputFiles = new ArrayList<Path>() {{
+       add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+       add(new Path("/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+       add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+     }};
+     // All three timestamps fall inside [0, FIXED_END_TS].
+     Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, FIXED_END_TS, inputFiles);
+     Assert.assertEquals(3, Iterables.size(paths));
+   }
+ }