You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:53 UTC

[34/44] metron git commit: METRON-958: PCAP Query job throws exception when no files returned by time range query closes apache/metron#593

METRON-958: PCAP Query job throws exception when no files returned by time range query closes apache/metron#593


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e2197316
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e2197316
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e2197316

Branch: refs/heads/Metron_0.4.0
Commit: e2197316df5754aba601134ecce19b12092b6f37
Parents: 356881a
Author: mmiklavc <mi...@gmail.com>
Authored: Sun May 21 21:08:45 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sun May 21 21:08:45 2017 -0400

----------------------------------------------------------------------
 .../org/apache/metron/pcap/PcapJobTest.java     | 110 +-------------
 .../PcapTopologyIntegrationTest.java            | 123 +++++++--------
 .../apache/metron/pcap/PcapFilenameHelper.java  |  77 ++++++++++
 .../java/org/apache/metron/pcap/PcapHelper.java |  35 ++---
 .../java/org/apache/metron/pcap/mr/PcapJob.java | 105 +++++--------
 .../metron/pcap/utils/FileFilterUtil.java       | 138 +++++++++++++++++
 .../metron/pcap/PcapFilenameHelperTest.java     |  75 ++++++++++
 .../metron/pcap/mr/FileFilterUtilTest.java      | 150 +++++++++++++++++++
 8 files changed, 568 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 81725d8..3536a7e 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,10 @@
 
 package org.apache.metron.pcap;
 
-import com.google.common.collect.Iterables;
+import static java.lang.Long.toUnsignedString;
+import static org.hamcrest.CoreMatchers.equalTo;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
@@ -29,106 +29,9 @@ import org.apache.metron.pcap.mr.PcapJob;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.lang.Long.toUnsignedString;
-import static org.hamcrest.CoreMatchers.equalTo;
-
 public class PcapJobTest {
 
   @Test
-  public void test_getPaths_NoFiles() throws Exception {
-    PcapJob job;
-    {
-      final List<Path> inputFiles = new ArrayList<Path>() {{
-      }};
-      job = new PcapJob() {
-        @Override
-        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
-          return inputFiles;
-        }
-      };
-      Iterable<String> paths = job.getPaths(null, null, 0, 1000);
-      Assert.assertTrue(Iterables.isEmpty(paths));
-    }
-  }
-
-  @Test
-  public void test_getPaths_leftEdge() throws Exception {
-    PcapJob job;
-    {
-      final List<Path> inputFiles = new ArrayList<Path>() {{
-        add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-        add(new Path("/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-      }};
-      job = new PcapJob() {
-        @Override
-        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
-          return inputFiles;
-        }
-      };
-      Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
-      Assert.assertEquals(1, Iterables.size(paths));
-    }
-  }
-
-  @Test
-  public void test_getPaths_rightEdge() throws Exception {
-    PcapJob job;
-    {
-      final List<Path> inputFiles = new ArrayList<Path>() {{
-        add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-      }};
-      job = new PcapJob() {
-        @Override
-        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
-          return inputFiles;
-        }
-      };
-      Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L);
-      Assert.assertEquals(2, Iterables.size(paths));
-    }
-    {
-      final List<Path> inputFiles = new ArrayList<Path>() {{
-        add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-      }};
-      job = new PcapJob() {
-        @Override
-        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
-          return inputFiles;
-        }
-      };
-      Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L);
-      Assert.assertEquals(2, Iterables.size(paths));
-    }
-  }
-
-  @Test
-  public void test_getPaths_bothEdges() throws Exception {
-    PcapJob job;
-    {
-      final List<Path> inputFiles = new ArrayList<Path>() {{
-        add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-        add(new Path("/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
-      }};
-      job = new PcapJob() {
-        @Override
-        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
-          return inputFiles;
-        }
-      };
-      Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
-      Assert.assertEquals(3, Iterables.size(paths));
-    }
-  }
-
-  @Test
   public void partition_gives_value_in_range() throws Exception {
     long start = 1473897600000000000L;
     long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L);
@@ -138,6 +41,9 @@ public class PcapJobTest {
     conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10));
     PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner();
     partitioner.setConf(conf);
-    Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8));
+    Assert.assertThat("Partition not in range",
+        partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10),
+        equalTo(8));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index d6d54dc..7d1dba8 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -23,6 +23,17 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
 import kafka.consumer.ConsumerIterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,13 +66,6 @@ import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.*;
-
 public class PcapTopologyIntegrationTest {
   final static String KAFKA_TOPIC = "pcap";
   private static String BASE_DIR = "pcap";
@@ -69,22 +73,7 @@ public class PcapTopologyIntegrationTest {
   private static String QUERY_DIR = BASE_DIR + "/query";
   private String topologiesDir = "src/main/flux";
   private String targetDir = "target";
-  private File getOutDir(String targetDir) {
-    File outDir = new File(new File(targetDir), DATA_DIR);
-    if (!outDir.exists()) {
-      outDir.mkdirs();
-    }
-
-    return outDir;
-  }
 
-  private File getQueryDir(String targetDir) {
-    File outDir = new File(new File(targetDir), QUERY_DIR);
-    if (!outDir.exists()) {
-      outDir.mkdirs();
-    }
-    return outDir;
-  }
   private static void clearOutDir(File outDir) {
     for(File f : outDir.listFiles()) {
       f.delete();
@@ -100,43 +89,6 @@ public class PcapTopologyIntegrationTest {
     }).length;
   }
 
-  private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
-    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
-            SequenceFile.Reader.file(pcapFile)
-    );
-    List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>();
-    IntWritable key = new IntWritable();
-    BytesWritable value = new BytesWritable();
-    while (reader.next(key, value)) {
-      byte[] pcapWithHeader = value.copyBytes();
-      //if you are debugging and want the hex dump of the packets, uncomment the following:
-
-      //for(byte b : pcapWithHeader) {
-      //  System.out.print(String.format("%02x", b));
-      //}
-      //System.out.println("");
-
-      long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
-      {
-        List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
-        for(PacketInfo pi : info) {
-          Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
-          //IF you are debugging and want to see the packets, uncomment the following.
-          //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
-        }
-      }
-      if(withHeaders) {
-        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader));
-      }
-      else {
-        byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
-        System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
-        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
-      }
-    }
-    return Iterables.limit(ret, 2*(ret.size()/2));
-  }
-
   @Test
   public void testTimestampInPacket() throws Exception {
     testTopology(new Function<Properties, Void>() {
@@ -561,6 +513,59 @@ public class PcapTopologyIntegrationTest {
     }
   }
 
+  private File getOutDir(String targetDir) {
+    File outDir = new File(new File(targetDir), DATA_DIR);
+    if (!outDir.exists()) {
+      outDir.mkdirs();
+    }
+    return outDir;
+  }
+
+  private File getQueryDir(String targetDir) {
+    File outDir = new File(new File(targetDir), QUERY_DIR);
+    if (!outDir.exists()) {
+      outDir.mkdirs();
+    }
+    return outDir;
+  }
+
+  private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
+        SequenceFile.Reader.file(pcapFile)
+    );
+    List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>();
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+    while (reader.next(key, value)) {
+      byte[] pcapWithHeader = value.copyBytes();
+      //if you are debugging and want the hex dump of the packets, uncomment the following:
+
+      //for(byte b : pcapWithHeader) {
+      //  System.out.print(String.format("%02x", b));
+      //}
+      //System.out.println("");
+
+      long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
+      {
+        List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
+        for(PacketInfo pi : info) {
+          Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
+          //IF you are debugging and want to see the packets, uncomment the following.
+          //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
+        }
+      }
+      if(withHeaders) {
+        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader));
+      }
+      else {
+        byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
+        System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
+        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
+      }
+    }
+    return Iterables.limit(ret, 2*(ret.size()/2));
+  }
+
   public static void assertInOrder(Iterable<byte[]> packets) {
     long previous = 0;
     for(byte[] packet : packets) {

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java
new file mode 100644
index 0000000..a6f8546
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import java.util.Arrays;
+
+/**
+ * Expects files in the following format
+ * pcap_$TOPIC_$TS_$PARTITION_$UUID
+ */
+public class PcapFilenameHelper {
+
+  public static final String PREFIX = "pcap_";
+
+  /**
+   * Extract kafka topic from pcap filename. Resilient to underscores and hyphens in the kafka
+   * topic name when splitting the value.
+   */
+  public static String getKafkaTopic(String pcapFilename) {
+    String[] tokens = stripPrefix(pcapFilename).split("_");
+    return String.join("_", Arrays.copyOfRange(tokens, 0, tokens.length - 3));
+  }
+
+  private static String stripPrefix(String s) {
+    return s.substring(PREFIX.length());
+  }
+
+  /**
+   * Gets unsigned long timestamp from the PCAP filename
+   *
+   * @return timestamp, or null if unable to parse
+   */
+  public static Long getTimestamp(String pcapFilename) {
+    String[] tokens = stripPrefix(pcapFilename).split("_");
+    try {
+      return Long.parseUnsignedLong(tokens[tokens.length - 3]);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Gets Kafka partition number from the PCAP filename
+   *
+   * @return partition, or null if unable to parse
+   */
+  public static Integer getKafkaPartition(String pcapFilename) {
+    String[] tokens = stripPrefix(pcapFilename).split("_");
+    try {
+      return Integer.parseInt(tokens[tokens.length - 2]);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  public static String getUUID(String pcapFilename) {
+    String[] tokens = stripPrefix(pcapFilename).split("_");
+    return tokens[tokens.length - 1];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index e1ad3ca..bb7d9f0 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -19,8 +19,12 @@
 package org.apache.metron.pcap;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.Logger;
 import org.apache.metron.spout.pcap.Endianness;
@@ -37,13 +41,6 @@ import org.krakenapps.pcap.packet.PcapPacket;
 import org.krakenapps.pcap.util.Buffer;
 import org.krakenapps.pcap.util.ByteOrderConverter;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class PcapHelper {
 
   public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES;
@@ -72,16 +69,6 @@ public class PcapHelper {
     }
   };
 
-  public static Long getTimestamp(String filename) {
-    try {
-      return Long.parseUnsignedLong(Iterables.get(Splitter.on('_').split(filename), 2));
-    }
-    catch(Exception e) {
-      //something went wrong here.
-      return null;
-    }
-  }
-
   /**
    *
    * @param topic
@@ -363,4 +350,14 @@ public class PcapHelper {
     }
     return messages;
   }
+
+  public static boolean greaterThanOrEqualTo(long a, long b) {
+    return Long.compareUnsigned(a, b) >= 0;
+  }
+
+  public static boolean lessThanOrEqualTo(long a, long b) {
+    return Long.compareUnsigned(a, b) <= 0;
+  }
+
 }
+

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 8d40e5f..62f9844 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -18,7 +18,20 @@
 
 package org.apache.metron.pcap.mr;
 
+import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
+import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
+
 import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,22 +46,19 @@ import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.log4j.Logger;
 import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.filter.PcapFilter;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.PcapFilters;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.stream.Stream;
+import org.apache.metron.pcap.utils.FileFilterUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PcapJob {
-  private static final Logger LOG = Logger.getLogger(PcapJob.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(PcapJob.class);
   public static final String START_TS_CONF = "start_ts";
   public static final String END_TS_CONF = "end_ts";
   public static final String WIDTH_CONF = "width";
@@ -110,7 +120,7 @@ public class PcapJob {
 
     @Override
     protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
-      if (Long.compareUnsigned(key.get(), start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) {
+      if (greaterThanOrEqualTo(key.get(), start) && lessThanOrEqualTo(key.get(), end)) {
         // It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1
         // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
         // objects back to byte arrays, otherwise we could support more than one packet.
@@ -144,56 +154,6 @@ public class PcapJob {
     }
   }
 
-  protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
-    List<Path> ret = new ArrayList<>();
-    RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true);
-    while(filesIt.hasNext()){
-      ret.add(filesIt.next().getPath());
-    }
-    return ret;
-  }
-
-  public Iterable<String> getPaths(FileSystem fs, Path basePath, long begin, long end) throws IOException {
-    List<String> ret = new ArrayList<>();
-    Iterator<Path> files = listFiles(fs, basePath).iterator();
-    /*
-    The trick here is that we need a trailing left endpoint, because we only capture the start of the
-    timeseries kept in the file.
-     */
-    boolean isFirst = true;
-    Path leftEndpoint = files.hasNext()?files.next():null;
-    if(leftEndpoint == null) {
-      return ret;
-    }
-    {
-      Long ts = PcapHelper.getTimestamp(leftEndpoint.getName());
-      if(ts != null && Long.compareUnsigned(ts, begin) >= 0 && Long.compareUnsigned(ts, end) <= 0) {
-        ret.add(leftEndpoint.toString());
-        isFirst = false;
-      }
-    }
-    while(files.hasNext()) {
-      Path p = files.next();
-      Long ts = PcapHelper.getTimestamp(p.getName());
-      if(ts != null && Long.compareUnsigned(ts, begin) >= 0 && Long.compareUnsigned(ts, end) <= 0) {
-        if(isFirst && leftEndpoint != null) {
-          ret.add(leftEndpoint.toString());
-        }
-        if(isFirst) {
-          isFirst = false;
-        }
-        ret.add(p.toString());
-      }
-      else {
-        leftEndpoint = p;
-      }
-    }
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Including files " + Joiner.on(",").join(ret));
-    }
-    return ret;
-  }
-
   /**
    * Returns a lazily-read Iterable over a set of sequence files
    */
@@ -207,9 +167,7 @@ public class PcapJob {
       }
       files.add(p);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(outputPath);
-    }
+    LOG.debug("Output path={}", outputPath);
     Collections.sort(files, (o1,o2) -> o1.getName().compareTo(o2.getName()));
     return new SequenceFileIterable(files, config);
   }
@@ -244,11 +202,14 @@ public class PcapJob {
                        , fs
                        , filterImpl
                        );
+    if (job == null) {
+      LOG.info("No files to process with specified date range.");
+      return new SequenceFileIterable(new ArrayList<>(), conf);
+    }
     boolean completed = job.waitForCompletion(true);
     if(completed) {
       return readResults(outputPath, conf, fs);
-    }
-    else {
+    } else {
       throw new RuntimeException("Unable to complete query due to errors.  Please check logs for full errors.");
     }
   }
@@ -282,11 +243,25 @@ public class PcapJob {
     job.setPartitionerClass(PcapPartitioner.class);
     job.setOutputKeyClass(LongWritable.class);
     job.setOutputValueClass(BytesWritable.class);
-    SequenceFileInputFormat.addInputPaths(job, Joiner.on(',').join(getPaths(fs, basePath, beginNS, endNS )));
+    Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath));
+    String inputPaths = Joiner.on(',').join(filteredPaths);
+    if (StringUtils.isEmpty(inputPaths)) {
+      return null;
+    }
+    SequenceFileInputFormat.addInputPaths(job, inputPaths);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     SequenceFileOutputFormat.setOutputPath(job, outputPath);
     return job;
+  }
 
+  protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+    List<Path> ret = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true);
+    while (filesIt.hasNext()) {
+      ret.add(filesIt.next().getPath());
+    }
+    return ret;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java
new file mode 100644
index 0000000..bbbc4bb
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.utils;
+
+import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
+import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.PcapFilenameHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileFilterUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileFilterUtil.class);
+
+  private FileFilterUtil() {
+  }
+
+  /*
+   * The trick here is that we need a trailing left endpoint, because we only capture the start of the
+   * timeseries kept in the file.
+   */
+  public static Iterable<String> getPathsInTimeRange(long beginTs, long endTs,
+      Iterable<Path> files) {
+    Map<Integer, List<Path>> filesByPartition = getFilesByPartition(files);
+    List<String> filteredFiles = filterByTimestampLT(beginTs, endTs, filesByPartition);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Including files " + Joiner.on(",").join(filteredFiles));
+    }
+    return filteredFiles;
+  }
+
+  public static Map<Integer, List<Path>> getFilesByPartition(Iterable<Path> files) {
+    Iterator<Path> filesIt = files.iterator();
+    Map<Integer, List<Path>> filesByPartition = new HashMap<>();
+    while (filesIt.hasNext()) {
+      Path p = filesIt.next();
+      Integer partition = PcapFilenameHelper.getKafkaPartition(p.getName());
+      if (!filesByPartition.containsKey(partition)) {
+        filesByPartition.put(partition, new ArrayList<>());
+      }
+      filesByPartition.get(partition).add(p);
+    }
+    return filesByPartition;
+  }
+
+  /**
+   * Given a map of partition numbers to files, return a list of files filtered by the supplied
+   * beginning and ending timestamps. Includes a left-trailing file.
+   *
+   * @param filesByPartition list of files mapped to partitions. Incoming files do not need to be
+   * sorted as this method will perform a lexicographical sort in normal ascending order.
+   * @return filtered list of files, unsorted
+   */
+  public static List<String> filterByTimestampLT(long beginTs, long endTs,
+      Map<Integer, List<Path>> filesByPartition) {
+    List<String> filteredFiles = new ArrayList<>();
+    for (Integer key : filesByPartition.keySet()) {
+      List<Path> paths = filesByPartition.get(key);
+      filteredFiles.addAll(filterByTimestampLT(beginTs, endTs, paths));
+    }
+    return filteredFiles;
+  }
+
+  /**
+   * Return a list of files filtered by the supplied beginning and ending timestamps. Includes a
+   * left-trailing file.
+   *
+   * @param paths list of files. Incoming files do not need to be sorted as this method will perform
+   * a lexicographical sort in normal ascending order.
+   * @return filtered list of files
+   */
+  public static List<String> filterByTimestampLT(long beginTs, long endTs, List<Path> paths) {
+    List<String> filteredFiles = new ArrayList<>();
+
+    //noinspection unchecked - hadoop fs uses non-generic Comparable interface
+    Collections.sort(paths);
+    Iterator<Path> filesIt = paths.iterator();
+    Path leftTrailing = filesIt.hasNext() ? filesIt.next() : null;
+    if (leftTrailing == null) {
+      return filteredFiles;
+    }
+    boolean first = true;
+    Long fileTS = PcapFilenameHelper.getTimestamp(leftTrailing.getName());
+    if (fileTS != null
+        && greaterThanOrEqualTo(fileTS, beginTs) && lessThanOrEqualTo(fileTS, endTs)) {
+      filteredFiles.add(leftTrailing.toString());
+      first = false;
+    }
+
+    if (first && !filesIt.hasNext()) {
+      filteredFiles.add(leftTrailing.toString());
+      return filteredFiles;
+    }
+
+    while (filesIt.hasNext()) {
+      Path p = filesIt.next();
+      fileTS = PcapFilenameHelper.getTimestamp(p.getName());
+      if (fileTS != null
+          && greaterThanOrEqualTo(fileTS, beginTs) && lessThanOrEqualTo(fileTS, endTs)) {
+        if (first) {
+          filteredFiles.add(leftTrailing.toString());
+          first = false;
+        }
+        filteredFiles.add(p.toString());
+      } else {
+        leftTrailing = p;
+      }
+    }
+
+    return filteredFiles;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java
new file mode 100644
index 0000000..03778d0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+public class PcapFilenameHelperTest {
+
+  @Test
+  public void extracts_info_from_filename() {
+    {
+      String pcapFilename = "pcap_pcap128_1494962815457986000_18_pcap-63-1495027314";
+      assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap128"));
+      assertThat(
+          Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L),
+          equalTo(0));
+      assertThat(PcapFilenameHelper.getKafkaPartition(pcapFilename), equalTo(18));
+      assertThat(PcapFilenameHelper.getUUID(pcapFilename), equalTo("pcap-63-1495027314"));
+    }
+    {
+      String pcapFilename = "pcap_pcap-128_1494962815457986000_18_pcap-63-1495027314";
+      assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap-128"));
+      assertThat(
+          Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L),
+          equalTo(0));
+    }
+    {
+      String pcapFilename = "pcap_pcap_128_1494962815457986000_18_pcap-63-1495027314";
+      assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap_128"));
+      assertThat(
+          Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L),
+          equalTo(0));
+    }
+    {
+      String pcapFilename = "pcap_pcap___128___1494962815457986000_18_pcap-63-1495027314";
+      assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap___128__"));
+      assertThat(
+          Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L),
+          equalTo(0));
+    }
+    {
+      String pcapFilename = "pcap___pcap___128___1494962815457986000_18_pcap-63-1495027314";
+      assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("__pcap___128__"));
+      assertThat(
+          Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L),
+          equalTo(0));
+    }
+  }
+
+  @Test
+  public void extracts_null_info_from_bad_filename_parts() {
+    String pcapFilename = "pcap_pcap128_AAA4962815457986000_BB_pcap-63-1495027314";
+    assertThat(PcapFilenameHelper.getTimestamp(pcapFilename), equalTo(null));
+    assertThat(PcapFilenameHelper.getKafkaPartition(pcapFilename), equalTo(null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java
new file mode 100644
index 0000000..cc05a9a
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.utils.FileFilterUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileFilterUtilTest {
+
+  private List<Path> filesIn;
+
+  @Before
+  public void setup() {
+    filesIn = new ArrayList<>();
+    filesIn.add(new Path("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
+    filesIn.add(new Path("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
+    filesIn.add(new Path("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
+    filesIn.add(new Path("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
+    filesIn.add(new Path("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
+  }
+
+  @Test
+  public void returns_files_by_partition() {
+    Map<Integer, List<Path>> filesByPartition = FileFilterUtil.getFilesByPartition(filesIn);
+    Map<Integer, List<Path>> expectedFilesPartitioned = new HashMap() {{
+      put(0, toList("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
+      put(1, toList("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
+      put(2, toList("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
+      put(3, toList("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
+      put(4, toList("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
+    }};
+    assertThat(filesByPartition, equalTo(expectedFilesPartitioned));
+  }
+
+  private List<Path> toList(String... items) {
+    return Arrays.asList(items).stream().map(i -> new Path(i)).collect(Collectors.toList());
+  }
+
+  @Test
+  public void returns_left_trailing_filtered_list() {
+    Map<Integer, List<Path>> filesByPartition = new HashMap() {{
+      put(0, toList("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
+      put(1, toList("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
+      put(2, toList("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
+      put(3, toList("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
+      put(4, toList("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
+    }};
+    List<String> lt = FileFilterUtil
+        .filterByTimestampLT(1495135377055375000L, 1495135512124943000L, filesByPartition);
+    List<String> expectedFiles = Arrays.asList(
+        "/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910");
+    assertThat(lt, equalTo(expectedFiles));
+  }
+
+  @Test
+  public void returns_left_trailing_filtered_list_from_paths() {
+    Iterable<String> paths = FileFilterUtil
+        .getPathsInTimeRange(1495135377055375000L, 1495135512124943000L, filesIn);
+    List<String> expectedFiles = Arrays.asList(
+        "/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910",
+        "/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910");
+    assertThat(paths, equalTo(expectedFiles));
+  }
+
+  @Test
+  public void test_getPaths_NoFiles() throws Exception {
+    final List<Path> inputFiles = new ArrayList<Path>();
+    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, 1000, inputFiles);
+    Assert.assertTrue(Iterables.isEmpty(paths));
+  }
+
+  @Test
+  public void test_getPaths_leftEdge() throws Exception {
+    final List<Path> inputFiles = new ArrayList<Path>() {{
+      add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      add(new Path("/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+    }};
+    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()), inputFiles);
+    Assert.assertEquals(1, Iterables.size(paths));
+  }
+
+  @Test
+  public void test_getPaths_rightEdge() throws Exception {
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+        add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      }};
+      Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(1461589333993573000L - 1L, 1461589333993573000L + 1L, inputFiles);
+      Assert.assertEquals(2, Iterables.size(paths));
+    }
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+        add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      }};
+      Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(1461589334993573000L - 1L, 1461589334993573000L + 1L, inputFiles);
+      Assert.assertEquals(2, Iterables.size(paths));
+    }
+  }
+
+  @Test
+  public void test_getPaths_bothEdges() throws Exception {
+    final List<Path> inputFiles = new ArrayList<Path>() {{
+      add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      add(new Path("/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+    }};
+    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()), inputFiles);
+    Assert.assertEquals(3, Iterables.size(paths));
+  }
+}