Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/29 18:28:32 UTC

[2/6] incubator-metron git commit: METRON-119 - Move PCAP infrastructure from HBase closes apache/incubator-metron#93

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
new file mode 100644
index 0000000..49fd476
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+import org.apache.metron.pcap.PcapHelper;
+
+import java.io.*;
+import java.util.EnumSet;
+
+public class PartitionHDFSWriter implements AutoCloseable, Serializable {
+  private static final long serialVersionUID = 0xDEADBEEFL;
+  private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class);
+
+
+  public interface SyncHandler {
+    void sync(FSDataOutputStream outputStream) throws IOException;
+  }
+
+  public enum SyncHandlers implements SyncHandler {
+    DEFAULT(new SyncHandler() {
+      @Override
+      public void sync(FSDataOutputStream outputStream) throws IOException {
+        outputStream.hflush();
+        outputStream.hsync();
+      }
+    })
+    ,HDFS(new SyncHandler() {
+      @Override
+      public void sync(FSDataOutputStream outputStream) throws IOException {
+        outputStream.hflush();
+        outputStream.hsync();
+        ((HdfsDataOutputStream) outputStream).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+      }
+    })
+    ,LOCAL(new SyncHandler() {
+      @Override
+      public void sync(FSDataOutputStream outputStream) throws IOException {
+        //flush the wrapped stream; a raw local stream has no hflush/hsync
+        outputStream.getWrappedStream().flush();
+      }
+    })
+    ;
+    private SyncHandler func;
+    SyncHandlers(SyncHandler func) {
+      this.func = func;
+    }
+
+    private SyncHandler getHandler() {
+      return func;
+    }
+
+    @Override
+    public void sync(FSDataOutputStream outputStream) throws IOException {
+      func.sync(outputStream);
+    }
+  }
+
+
+  private String topic;
+  private int partition;
+  private String uuid;
+  private FileSystem fs;
+  private FSDataOutputStream outputStream;
+  private SequenceFile.Writer writer;
+  private HDFSWriterConfig config;
+  private SyncHandler syncHandler;
+  private long batchStartTime;
+  private long numWritten;
+
+  public PartitionHDFSWriter(String topic, int partition, String uuid, HDFSWriterConfig config) {
+    this.topic = topic;
+    this.partition = partition;
+    this.uuid = uuid;
+    this.config = config;
+    try {
+      this.fs = FileSystem.get(new Configuration());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get FileSystem", e);
+    }
+  }
+
+  public String timestampToString(long ts) {
+    return Long.toUnsignedString(ts);
+  }
+
+  public void handle(LongWritable ts, BytesWritable value) throws IOException {
+    turnoverIfNecessary(ts.get());
+    //copyBytes() trims to the record's actual length; getBytes() can include backing-buffer padding
+    writer.append(ts, new BytesWritable(value.copyBytes()));
+    syncHandler.sync(outputStream);
+    numWritten++;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    if(writer != null) {
+      writer.close();
+    }
+    if(outputStream != null) {
+      outputStream.close();
+    }
+  }
+  private Path getPath(long ts) {
+
+    String fileName = PcapHelper.toFilename(topic, ts, partition + "", uuid);
+    return new Path(config.getOutputPath(), fileName);
+  }
+
+  private void turnoverIfNecessary(long ts) throws IOException {
+    turnoverIfNecessary(ts, false);
+  }
+
+  private void turnoverIfNecessary(long ts, boolean force) throws IOException {
+    long duration = ts - batchStartTime;
+    boolean initial = outputStream == null;
+    boolean overDuration = duration >= config.getMaxTimeNS();
+    boolean tooManyPackets = numWritten >= config.getNumPackets();
+    if(force || initial || overDuration || tooManyPackets) {
+      //turnover
+      Path path = getPath(ts);
+      close();
+
+      if(fs instanceof LocalFileSystem) {
+        outputStream = new FSDataOutputStream(new FileOutputStream(new File(path.toString())));
+        syncHandler = SyncHandlers.LOCAL.getHandler();
+      }
+      else {
+        outputStream = fs.create(path, true);
+        if (outputStream instanceof HdfsDataOutputStream) {
+          if (initial) {
+            LOG.info("Using the HDFS sync handler.");
+          }
+          syncHandler = SyncHandlers.HDFS.getHandler();
+        } else {
+          if (initial) {
+            LOG.info("Using the default sync handler, which cannot guarantee atomic appends at the record level, be forewarned!");
+          }
+          syncHandler = SyncHandlers.DEFAULT.getHandler();
+        }
+
+      }
+      writer = SequenceFile.createWriter(new Configuration()
+              , SequenceFile.Writer.keyClass(LongWritable.class)
+              , SequenceFile.Writer.valueClass(BytesWritable.class)
+              , SequenceFile.Writer.stream(outputStream)
+              , SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)
+      );
+      LOG.info("Turning over and writing to " + path);
+      //reset the per-file batch state
+      batchStartTime = ts;
+      numWritten = 0;
+    }
+  }
+
+}
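
As a quick orientation, a minimal driver for the writer above might look like the following. This is a sketch only, not part of the patch: the HDFSWriterConfig instance is assumed to be populated elsewhere, and the topic, partition and timestamp values are placeholders.

    import java.util.UUID;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.metron.spout.pcap.HDFSWriterConfig;
    import org.apache.metron.spout.pcap.PartitionHDFSWriter;

    public class PartitionWriterSketch {
      //Sketch: assumes a populated HDFSWriterConfig; topic, partition and timestamp are placeholders.
      public static void writeOne(HDFSWriterConfig config, byte[] headerizedPacket) throws Exception {
        try (PartitionHDFSWriter writer =
                 new PartitionHDFSWriter("pcap", 0, UUID.randomUUID().toString(), config)) {
          //handle() opens (or rolls) the underlying sequence file and syncs after the append
          writer.handle(new LongWritable(1461589332993573000L), new BytesWritable(headerizedPacket));
        }
      }
    }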

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
new file mode 100644
index 0000000..99f9229
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap;
+
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.spout.pcap.scheme.TimestampScheme;
+import storm.kafka.BrokerHosts;
+
+public class SpoutConfig extends org.apache.metron.common.spout.kafka.SpoutConfig {
+
+  public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
+    super(hosts, topic, zkRoot, id);
+  }
+
+  public SpoutConfig withTimestampScheme(String scheme, String granularity) {
+    super.scheme = TimestampScheme.getScheme(scheme, TimestampConverters.getConverter(granularity));
+    return this;
+  }
+}
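
The config above is meant to be built fluently. A sketch (the quorum, topic, zkRoot and spout id are placeholders; "FROM_KEY" and "MICROSECONDS" are assumed to resolve via TimestampScheme.getScheme and TimestampConverters.getConverter, per the wiring in withTimestampScheme):

    import org.apache.metron.spout.pcap.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class PcapSpoutConfigSketch {
      public static SpoutConfig build() {
        //quorum, topic, zkRoot and spout id below are placeholders
        return new SpoutConfig(new ZkHosts("zookeeper:2181"), "pcap", "/pcap", "pcap-spout")
                 .withTimestampScheme("FROM_KEY", "MICROSECONDS");
      }
    }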

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
new file mode 100644
index 0000000..28aae7a
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.spout.pcap.Endianness;
+import storm.kafka.KeyValueScheme;
+
+import java.util.List;
+
+public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
+  private static final Logger LOG = Logger.getLogger(FromKeyScheme.class);
+
+  private TimestampConverter converter = TimestampConverters.MICROSECONDS;
+  private static Endianness endianness = Endianness.getNativeEndianness();
+  @Override
+  public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
+    Long ts = converter.toNanoseconds(Bytes.toLong(key));
+    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
+    byte[] globalHeaderized = PcapHelper.addGlobalHeader(packetHeaderized, endianness);
+    return new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(globalHeaderized)));
+  }
+
+  @Override
+  public List<Object> deserialize(byte[] ser) {
+    throw new UnsupportedOperationException("Really only interested in deserializing a key and a value");
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(TimestampScheme.KV_FIELD);
+  }
+
+  @Override
+  public FromKeyScheme withTimestampConverter(TimestampConverter converter) {
+    this.converter = converter;
+    return this;
+  }
+
+
+}
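
The scheme above expects the Kafka key to be an 8-byte long timestamp, decoded with Bytes.toLong and widened to nanoseconds by the configured converter (microseconds by default). A compatible key can be produced the same way; a sketch:

    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyedPcapSketch {
      public static byte[] kafkaKeyFor(long tsMicros) {
        //8-byte big-endian encoding, matching the Bytes.toLong(key) decode above
        return Bytes.toBytes(tsMicros);
      }
    }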

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
new file mode 100644
index 0000000..9b94e2b
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.spout.MultiScheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import org.apache.metron.pcap.PcapHelper;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FromPacketScheme implements MultiScheme, KeyConvertible {
+  private static final Logger LOG = Logger.getLogger(FromPacketScheme.class);
+  @Override
+  public Iterable<List<Object>> deserialize(byte[] rawValue) {
+    Long ts = PcapHelper.getTimestamp(rawValue);
+    if(ts != null) {
+      return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(rawValue))));
+    }
+    else {
+      return ImmutableList.of(new Values(Collections.emptyList()));
+    }
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(TimestampScheme.KV_FIELD);
+  }
+
+
+  @Override
+  public FromPacketScheme withTimestampConverter(TimestampConverter converter) {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
new file mode 100644
index 0000000..54e52e8
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.spout.pcap.scheme;
+
+
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+
+public interface KeyConvertible {
+  KeyConvertible withTimestampConverter(TimestampConverter converter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
new file mode 100644
index 0000000..3f98c44
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.spout.MultiScheme;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import storm.kafka.KeyValueSchemeAsMultiScheme;
+
+public enum TimestampScheme {
+   FROM_KEY(converter -> new KeyValueSchemeAsMultiScheme(new FromKeyScheme().withTimestampConverter(converter)))
+  ,FROM_PACKET(converter -> new FromPacketScheme().withTimestampConverter(converter))
+  ;
+  public static final String KV_FIELD = "kv";
+  private final TimestampSchemeCreator creator;
+
+  TimestampScheme(TimestampSchemeCreator creator) {
+    this.creator = creator;
+  }
+
+  public static MultiScheme getScheme(String scheme, TimestampConverter converter) {
+    try {
+      TimestampScheme ts = TimestampScheme.valueOf(scheme.toUpperCase());
+      return ts.creator.create(converter);
+    }
+    catch(IllegalArgumentException iae) {
+      //unrecognized scheme names deliberately fall back to the key-based scheme
+      return TimestampScheme.FROM_KEY.creator.create(converter);
+    }
+  }
+
+}
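
A sketch of resolving a scheme from topology configuration (the scheme and granularity names mirror the kafka.pcap.ts_scheme and kafka.pcap.ts_granularity properties used by the integration test later in this commit; getConverter("NANOSECONDS") is assumed to resolve by name):

    import backtype.storm.spout.MultiScheme;
    import org.apache.metron.common.utils.timestamp.TimestampConverters;
    import org.apache.metron.spout.pcap.scheme.TimestampScheme;

    public class SchemeLookupSketch {
      public static MultiScheme resolve() {
        //unknown scheme names would fall back to FROM_KEY, per getScheme above
        return TimestampScheme.getScheme("FROM_PACKET", TimestampConverters.getConverter("NANOSECONDS"));
      }
    }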

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
new file mode 100644
index 0000000..e88926b
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.spout.MultiScheme;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+
+public interface TimestampSchemeCreator {
+  MultiScheme create(TimestampConverter converter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
new file mode 100644
index 0000000..661ab2f
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.utils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+public class PcapInspector {
+  private static abstract class OptionHandler implements Function<String, Option> {}
+  private enum InspectorOptions {
+    HELP("h", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,INPUT("i", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "input", true, "Input sequence file on HDFS");
+        o.setArgName("SEQ_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,NUM("n", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "num_packets", true, "Number of packets to dump");
+        o.setArgName("N");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ;
+    Option option;
+    String shortCode;
+
+    InspectorOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if (InspectorOptions.HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("PcapInspector", getOptions());
+    }
+
+    public static Options getOptions() {
+      Options ret = new Options();
+      for (InspectorOptions o : InspectorOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+
+  public static final DateFormat DATE_FORMAT = DateFormat.getDateTimeInstance( DateFormat.LONG
+                                                                             , DateFormat.LONG
+                                                                             );
+  public static void main(String... argv) throws IOException {
+    Configuration conf = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+    CommandLine cli = InspectorOptions.parse(new PosixParser(), otherArgs);
+    Path inputPath = new Path(InspectorOptions.INPUT.get(cli));
+    int n = -1;
+    if(InspectorOptions.NUM.has(cli)) {
+      n = Integer.parseInt(InspectorOptions.NUM.get(cli));
+    }
+    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
+            SequenceFile.Reader.file(inputPath)
+    );
+    LongWritable key = new LongWritable();
+    BytesWritable value = new BytesWritable();
+
+    for(int i = 0;(n < 0 || i < n) && reader.next(key, value);++i) {
+      long millis = Long.divideUnsigned(key.get(), 1000000);
+      String ts = DATE_FORMAT.format(new Date(millis));
+      for(PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
+        EnumMap<Constants.Fields, Object> result = PcapHelper.packetToFields(pi);
+        List<String> fieldResults = new ArrayList<String>() {{
+          add("TS: " + ts);
+        }};
+        for(Constants.Fields field : Constants.Fields.values()) {
+          if(result.containsKey(field)) {
+            fieldResults.add(field + ": " + result.get(field));
+          }
+        }
+        System.out.println(Joiner.on(",").join(fieldResults));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh
new file mode 100755
index 0000000..a5a24ae
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=${project.artifactId}-$METRON_VERSION.jar
+
+yarn jar $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.PcapInspector "$@"
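
A typical invocation, assuming the script is installed under $METRON_HOME/bin and using the options PcapInspector defines above (the sequence file path is illustrative):

    $METRON_HOME/bin/pcap_inspector.sh -i /apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094 -n 5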

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh b/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh
new file mode 100644
index 0000000..a38f905
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=${project.artifactId}-$METRON_VERSION.jar
+
+storm jar $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/pcap/remote.yaml --filter $METRON_HOME/config/pcap.properties
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
new file mode 100644
index 0000000..70c973a
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.*;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.mr.PcapFilter;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+public class PcapFilterTest {
+  @Test
+  public void testTrivialEquality() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    PcapJob.addToConfig(fields, config);
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+  }
+
+  @Test
+  public void testReverseTraffic() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+    }};
+    PcapJob.addToConfig(fields, config);
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "dst_ip");
+            put(Constants.Fields.SRC_PORT, 1);
+            put(Constants.Fields.DST_ADDR, "src_ip");
+            put(Constants.Fields.DST_PORT, 0);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "dst_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "src_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertFalse(filter.apply(null));
+    }
+  }
+  @Test
+  public void testMissingDstAddr() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    PcapJob.addToConfig(fields, config);
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip1");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertFalse(filter.apply(null));
+    }
+  }
+  @Test
+  public void testMissingDstPort() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    PcapJob.addToConfig(fields, config);
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 100);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 100);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 100);
+          }};
+        }
+      };
+      Assert.assertFalse(filter.apply(null));
+    }
+  }
+  @Test
+  public void testMissingSrcAddr() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_PORT, "0");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    PcapJob.addToConfig(fields, config);
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+  }
+  @Test
+  public void testMissingSrcPort() throws Exception {
+    Configuration config = new Configuration();
+    final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+      put(Constants.Fields.SRC_ADDR, "src_ip");
+      put(Constants.Fields.DST_ADDR, "dst_ip");
+      put(Constants.Fields.DST_PORT, "1");
+      put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+    }};
+    PcapJob.addToConfig(fields, config);
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 0);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+    {
+      PcapFilter filter = new PcapFilter(config) {
+        @Override
+        protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+          return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+            put(Constants.Fields.SRC_ADDR, "src_ip");
+            put(Constants.Fields.SRC_PORT, 100);
+            put(Constants.Fields.DST_ADDR, "dst_ip");
+            put(Constants.Fields.DST_PORT, 1);
+          }};
+        }
+      };
+      Assert.assertTrue(filter.apply(null));
+    }
+  }
+}
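
All of the tests above exercise the same round trip: query fields are pushed into the Hadoop Configuration with PcapJob.addToConfig, and PcapFilter reads them back to decide whether a packet matches (packetToFields is overridden so no real packet parsing is needed). Against real traffic the filter would be applied to actual PacketInfo instances, roughly as follows (a sketch; the field values are placeholders):

    import java.util.EnumMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.metron.common.Constants;
    import org.apache.metron.pcap.PacketInfo;
    import org.apache.metron.pcap.mr.PcapFilter;
    import org.apache.metron.pcap.mr.PcapJob;

    public class FilterUsageSketch {
      public static boolean matches(PacketInfo pi) {
        Configuration config = new Configuration();
        //serialize the query into the config, as the tests above do
        EnumMap<Constants.Fields, String> query = new EnumMap<>(Constants.Fields.class);
        query.put(Constants.Fields.SRC_ADDR, "192.168.66.1");
        query.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
        PcapJob.addToConfig(query, config);
        return new PcapFilter(config).apply(pi);
      }
    }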

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
new file mode 100644
index 0000000..17c9325
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PcapJobTest {
+
+  @Test
+  public void test_getPaths_NoFiles() throws Exception {
+    PcapJob job;
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+      }};
+      job = new PcapJob() {
+        @Override
+        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+          return inputFiles;
+        }
+      };
+      Iterable<String> paths = job.getPaths(null, null, 0, 1000);
+      Assert.assertTrue(Iterables.isEmpty(paths));
+    }
+  }
+  @Test
+  public void test_getPaths_leftEdge() throws Exception {
+    PcapJob job;
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+        add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      }};
+      job = new PcapJob() {
+        @Override
+        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+          return inputFiles;
+        }
+      };
+      Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
+      Assert.assertEquals(1,Iterables.size(paths));
+    }
+  }
+  @Test
+  public void test_getPaths_rightEdge() throws Exception {
+    PcapJob job;
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+        add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      }};
+      job = new PcapJob() {
+        @Override
+        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+          return inputFiles;
+        }
+      };
+      Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L-1L, 1461589333993573000L + 1L);
+      Assert.assertEquals(2,Iterables.size(paths));
+    }
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+        add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      }};
+      job = new PcapJob() {
+        @Override
+        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+          return inputFiles;
+        }
+      };
+      Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L-1L, 1461589334993573000L + 1L);
+      Assert.assertEquals(2,Iterables.size(paths));
+    }
+  }
+  @Test
+  public void test_getPaths_bothEdges() throws Exception {
+    PcapJob job;
+    {
+      final List<Path> inputFiles = new ArrayList<Path>() {{
+        add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+        add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"));
+      }};
+      job = new PcapJob() {
+        @Override
+        protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+          return inputFiles;
+        }
+      };
+      Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
+      Assert.assertEquals(3,Iterables.size(paths));
+    }
+  }
+}
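
The fixtures above rely on the nanosecond timestamp being embedded in the file name (the third underscore-delimited segment, matching PcapHelper.toFilename in PartitionHDFSWriter earlier in this patch), which is what getPaths prunes on. An illustrative extraction, not the actual PcapJob parsing code:

    public class PcapFilenameSketch {
      //e.g. "pcap_pcap_1461589332993573000_0_73686171-..." -> 1461589332993573000
      public static long timestampOf(String fileName) {
        return Long.parseUnsignedLong(fileName.split("_")[2]);
      }
    }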

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
new file mode 100644
index 0000000..587ec7c
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.integration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import kafka.consumer.ConsumerIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.metron.common.Constants;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.components.MRComponent;
+import org.apache.metron.integration.utils.KafkaUtil;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.spout.pcap.Endianness;
+import org.apache.metron.spout.pcap.scheme.TimestampScheme;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.util.*;
+
+public class PcapTopologyIntegrationTest {
+  final static String KAFKA_TOPIC = "pcap";
+  private static String BASE_DIR = "pcap";
+  private static String DATA_DIR = BASE_DIR + "/data_dir";
+  private static String QUERY_DIR = BASE_DIR + "/query";
+  private String topologiesDir = "src/main/flux";
+  private String targetDir = "target";
+  private File getOutDir(String targetDir) {
+    File outDir = new File(new File(targetDir), DATA_DIR);
+    if (!outDir.exists()) {
+      outDir.mkdirs();
+    }
+
+    return outDir;
+  }
+
+  private File getQueryDir(String targetDir) {
+    File outDir = new File(new File(targetDir), QUERY_DIR);
+    if (!outDir.exists()) {
+      outDir.mkdirs();
+    }
+    return outDir;
+  }
+  private static void clearOutDir(File outDir) {
+    for(File f : outDir.listFiles()) {
+      f.delete();
+    }
+  }
+  private static int numFiles(File outDir, Configuration config) {
+
+    return outDir.list(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return !name.endsWith(".crc");
+      }
+    }).length;
+  }
+
+  private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
+            SequenceFile.Reader.file(pcapFile)
+    );
+    List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>();
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+    while (reader.next(key, value)) {
+      byte[] pcapWithHeader = value.copyBytes();
+      long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
+      {
+
+        List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
+        for(PacketInfo pi : info) {
+          Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
+          //If you are debugging and want to see the packets, uncomment the following.
+          //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
+        }
+      }
+      if(withHeaders) {
+        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader));
+      }
+      else {
+        byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE];
+        System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length);
+        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw));
+      }
+    }
+    //trim to an even number of packets: the topology writes 2 packets per file,
+    //so the tests expect exactly size/2 output files
+    return Iterables.limit(ret, 2*(ret.size()/2));
+  }
+
+  @Test
+  public void testTimestampInPacket() throws Exception {
+    testTopology(new Function<Properties, Void>() {
+      @Nullable
+      @Override
+      public Void apply(@Nullable Properties input) {
+        input.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_PACKET.toString());
+        return null;
+      }
+    }, (kafkaComponent, pcapEntries) -> kafkaComponent.writeMessages( KAFKA_TOPIC
+                                                                    , Collections2.transform(pcapEntries
+                                                                                            , input -> input.getValue()
+                                                                                            )
+                                                                    )
+    , true
+               );
+  }
+  @Test
+  public void testTimestampInKey() throws Exception {
+    testTopology(new Function<Properties, Void>() {
+      @Nullable
+      @Override
+      public Void apply(@Nullable Properties input) {
+        input.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_KEY.toString());
+        return null;
+      }
+    }, new SendEntries() {
+      @Override
+      public void send(KafkaWithZKComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> pcapEntries) throws Exception {
+        Producer<byte[], byte[]> producer = kafkaComponent.createProducer(byte[].class, byte[].class);
+        KafkaUtil.send(producer, pcapEntries, KAFKA_TOPIC, 2);
+        System.out.println("Sent pcap data: " + pcapEntries.size());
+        {
+          int numMessages = 0;
+          ConsumerIterator<?, ?> it = kafkaComponent.getStreamIterator(KAFKA_TOPIC);
+          for (int i = 0; i < pcapEntries.size(); ++i, it.next()) {
+            numMessages++;
+          }
+          Assert.assertEquals(pcapEntries.size(), numMessages);
+          System.out.println("Wrote " + pcapEntries.size() + " to kafka");
+        }
+      }
+    }, false);
+  }
+
+  private static long getTimestamp(int offset, List<Map.Entry<byte[], byte[]>> entries) {
+    return Bytes.toLong(entries.get(offset).getKey());
+  }
+
+  private interface SendEntries {
+    void send(KafkaWithZKComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> entries) throws Exception;
+  }
+
+  public void testTopology(Function<Properties, Void> updatePropertiesCallback
+                          ,SendEntries sendPcapEntriesCallback
+                          ,boolean withHeaders
+                          )
+          throws Exception
+  {
+    if (!new File(topologiesDir).exists()) {
+      topologiesDir = UnitTestHelper.findDir("topologies");
+    }
+    targetDir = UnitTestHelper.findDir("target");
+    final File outDir = getOutDir(targetDir);
+    final File queryDir = getQueryDir(targetDir);
+    clearOutDir(outDir);
+    clearOutDir(queryDir);
+
+    File baseDir = new File(new File(targetDir), BASE_DIR);
+    //Assert.assertEquals(0, numFiles(outDir));
+    Assert.assertNotNull(topologiesDir);
+    Assert.assertNotNull(targetDir);
+    Path pcapFile = new Path("../metron-integration-test/src/main/resources/sample/data/SampleInput/PCAPExampleOutput");
+    final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
+    Assert.assertTrue(Iterables.size(pcapEntries) > 0);
+    final Properties topologyProperties = new Properties() {{
+      setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
+      setProperty("kafka.pcap.start", "BEGINNING");
+      setProperty("kafka.pcap.out", outDir.getAbsolutePath());
+      setProperty("kafka.pcap.numPackets", "2");
+      setProperty("kafka.pcap.maxTimeMS", "200000000");
+      setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
+    }};
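+    //ts_granularity is set to NANOSECONDS to line up with PcapHelper.getTimestamp, which yields nanosecond timestamps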
+    updatePropertiesCallback.apply(topologyProperties);
+
+    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(KAFKA_TOPIC, 1));
+    }})
+            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+                                     @Nullable
+                                     @Override
+                                     public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+
+                                       topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+                                       return null;
+                                     }
+                                   }
+            );
+
+    final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath());
+
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml"))
+            .withTopologyName("pcap")
+            .withTopologyProperties(topologyProperties)
+            .build();
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("mr", mr)
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("storm", fluxComponent)
+            .withMaxTimeMS(-1)
+            .withMillisecondsBetweenAttempts(2000)
+            .withNumRetries(10)
+            .build();
+    try {
+      runner.start();
+      System.out.println("Components started...");
+
+      fluxComponent.submitTopology();
+      sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
+      runner.process(new Processor<Void>() {
+        @Override
+        public ReadinessState process(ComponentRunner runner) {
+          int numFiles = numFiles(outDir, mr.getConfiguration());
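+          //with kafka.pcap.numPackets = 2, each sequence file holds two packets, hence size/2 files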
+          int expectedNumFiles = pcapEntries.size() / 2;
+          if (numFiles == expectedNumFiles) {
+            return ReadinessState.READY;
+          } else {
+            return ReadinessState.NOT_READY;
+          }
+        }
+
+        @Override
+        public Void getResult() {
+          return null;
+        }
+      });
+      PcapJob job = new PcapJob();
+      {
+        //Ensure that only two pcaps are returned when we query the window spanning entries 4 and 5
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(4, pcapEntries)
+                        , getTimestamp(5, pcapEntries)
+                        , new EnumMap<>(Constants.Fields.class)
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                );
+        Assert.assertEquals(2, results.size());
+      }
+      {
+        //Ensure that no results are returned, since that destination IP address isn't in the dataset
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(1, pcapEntries)
+                        , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+                          put(Constants.Fields.DST_ADDR, "207.28.210.1");
+                        }}
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                );
+        Assert.assertEquals(0, results.size());
+      }
+      {
+        //Likewise, ensure that no results are returned for a protocol that isn't in the dataset
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(1, pcapEntries)
+                        , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+                          put(Constants.Fields.PROTOCOL, "foo");
+                        }}
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                );
+        Assert.assertEquals(0, results.size());
+      }
+      {
+        //Ensure that all packets are returned when the query spans the entire time range
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , new EnumMap<>(Constants.Fields.class)
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                );
+        Assert.assertEquals(pcapEntries.size(), results.size());
+      }
+      {
+        List<byte[]> results =
+                job.query(new Path(outDir.getAbsolutePath())
+                        , new Path(queryDir.getAbsolutePath())
+                        , getTimestamp(0, pcapEntries)
+                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+                        , new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+                          put(Constants.Fields.DST_PORT, "22");
+                        }}
+                        , new Configuration()
+                        , FileSystem.get(new Configuration())
+                );
+        Assert.assertTrue(results.size() > 0);
+        Assert.assertEquals(Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+                  @Override
+                  public boolean apply(@Nullable JSONObject input) {
+                    Object prt = input.get(Constants.Fields.DST_PORT.getName());
+                    return prt != null && prt.toString().equals("22");
+                  }
+                }, withHeaders)
+                )
+                , results.size()
+        );
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PcapMerger.merge(baos, results);
+        Assert.assertTrue(baos.toByteArray().length > 0);
+      }
+      System.out.println("Ended");
+    } finally {
+      runner.stop();
+      clearOutDir(outDir);
+      clearOutDir(queryDir);
+    }
+  }
+
+  public static final Function<byte[], Iterable<JSONObject>> TO_JSONS = new Function<byte[], Iterable<JSONObject>>() {
+    @Nullable
+    @Override
+    public Iterable<JSONObject> apply(@Nullable byte[] input) {
+      try {
+        return PcapHelper.toJSON(PcapHelper.toPacketInfo(input));
+      } catch (IOException e) {
+        throw new RuntimeException(e.getMessage(), e);
+      }
+    }
+  };
+
+  private Iterable<JSONObject> filterPcaps(Iterable<Map.Entry<byte[], byte[]>> pcaps
+                                          ,Predicate<JSONObject> predicate
+                                          ,boolean withHeaders
+                                          )
+  {
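+    //entries read without headers are raw packet payloads keyed by timestamp; rebuild the packet and
+    //global headers so PcapHelper can decode them to JSON, otherwise pass the stored bytes through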
+    Function<Map.Entry<byte[], byte[]>, byte[]> pcapTransform = null;
+    if(!withHeaders) {
+      final Endianness endianness = Endianness.getNativeEndianness();
+      pcapTransform = kv -> PcapHelper.addGlobalHeader(PcapHelper.addPacketHeader(Bytes.toLong(kv.getKey())
+                                                                                 , kv.getValue()
+                                                                                 , endianness
+                                                                                 )
+                                                      , endianness
+                                                      );
+    }
+    else {
+      pcapTransform = Map.Entry::getValue;
+    }
+    return Iterables.filter(
+              Iterables.concat(
+                      Iterables.transform(
+                              Iterables.transform(pcaps, pcapTransform)
+                                         , TO_JSONS
+                                         )
+                              )
+                           , predicate
+                           );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index 7125382..6a45617 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -99,5 +99,25 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>0.1BETA</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java
new file mode 100644
index 0000000..4a6de4d
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap;
+
+import java.nio.ByteOrder;
+
+public enum Endianness {
+  LITTLE, BIG;
+
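+  /**
+   * Returns the byte order of the host JVM; PcapHelper.swapBytes uses this to decide whether
+   * synthesized pcap headers need their fields byte-swapped.
+   */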
+  public static Endianness getNativeEndianness() {
+    if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
+      return BIG;
+    } else {
+      return LITTLE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
new file mode 100644
index 0000000..78f6fd4
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.metron.spout.pcap.Endianness;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.krakenapps.pcap.decoder.ethernet.EthernetType;
+import org.krakenapps.pcap.decoder.ip.IpDecoder;
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+import org.krakenapps.pcap.util.ByteOrderConverter;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+
+import static org.apache.metron.pcap.Constants.*;
+
+public class PcapHelper {
+
+  public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES;
+  public static final int GLOBAL_HEADER_SIZE = 24;
+  private static final Logger LOG = Logger.getLogger(PcapHelper.class);
+  public static ThreadLocal<MetronEthernetDecoder> ETHERNET_DECODER = new ThreadLocal<MetronEthernetDecoder>() {
+    @Override
+    protected MetronEthernetDecoder initialValue() {
+      return createDecoder();
+    }
+  };
+
+  public static Long getTimestamp(String filename) {
+    try {
+      return Long.parseUnsignedLong(Iterables.get(Splitter.on('_').split(filename), 2));
+    }
+    catch(Exception e) {
+      //the filename didn't contain a parseable timestamp in the expected position
+      return null;
+    }
+  }
+
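+  //Produces filenames of the form pcap_{topic}_{unsignedTimestamp}_{partition}_{uuid};
+  //getTimestamp(String) above depends on the timestamp being the third underscore-delimited token.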
+  public static String toFilename(String topic, long timestamp, String partition, String uuid)
+  {
+    return Joiner.on("_").join("pcap"
+                              ,topic
+                              , Long.toUnsignedString(timestamp)
+                              ,partition
+                              , uuid
+                              );
+  }
+
+  public static boolean swapBytes(Endianness endianness) {
+    return endianness == Endianness.LITTLE;
+  }
+
+  public static byte[] getPcapGlobalHeader(Endianness endianness) {
+    if(swapBytes(endianness)) {
+      //swap
+      return new byte[] {
+              (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1 //swapped magic number 0xa1b2c3d4
+              , 0x02, 0x00 //swapped major version 2
+              , 0x04, 0x00 //swapped minor version 4
+              , 0x00, 0x00, 0x00, 0x00 //GMT to local tz offset (= 0)
+              , 0x00, 0x00, 0x00, 0x00 //sigfigs (= 0)
+              , (byte) 0xff, (byte) 0xff, 0x00, 0x00 //snaplen (=65535)
+              , 0x01, 0x00, 0x00, 0x00 // swapped link layer header type (1 = ethernet)
+                        };
+    }
+    else {
+      //no need to swap
+      return new byte[] {
+              (byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4 //magic number 0xa1b2c3d4
+              , 0x00, 0x02 //major version 2
+              , 0x00, 0x04 //minor version 4
+              , 0x00, 0x00, 0x00, 0x00 //GMT to local tz offset (= 0)
+              , 0x00, 0x00, 0x00, 0x00 //sigfigs (= 0)
+              , 0x00, 0x00, (byte) 0xff, (byte) 0xff //snaplen (=65535)
+              , 0x00, 0x00, 0x00, 0x01 // link layer header type (1 = ethernet)
+                        };
+    }
+  }
+
+  public static Long getTimestamp(byte[] pcap) {
+    PcapByteInputStream pcapByteInputStream = null;
+    try {
+      pcapByteInputStream = new PcapByteInputStream(pcap);
+      PcapPacket packet = pcapByteInputStream.getPacket();
+      GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+      PacketHeader packetHeader = packet.getPacketHeader();
+      if ( globalHeader.getMagicNumber() == 0xa1b2c3d4 || globalHeader.getMagicNumber() == 0xd4c3b2a1 )
+      {
+        //Timestamps are in microseconds per the magic number; convert to nanoseconds
+        LOG.info("Times are in micro according to the magic number");
+        return packetHeader.getTsSec() * 1000000000L + packetHeader.getTsUsec() * 1000L ;
+      }
+      else if ( globalHeader.getMagicNumber() == 0xa1b23c4d || globalHeader.getMagicNumber() == 0x4d3cb2a1 ) {
+        //Timestamps are already in nanoseconds per the magic number; no conversion needed
+        LOG.info("Times are in nano according to the magic number");
+        return packetHeader.getTsSec() * 1000000000L + packetHeader.getTsUsec() ;
+      }
+      //Unknown magic number: assume microseconds by default and convert to nanoseconds
+      LOG.warn("Unknown magic number. Defaulting to micro");
+      return packetHeader.getTsSec() * 1000000000L + packetHeader.getTsUsec() * 1000L ;
+    }
+    catch(IOException ioe) {
+      //we cannot read the packet, so we return null here.
+      LOG.error("Unable to read packet", ioe);
+    }
+    finally {
+      if(pcapByteInputStream != null) {
+        try {
+          pcapByteInputStream.close();
+        } catch (IOException e) {
+          LOG.error("Unable to close stream", e);
+        }
+      }
+    }
+    return null;
+  }
+  public static byte[] addGlobalHeader(byte[] packet, Endianness endianness) {
+    byte[] globalHeader = getPcapGlobalHeader(endianness);
+    byte[] ret = new byte[packet.length + GLOBAL_HEADER_SIZE];
+    int offset = 0;
+    System.arraycopy(globalHeader, 0, ret, offset, GLOBAL_HEADER_SIZE);
+    offset += globalHeader.length;
+    System.arraycopy(packet, 0, ret, offset, packet.length);
+    return ret;
+  }
+
+  public static byte[] addPacketHeader(long tsNano, byte[] packet, Endianness endianness) {
+    boolean swapBytes = swapBytes(endianness);
+    long micros = Long.divideUnsigned(tsNano, 1000);
+    int secs = (int)(micros / 1000000);
+    int usec = (int)(micros % 1000000);
+    int capLen = packet.length;
+    byte[] ret = new byte[PACKET_HEADER_SIZE + packet.length];
+    int offset = 0;
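+    //pcap record header layout, four 4-byte fields: ts_sec, ts_usec, incl_len, orig_len;
+    //incl_len and orig_len are written with the same value since the full packet is stored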
+    {
+      byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(secs):secs);
+      System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+      offset += Integer.BYTES;
+    }
+    {
+      byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(usec):usec);
+      System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+      offset += Integer.BYTES;
+    }
+    {
+      byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+      System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+      offset += Integer.BYTES;
+    }
+    {
+      byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+      System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+      offset += Integer.BYTES;
+    }
+    System.arraycopy(packet, 0, ret, offset, packet.length);
+    return ret;
+  }
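+
+  //Note: fields are currently extracted only when a TCP packet is present; UDP and other traffic
+  //yields an empty map, so field-based filters (including protocol) will only match TCP packets here.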
+  public static EnumMap<org.apache.metron.common.Constants.Fields, Object> packetToFields(PacketInfo pi) {
+    EnumMap<org.apache.metron.common.Constants.Fields, Object> ret = new EnumMap<>(org.apache.metron.common.Constants.Fields.class);
+    if(pi.getTcpPacket() != null) {
+      if(pi.getTcpPacket().getSourceAddress() != null) {
+        ret.put(org.apache.metron.common.Constants.Fields.SRC_ADDR, pi.getTcpPacket().getSourceAddress().getHostAddress());
+      }
+      if(pi.getTcpPacket().getSource() != null ) {
+        ret.put(org.apache.metron.common.Constants.Fields.SRC_PORT, pi.getTcpPacket().getSource().getPort());
+      }
+      if(pi.getTcpPacket().getDestinationAddress() != null ) {
+        ret.put(org.apache.metron.common.Constants.Fields.DST_ADDR, pi.getTcpPacket().getDestinationAddress().getHostAddress());
+      }
+      if(pi.getTcpPacket().getDestination() != null ) {
+        ret.put(org.apache.metron.common.Constants.Fields.DST_PORT, pi.getTcpPacket().getDestination().getPort());
+      }
+      if(pi.getIpv4Packet() != null) {
+        ret.put(org.apache.metron.common.Constants.Fields.PROTOCOL, pi.getIpv4Packet().getProtocol());
+      }
+    }
+    return ret;
+  }
+
+  public static List<PacketInfo> toPacketInfo(byte[] packet) throws IOException {
+    return toPacketInfo(ETHERNET_DECODER.get(), packet);
+  }
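+
+  //Builds an ethernet decoder with an IPv4 decoder registered so frames decode down to the IP layer;
+  //TCP and UDP payloads are parsed explicitly in toPacketInfo below.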
+  public static MetronEthernetDecoder createDecoder() {
+    MetronEthernetDecoder ethernetDecoder = new MetronEthernetDecoder();
+    IpDecoder ipDecoder = new IpDecoder();
+    ethernetDecoder.register(EthernetType.IPV4, ipDecoder);
+    return ethernetDecoder;
+  }
+
+  /**
+   * Parses a raw pcap blob (global header plus one or more packets) into a list of PacketInfo objects.
+   *
+   * @param decoder the ethernet decoder to use
+   * @param pcap the raw pcap data
+   * @return the list of parsed packets
+   * @throws IOException if the pcap data cannot be read
+   */
+  public static List<PacketInfo> toPacketInfo(MetronEthernetDecoder decoder, byte[] pcap) throws IOException {
+    List<PacketInfo> packetInfoList = new ArrayList<>();
+
+    PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(pcap);
+
+    GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+    while (true) {
+      try {
+        PcapPacket packet = pcapByteInputStream.getPacket();
+        TcpPacket tcpPacket = null;
+        UdpPacket udpPacket = null;
+        int sourcePort = 0;
+        int destinationPort = 0;
+
+        decoder.decode(packet);
+
+        PacketHeader packetHeader = packet.getPacketHeader();
+        Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+          tcpPacket = TcpPacket.parse(ipv4Packet);
+
+        }
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
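+          //manually parse the 8-byte UDP header: source port, destination port, length and checksum
+          //are each unsigned shorts; whatever remains in the buffer is the UDP payload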
+
+          Buffer packetDataBuffer = ipv4Packet.getData();
+          sourcePort = packetDataBuffer.getUnsignedShort();
+          destinationPort = packetDataBuffer.getUnsignedShort();
+
+          udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
+
+          udpPacket.setLength(packetDataBuffer.getUnsignedShort());
+          udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
+          packetDataBuffer.discardReadBytes();
+          udpPacket.setData(packetDataBuffer);
+        }
+
+        packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
+            ipv4Packet, tcpPacket, udpPacket));
+      } catch (NegativeArraySizeException ignored) {
+        LOG.debug("Ignorable exception while parsing packet.", ignored);
+      } catch (EOFException eof) {
+        //end of the pcap stream: stop reading packets
+        break;
+      }
+    }
+    return packetInfoList;
+  }
+
+  public static List<JSONObject> toJSON(List<PacketInfo> packetInfoList) {
+    List<JSONObject> messages = new ArrayList<>();
+    for (PacketInfo packetInfo : packetInfoList) {
+      JSONObject message = (JSONObject) JSONValue.parse(packetInfo.getJsonIndexDoc());
+      messages.add(message);
+    }
+    return messages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
new file mode 100644
index 0000000..2952a0a
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import com.google.common.base.Predicate;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+
+import javax.annotation.Nullable;
+import java.util.EnumMap;
+import java.util.Map;
+
+
+public class PcapFilter implements Predicate<PacketInfo> {
+
+  private String srcAddr;
+  private Integer srcPort;
+  private String dstAddr;
+  private Integer dstPort;
+  private String protocol;
+  private boolean includesReverseTraffic = false;
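+  //e.g. (hypothetical values) a filter of src=10.0.0.1, dst=10.0.0.2 with includesReverseTraffic = true
+  //also matches packets flowing from 10.0.0.2 back to 10.0.0.1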
+
+
+  public PcapFilter(Iterable<Map.Entry<String, String>> config) {
+    for(Map.Entry<String, String> kv : config) {
+      if(kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) {
+        this.dstAddr = kv.getValue();
+      }
+      if(kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) {
+        this.srcAddr = kv.getValue();
+      }
+      if(kv.getKey().equals(Constants.Fields.DST_PORT.getName())) {
+        this.dstPort = Integer.parseInt(kv.getValue());
+      }
+      if(kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) {
+        this.srcPort = Integer.parseInt(kv.getValue());
+      }
+      if(kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) {
+        this.protocol= kv.getValue();
+      }
+      if(kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) {
+        this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue());
+      }
+    }
+  }
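+
+  //Example usage, a minimal sketch: the key strings come from Constants.Fields#getName(), and the
+  //values here are hypothetical:
+  //  Map<String, String> config = new HashMap<>();
+  //  config.put(Constants.Fields.DST_PORT.getName(), "22");
+  //  PcapFilter sshOnly = new PcapFilter(config.entrySet());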
+
+  private boolean matchSourceAndDestination(Object srcAddrObj
+                                           , Object srcPortObj
+                                           , Object dstAddrObj
+                                           , Object dstPortObj
+                                            )
+  {
+    boolean isMatch = true;
+    if(srcAddr != null) {
+      isMatch &= srcAddrObj instanceof String && srcAddrObj.equals(srcAddr);
+    }
+    if(isMatch && srcPort != null) {
+      isMatch &= srcPortObj != null && srcPortObj.toString().equals(srcPort.toString());
+    }
+    if(isMatch && dstAddr != null) {
+      isMatch &= dstAddrObj instanceof String && dstAddrObj.equals(dstAddr);
+    }
+    if(isMatch && dstPort != null) {
+      isMatch &= dstPortObj != null && dstPortObj.toString().equals(dstPort.toString());
+    }
+    return isMatch;
+  }
+
+
+  protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+    return PcapHelper.packetToFields(pi);
+  }
+
+  @Override
+  public boolean apply(@Nullable PacketInfo pi ) {
+    boolean isMatch = true;
+    EnumMap<Constants.Fields, Object> input= packetToFields(pi);
+    Object srcAddrObj = input.get(Constants.Fields.SRC_ADDR);
+    Object srcPortObj = input.get(Constants.Fields.SRC_PORT);
+    Object dstAddrObj = input.get(Constants.Fields.DST_ADDR);
+    Object dstPortObj = input.get(Constants.Fields.DST_PORT);
+    Object protocolObj = input.get(Constants.Fields.PROTOCOL);
+
+    //first, ensure the protocol matches if one was specified
+    if(isMatch && protocol != null) {
+      isMatch &= protocolObj != null && protocolObj.toString().equals(protocol);
+    }
+    if(isMatch) {
+      //if we're still a match, then try to match the source and destination in the forward direction
+      isMatch &= matchSourceAndDestination(srcAddrObj, srcPortObj, dstAddrObj, dstPortObj);
+      if (!isMatch && includesReverseTraffic) {
+        //the forward direction isn't a match, so try the reverse direction
+        isMatch = matchSourceAndDestination(dstAddrObj, dstPortObj, srcAddrObj, srcPortObj);
+      }
+    }
+    return isMatch;
+  }
+}