Posted to commits@metron.apache.org by ce...@apache.org on 2017/05/17 14:32:57 UTC

metron git commit: METRON-936: Fixes to pcap for performance and testing (mmiklavc via cestella) closes apache/incubator-metron#585

Repository: metron
Updated Branches:
  refs/heads/master be0307659 -> c0b082523


METRON-936: Fixes to pcap for performance and testing (mmiklavc via cestella) closes apache/incubator-metron#585


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c0b08252
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c0b08252
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c0b08252

Branch: refs/heads/master
Commit: c0b0825235ce5c26649282c43f3c57f092864f94
Parents: be03076
Author: mmiklavc <mi...@gmail.com>
Authored: Wed May 17 10:32:46 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Wed May 17 10:32:46 2017 -0400

----------------------------------------------------------------------
 metron-platform/metron-pcap-backend/README.md   | 198 +++++++++++++++++++
 .../src/main/config/pcap.properties             |  10 +-
 .../src/main/flux/pcap/remote.yaml              |  14 +-
 .../metron/spout/pcap/HDFSWriterCallback.java   |  76 ++++---
 .../metron/spout/pcap/HDFSWriterConfig.java     |  47 +++++
 .../metron/spout/pcap/PartitionHDFSWriter.java  |  61 ++++--
 .../pcap/deserializer/FromKeyDeserializer.java  |  22 +--
 .../deserializer/FromPacketDeserializer.java    |  16 +-
 .../pcap/deserializer/KeyValueDeserializer.java |  15 +-
 .../org/apache/metron/utils/PcapInspector.java  |  29 +--
 .../PcapTopologyIntegrationTest.java            |   9 +-
 .../deserializer/FromKeyDeserializerTest.java   |  39 ++++
 .../java/org/apache/metron/pcap/PcapHelper.java |  53 ++++-
 .../java/org/apache/metron/pcap/mr/PcapJob.java |  19 +-
 14 files changed, 505 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index 5b554b5..da19611 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -4,6 +4,16 @@ The purpose of the Metron PCAP backend is to create a storm topology
 capable of rapidly ingesting raw packet capture data from Kafka directly
 into HDFS.
 
+* [Sensors](#the-sensors-feeding-kafka)
+* [PCAP Topology](#the-pcap-topology)
+* [HDFS Files](#the-files-on-hdfs)
+* [Configuration](#configuration)
+* [Starting the Topology](#starting-the-topology)
+* [Utilities](#utilities)
+  * [Inspector Utility](#inspector-utility)
+  * [Query Filter Utility](#query-filter-utility)
+* [Performance Tuning](#performance-tuning)
+
 ## The Sensors Feeding Kafka
 
 This component must be fed by fast packet capture components upstream
@@ -150,3 +160,191 @@ The packet data will be exposed via the `packet` variable in Stellar.
 
 The format of this regular expression is described [here](https://github.com/nishihatapalmer/byteseek/blob/master/sequencesyntax.md).
 
+## Performance Tuning
+The PCAP topology is extremely lightweight and functions as a spout-only topology. To tune the topology, you currently must specify a combination of
+properties in pcap.properties as well as configuration in the pcap remote.yaml flux file itself. Tuning the number of partitions in your Kafka topic
+will also have a dramatic impact on performance. We ran data into Kafka at 1.1 Gbps, and our tests settled on 128 partitions for our Kafka topic
+along with the following settings in pcap.properties and remote.yaml (properties unrelated to performance have been removed):
+
+### pcap.properties file
+```
+spout.kafka.topic.pcap=pcap
+topology.workers=16
+kafka.spout.parallelism=128
+kafka.pcap.numPackets=1000000000
+kafka.pcap.maxTimeMS=0
+hdfs.replication.factor=1
+hdfs.sync.every=10000
+```
+You'll notice that the number of Kafka partitions equals the spout parallelism, and this is no coincidence. Kafka's per-partition ordering guarantee means that each partition can be
+consumed by at most one consumer in a group, so any parallelism beyond the partition count leaves dormant threads consuming resources while performing no additional work. For our cluster with 4 Storm Supervisors, we found 16 workers to
+provide optimal throughput as well. We were largely IO bound rather than CPU bound with the incoming PCAP data.
+
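As a quick sanity check that the topic's partition count lines up with `kafka.spout.parallelism`, a minimal sketch follows (not part of this patch; the broker address and topic name are assumptions for illustration):

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PcapPartitionCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "node1:6667"); // assumed broker address
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // kafka.spout.parallelism should equal this partition count.
      System.out.println("pcap partitions: " + consumer.partitionsFor("pcap").size());
    }
  }
}
```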
+### remote.yaml
+In the flux file, we introduced the following configuration:
+
+```
+name: "pcap"
+config:
+    topology.workers: ${topology.workers}
+    topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
+    topology.acker.executors: 0
+components:
+
+  # Any Kafka consumer props for the spout go here.
+  - id: "kafkaProps"
+    className: "java.util.HashMap"
+    configMethods:
+      -   name: "put"
+          args:
+            - "value.deserializer"
+            - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+      -   name: "put"
+          args:
+            - "key.deserializer"
+            - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+      -   name: "put"
+          args:
+            - "group.id"
+            - "pcap"
+      -   name: "put"
+          args:
+            - "security.protocol"
+            - "${kafka.security.protocol}"
+      -   name: "put"
+          args:
+            - "poll.timeout.ms"
+            - 100
+      -   name: "put"
+          args:
+            - "offset.commit.period.ms"
+            - 30000
+      -   name: "put"
+          args:
+            - "session.timeout.ms"
+            - 30000
+      -   name: "put"
+          args:
+            - "max.uncommitted.offsets"
+            - 200000000
+      -   name: "put"
+          args:
+            - "max.poll.interval.ms"
+            - 10
+      -   name: "put"
+          args:
+            - "max.poll.records"
+            - 200000
+      -   name: "put"
+          args:
+            - "receive.buffer.bytes"
+            - 431072
+      -   name: "put"
+          args:
+            - "max.partition.fetch.bytes"
+            - 8097152
+
+  - id: "hdfsProps"
+    className: "java.util.HashMap"
+    configMethods:
+      -   name: "put"
+          args:
+            - "io.file.buffer.size"
+            - 1000000
+      -   name: "put"
+          args:
+            - "dfs.blocksize"
+            - 1073741824
+
+  - id: "kafkaConfig"
+    className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+    constructorArgs:
+      - ref: "kafkaProps"
+      # topic name
+      - "${spout.kafka.topic.pcap}"
+      - "${kafka.zk}"
+    configMethods:
+      -   name: "setFirstPollOffsetStrategy"
+          args:
+            # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+            - ${kafka.pcap.start}
+
+  - id: "writerConfig"
+    className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
+    configMethods:
+      -   name: "withOutputPath"
+          args:
+            - "${kafka.pcap.out}"
+      -   name: "withNumPackets"
+          args:
+            - ${kafka.pcap.numPackets}
+      -   name: "withMaxTimeMS"
+          args:
+            - ${kafka.pcap.maxTimeMS}
+      -   name: "withZookeeperQuorum"
+          args:
+            - "${kafka.zk}"
+      -   name: "withSyncEvery"
+          args:
+            - ${hdfs.sync.every}
+      -   name: "withReplicationFactor"
+          args:
+            - ${hdfs.replication.factor}
+      -   name: "withHDFSConfig"
+          args:
+              - ref: "hdfsProps"
+      -   name: "withDeserializer"
+          args:
+            - "${kafka.pcap.ts_scheme}"
+            - "${kafka.pcap.ts_granularity}"
+spouts:
+  - id: "kafkaSpout"
+    className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
+    parallelism: ${kafka.spout.parallelism}
+    constructorArgs:
+      - ref: "kafkaConfig"
+      - ref: "writerConfig"
+
+```
+
+#### Flux Changes Introduced
+
+##### Topology Configuration
+
+The only change here is `topology.acker.executors: 0`, which disables Storm tuple acking for maximum throughput.
+
+##### Kafka Configuration
+
+The following Kafka consumer and Storm Kafka spout properties were tuned via the `kafkaProps` map shown above:
+```
+poll.timeout.ms
+offset.commit.period.ms
+session.timeout.ms
+max.uncommitted.offsets
+max.poll.interval.ms
+max.poll.records
+receive.buffer.bytes
+max.partition.fetch.bytes
+```
+
+##### Writer Configuration
+
+This is a combination of settings for the HDFSWriter (see the pcap.properties values above) as well as for HDFS itself.
+
+__HDFS config__
+
+Component config HashMap with the following properties:
+```
+io.file.buffer.size
+dfs.blocksize
+```
+
+__Writer config__
+
+References the HDFS props component specified above.
+```
+ -   name: "withHDFSConfig"
+     args:
+       - ref: "hdfsProps"
+```
+

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
index 6e51dc5..7160178 100644
--- a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
+++ b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
@@ -15,12 +15,18 @@
 #  limitations under the License.
 
 spout.kafka.topic.pcap=pcap
-storm.auto.credentials=[]
+topology.worker.childopts=
+topology.auto-credentials=[]
+topology.workers=1
 kafka.zk=node1:2181
+hdfs.sync.every=1
+hdfs.replication.factor=-1
 kafka.security.protocol=PLAINTEXT
-kafka.pcap.start=END
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.pcap.start=UNCOMMITTED_EARLIEST
 kafka.pcap.numPackets=1000
 kafka.pcap.maxTimeMS=300000
 kafka.pcap.ts_scheme=FROM_KEY
 kafka.pcap.out=/apps/metron/pcap
 kafka.pcap.ts_granularity=MICROSECONDS
+kafka.spout.parallelism=1

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 2b7e0fd..d7f6f2f 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -16,8 +16,9 @@
 
 name: "pcap"
 config:
-    topology.workers: 1
-    topology.auto-credentials: ${storm.auto.credentials}
+    topology.workers: ${topology.workers}
+    topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
 
 components:
 
@@ -53,7 +54,7 @@ components:
       -   name: "setFirstPollOffsetStrategy"
           args:
             # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-            - "UNCOMMITTED_EARLIEST"
+            - ${kafka.pcap.start}
 
   - id: "writerConfig"
     className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
@@ -70,6 +71,12 @@ components:
       -   name: "withZookeeperQuorum"
           args:
             - "${kafka.zk}"
+      -   name: "withSyncEvery"
+          args:
+            - ${hdfs.sync.every}
+      -   name: "withReplicationFactor"
+          args:
+            - ${hdfs.replication.factor}
       -   name: "withDeserializer"
           args:
             - "${kafka.pcap.ts_scheme}"
@@ -77,6 +84,7 @@ components:
 spouts:
   - id: "kafkaSpout"
     className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
+    parallelism: ${kafka.spout.parallelism}
     constructorArgs:
       - ref: "kafkaConfig"
       - ref: "writerConfig"

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index 43cd7e0..a6823e6 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -19,14 +19,13 @@
 package org.apache.metron.spout.pcap;
 
 import com.google.common.base.Joiner;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
+import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
 import org.apache.storm.kafka.Callback;
 import org.apache.storm.kafka.EmitContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.DatatypeConverter;
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +37,7 @@ import java.util.Map;
  */
 public class HDFSWriterCallback implements Callback {
     static final long serialVersionUID = 0xDEADBEEFL;
-    private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HDFSWriterCallback.class);
 
     /**
      * A topic+partition.  We split the files up by topic+partition so the writers don't clobber each other
@@ -80,29 +79,12 @@ public class HDFSWriterCallback implements Callback {
         }
     }
 
-    /**
-     * This is a static container of threadlocal LongWritables and BytesWritables.  This keeps us from having to create so
-     * many objects on the heap.  The Deserializers update these for every packet.
-     */
-    private static class KeyValue {
-        static ThreadLocal<LongWritable> key = new ThreadLocal<LongWritable> () {
-            @Override
-            protected LongWritable initialValue() {
-                return new LongWritable();
-            }
-        };
-        static ThreadLocal<BytesWritable> value = new ThreadLocal<BytesWritable> () {
-            @Override
-            protected BytesWritable initialValue() {
-                return new BytesWritable();
-            }
-        };
-    }
     private HDFSWriterConfig config;
     private EmitContext context;
     private Map<Partition, PartitionHDFSWriter> writers = new HashMap<>();
     private PartitionHDFSWriter lastWriter = null;
     private String topic;
+    private boolean inited = false;
     public HDFSWriterCallback() {
     }
 
@@ -116,35 +98,43 @@ public class HDFSWriterCallback implements Callback {
     public List<Object> apply(List<Object> tuple, EmitContext context) {
         byte[] key = (byte[]) tuple.get(0);
         byte[] value = (byte[]) tuple.get(1);
-        if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) {
-            if(LOG.isDebugEnabled()) {
-                List<String> debugStatements = new ArrayList<>();
-                if(key != null) {
-                    debugStatements.add("Key length: " + key.length);
-                    debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
-                }
-                else {
-                    debugStatements.add("Key is null!");
-                }
-
-                if(value != null) {
-                    debugStatements.add("Value length: " + value.length);
-                    debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
-                }
-                else {
-                    debugStatements.add("Value is null!");
-                }
-                LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
+        long tsDeserializeStart = System.nanoTime();
+        KeyValueDeserializer.Result result = config.getDeserializer().deserializeKeyValue(key, value);
+        long tsDeserializeEnd = System.nanoTime();
+
+        if (LOG.isDebugEnabled() && !result.foundTimestamp) {
+            List<String> debugStatements = new ArrayList<>();
+            if (key != null) {
+                debugStatements.add("Key length: " + key.length);
+                debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
+            } else {
+                debugStatements.add("Key is null!");
             }
+
+            if (value != null) {
+                debugStatements.add("Value length: " + value.length);
+                debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
+            } else {
+                debugStatements.add("Value is null!");
+            }
+            LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
         }
+
+        long tsWriteStart = System.nanoTime();
         try {
             getWriter(new Partition( topic
                                    , context.get(EmitContext.Type.PARTITION))
-                     ).handle(KeyValue.key.get(), KeyValue.value.get());
+                     ).handle(result.key, result.value);
         } catch (IOException e) {
             LOG.error(e.getMessage(), e);
             //drop?  not sure..
         }
+        long tsWriteEnd = System.nanoTime();
+        if(LOG.isDebugEnabled() && (Math.random() < 0.001 || !inited)) {
+            LOG.debug("Deserialize time (ns): " + (tsDeserializeEnd - tsDeserializeStart));
+            LOG.debug("Write time (ns): " + (tsWriteEnd - tsWriteStart));
+        }
+        inited = true;
         return tuple;
     }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
index 66bb359..b6a2809 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
@@ -25,7 +25,9 @@ import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Configure the HDFS Writer for PCap
@@ -34,9 +36,12 @@ public class HDFSWriterConfig implements Serializable {
   static final long serialVersionUID = 0xDEADBEEFL;
   private long numPackets;
   private long maxTimeNS;
+  private int syncEvery = 1;
+  private int replicationFactor = -1;
   private String outputPath;
   private String zookeeperQuorum;
   private KeyValueDeserializer deserializer;
+  private Map<String, Object> hdfsConfig = new HashMap<>();
 
   /**
    * Set the deserializer, the bit of logic that defines how the timestamp and packet are read.
@@ -70,6 +75,36 @@ public class HDFSWriterConfig implements Serializable {
   }
 
   /**
+   * The number of packets to write between syncs of the output stream to HDFS.
+   * @param n
+   * @return
+   */
+  public HDFSWriterConfig withSyncEvery(int n) {
+    syncEvery = n;
+    return this;
+  }
+
+  /**
+   * The map config for HDFS
+   * @param config
+   * @return
+   */
+  public HDFSWriterConfig withHDFSConfig(Map<String, Object> config) {
+    hdfsConfig = config;
+    return this;
+  }
+
+  /**
+   * The HDFS replication factor to use. A value of -1 will not set replication factor.
+   * @param n
+   * @return
+   */
+  public HDFSWriterConfig withReplicationFactor(int n) {
+    replicationFactor = n;
+    return this;
+  }
+
+  /**
    * The total amount of time (in ms) to write before a file is rolled.
    * @param t
    * @return
@@ -103,6 +138,10 @@ public class HDFSWriterConfig implements Serializable {
     return out;
   }
 
+  public Map<String, Object> getHDFSConfig() {
+    return hdfsConfig;
+  }
+
   public Integer getZookeeperPort() {
     if(zookeeperQuorum != null) {
       String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
@@ -112,6 +151,14 @@ public class HDFSWriterConfig implements Serializable {
     return  null;
   }
 
+  public int getSyncEvery() {
+    return syncEvery;
+  }
+
+  public int getReplicationFactor() {
+    return replicationFactor;
+  }
+
   public KeyValueDeserializer getDeserializer() {
     return deserializer;
   }
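For readers skimming the diff, a rough sketch of how the new builder methods compose (not taken from the patch; values mirror the tuned pcap.properties above, numeric parameter types follow the fields shown in the diff, the pre-existing with* methods are assumed to chain like the new ones, and `withZookeeperQuorum`/`withDeserializer` are omitted):

```
import java.util.HashMap;
import java.util.Map;
import org.apache.metron.spout.pcap.HDFSWriterConfig;

public class TunedWriterConfig {
  public static HDFSWriterConfig build() {
    Map<String, Object> hdfsProps = new HashMap<>();
    hdfsProps.put("io.file.buffer.size", 1000000);
    hdfsProps.put("dfs.blocksize", 1073741824);

    return new HDFSWriterConfig()
        .withOutputPath("/apps/metron/pcap") // kafka.pcap.out
        .withNumPackets(1000000000)          // roll the file after this many packets
        .withMaxTimeMS(0)                    // <= 0 disables time-based rolling
        .withSyncEvery(10000)                // sync the output stream every 10,000 packets
        .withReplicationFactor(1)            // -1 keeps the cluster default
        .withHDFSConfig(hdfsProps);          // typed values are applied to the HDFS Configuration
  }
}
```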

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
index f0ea1eb..86697db 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -27,18 +27,23 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.log4j.Logger;
 import org.apache.metron.pcap.PcapHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.EnumSet;
+import java.util.Map;
 
 /**
  * This class is intended to handle the writing of an individual file.
  */
 public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   static final long serialVersionUID = 0xDEADBEEFL;
-  private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionHDFSWriter.class);
 
 
   public static interface SyncHandler {
@@ -102,14 +107,43 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   private SyncHandler syncHandler;
   private long batchStartTime;
   private long numWritten;
+  private Configuration fsConfig = new Configuration();
 
   public PartitionHDFSWriter(String topic, int partition, String uuid, HDFSWriterConfig config) {
     this.topic = topic;
     this.partition = partition;
     this.uuid = uuid;
     this.config = config;
+
     try {
-      this.fs = FileSystem.get(new Configuration());
+      int replicationFactor = config.getReplicationFactor();
+      if (replicationFactor > 0) {
+        fsConfig.set("dfs.replication", String.valueOf(replicationFactor));
+      }
+      if(config.getHDFSConfig() != null && !config.getHDFSConfig().isEmpty()) {
+        for(Map.Entry<String, Object> entry : config.getHDFSConfig().entrySet()) {
+          if(entry.getValue() instanceof Integer) {
+            fsConfig.setInt(entry.getKey(), (int)entry.getValue());
+          }
+          else if(entry.getValue() instanceof Boolean)
+          {
+            fsConfig.setBoolean(entry.getKey(), (Boolean) entry.getValue());
+          }
+          else if(entry.getValue() instanceof Long)
+          {
+            fsConfig.setLong(entry.getKey(), (Long) entry.getValue());
+          }
+          else if(entry.getValue() instanceof Float)
+          {
+            fsConfig.setFloat(entry.getKey(), (Float) entry.getValue());
+          }
+          else
+          {
+            fsConfig.set(entry.getKey(), String.valueOf(entry.getValue()));
+          }
+        }
+      }
+      this.fs = FileSystem.get(fsConfig);
     } catch (IOException e) {
       throw new RuntimeException("Unable to get FileSystem", e);
     }
@@ -119,11 +153,14 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
     return Long.toUnsignedString(ts);
   }
 
-  public void handle(LongWritable ts, BytesWritable value) throws IOException {
-    turnoverIfNecessary(ts.get());
-    writer.append(ts, value);
-    syncHandler.sync(outputStream);
+  public void handle(long ts, byte[] value) throws IOException {
+    turnoverIfNecessary(ts);
+    BytesWritable bw = new BytesWritable(value);
+    writer.append(new LongWritable(ts), bw);
     numWritten++;
+    if(numWritten % config.getSyncEvery() == 0) {
+      syncHandler.sync(outputStream);
+    }
   }
 
   public String getTopic() {
@@ -144,8 +181,8 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
       outputStream.close();
     }
   }
-  private Path getPath(long ts) {
 
+  private Path getPath(long ts) {
     String fileName = PcapHelper.toFilename(topic, ts, partition + "", uuid);
     return new Path(config.getOutputPath(), fileName);
   }
@@ -157,7 +194,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   private void turnoverIfNecessary(long ts, boolean force) throws IOException {
     long duration = ts - batchStartTime;
     boolean initial = outputStream == null;
-    boolean overDuration = duration >= config.getMaxTimeNS();
+    boolean overDuration = config.getMaxTimeNS() <= 0 ? false : Long.compareUnsigned(duration, config.getMaxTimeNS()) >= 0;
     boolean tooManyPackets = numWritten >= config.getNumPackets();
     if(force || initial || overDuration || tooManyPackets ) {
       //turnover
@@ -183,14 +220,14 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
         }
 
       }
-      writer = SequenceFile.createWriter(new Configuration()
+      writer = SequenceFile.createWriter(this.fsConfig
               , SequenceFile.Writer.keyClass(LongWritable.class)
               , SequenceFile.Writer.valueClass(BytesWritable.class)
               , SequenceFile.Writer.stream(outputStream)
               , SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)
       );
       //reset state
-      LOG.info("Turning over and writing to " + path);
+      LOG.info("Turning over and writing to {}: [duration={} NS, force={}, initial={}, overDuration={}, tooManyPackets={}]", path, duration, force, initial, overDuration, tooManyPackets);
       batchStartTime = ts;
       numWritten = 0;
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
index de1e24b..749d74c 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
@@ -18,19 +18,18 @@
 
 package org.apache.metron.spout.pcap.deserializer;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.spout.pcap.Endianness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Extract the timestamp from the key and raw data from the packet.
  */
 public class FromKeyDeserializer extends KeyValueDeserializer {
-  private static final Logger LOG = Logger.getLogger(FromKeyDeserializer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FromKeyDeserializer.class);
   private static Endianness endianness = Endianness.getNativeEndianness();
 
 
@@ -39,13 +38,12 @@ public class FromKeyDeserializer extends KeyValueDeserializer {
   }
 
   @Override
-  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
-    Long ts = converter.toNanoseconds(fromBytes(key));
-    outKey.set(ts);
-    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
-    byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
-    outValue.set(globalHeaderized, 0, globalHeaderized.length);
-    return true;
+  public Result deserializeKeyValue(byte[] key, byte[] value) {
+    if (key == null) {
+      throw new IllegalArgumentException("Expected a key but none provided");
+    }
+    long ts = converter.toNanoseconds(fromBytes(key));
+    return new Result(ts, PcapHelper.addHeaders(ts, value, endianness), true);
   }
 
   /**
@@ -65,6 +63,6 @@ public class FromKeyDeserializer extends KeyValueDeserializer {
       value |= (long)(b & 255);
     }
 
-    return Long.valueOf(value);
+    return value;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
index 6098904..0ba92f8 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
@@ -18,26 +18,24 @@
 
 package org.apache.metron.spout.pcap.deserializer;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
 import org.apache.metron.pcap.PcapHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Extract the timestamp and raw data from the packet.
  */
 public class FromPacketDeserializer extends KeyValueDeserializer {
-  private static final Logger LOG = Logger.getLogger(FromPacketDeserializer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FromPacketDeserializer.class);
 
   @Override
-  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
+  public Result deserializeKeyValue(byte[] key, byte[] value) {
     Long ts = PcapHelper.getTimestamp(value);
     if(ts != null) {
-      outKey.set(ts);
-      outValue.set(value, 0, value.length);
-      return true;
+      return new Result(ts, value, true);
     }
     else {
-      return false;
+      return new Result(ts, value, false);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
index 48bea87..311be04 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
@@ -18,8 +18,6 @@
 
 package org.apache.metron.spout.pcap.deserializer;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 
@@ -28,6 +26,17 @@ import java.io.Serializable;
 public abstract class KeyValueDeserializer implements Serializable {
   protected TimestampConverter converter;
 
+  public static class Result {
+    public byte[] value;
+    public Long key;
+    public boolean foundTimestamp;
+    public Result(Long key, byte[] value, boolean foundTimestamp) {
+      this.key = key;
+      this.value = value;
+      this.foundTimestamp = foundTimestamp;
+    }
+  }
+
   public KeyValueDeserializer() {
     this(TimestampConverters.MICROSECONDS);
   }
@@ -36,6 +45,6 @@ public abstract class KeyValueDeserializer implements Serializable {
     this.converter = converter;
   }
 
-  public abstract boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue);
+  public abstract Result deserializeKeyValue(byte[] key, byte[] value);
 
 }
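As a sketch of what the new contract looks like from the implementer's side (illustrative only, not from the patch), a deserializer that expects an ASCII epoch value in the Kafka key might look like this:

```
package org.apache.metron.spout.pcap.deserializer;

import java.nio.charset.StandardCharsets;

public class AsciiKeyDeserializer extends KeyValueDeserializer {
  @Override
  public Result deserializeKeyValue(byte[] key, byte[] value) {
    if (key == null || key.length == 0) {
      // No timestamp available; let the callback log and count it as malformed.
      return new Result(null, value, false);
    }
    // Parse the epoch value from the key and normalize it to nanoseconds using
    // the converter configured on the base class (MICROSECONDS by default).
    long ts = converter.toNanoseconds(Long.parseLong(new String(key, StandardCharsets.UTF_8)));
    return new Result(ts, value, true);
  }
}
```

The existing FROM_KEY and FROM_PACKET schemes remain the supported options; this only shows the shape of the new Result-based contract.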

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
index f460db3..c887606 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
@@ -35,7 +35,10 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 public class PcapInspector {
   private static abstract class OptionHandler implements Function<String, Option> {}
@@ -136,20 +139,24 @@ public class PcapInspector {
     LongWritable key = new LongWritable();
     BytesWritable value = new BytesWritable();
 
-    for(int i = 0;(n < 0 || i < n) && reader.next(key, value);++i) {
+    for (int i = 0; (n < 0 || i < n) && reader.next(key, value); ++i) {
       long millis = Long.divideUnsigned(key.get(), 1000000);
       String ts = DATE_FORMAT.format(new Date(millis));
-      for(PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
-        Map<String, Object> result = PcapHelper.packetToFields(pi);
-        List<String> fieldResults = new ArrayList<String>() {{
-          add("TS: " + ts);
-        }};
-        for(Constants.Fields field : Constants.Fields.values()) {
-          if(result.containsKey(field.getName())) {
-            fieldResults.add(field.getName() + ": " + result.get(field.getName()));
+      try {
+        for (PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
+          Map<String, Object> result = PcapHelper.packetToFields(pi);
+          List<String> fieldResults = new ArrayList<String>() {{
+            add("TS: " + ts);
+          }};
+          for (Constants.Fields field : Constants.Fields.values()) {
+            if (result.containsKey(field.getName())) {
+              fieldResults.add(field.getName() + ": " + result.get(field.getName()));
+            }
           }
+          System.out.println(Joiner.on(",").join(fieldResults));
         }
-        System.out.println(Joiner.on(",").join(fieldResults));
+      } catch (Exception e) {
+        System.out.println(String.format("Error: malformed packet #=%s, ts=%s, error msg=%s", i + 1, ts, e.getMessage()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index a869723..d6d54dc 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -213,14 +213,19 @@ public class PcapTopologyIntegrationTest {
     final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
     Assert.assertTrue(Iterables.size(pcapEntries) > 0);
     final Properties topologyProperties = new Properties() {{
+      setProperty("topology.workers", "1");
+      setProperty("topology.worker.childopts", "");
       setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
-      setProperty("kafka.pcap.start", "BEGINNING");
+      setProperty("kafka.pcap.start", "EARLIEST");
       setProperty("kafka.pcap.out", outDir.getAbsolutePath());
       setProperty("kafka.pcap.numPackets", "2");
       setProperty("kafka.pcap.maxTimeMS", "200000000");
       setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
-      setProperty("storm.auto.credentials", "[]");
+      setProperty("kafka.spout.parallelism", "1");
+      setProperty("topology.auto-credentials", "[]");
       setProperty("kafka.security.protocol", "PLAINTEXT");
+      setProperty("hdfs.sync.every", "1");
+      setProperty("hdfs.replication.factor", "-1");
     }};
     updatePropertiesCallback.apply(topologyProperties);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
new file mode 100644
index 0000000..1d49103
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class FromKeyDeserializerTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void empty_or_null_key_throws_illegal_argument_exception() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Expected a key but none provided");
+
+    FromKeyDeserializer deserializer = new FromKeyDeserializer(TimestampConverters.NANOSECONDS);
+    deserializer.deserializeKeyValue(null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index ebd7ac7..e1ad3ca 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -39,9 +39,10 @@ import org.krakenapps.pcap.util.ByteOrderConverter;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.*;
-
-import static org.apache.metron.pcap.Constants.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class PcapHelper {
 
@@ -81,6 +82,14 @@ public class PcapHelper {
     }
   }
 
+  /**
+   *
+   * @param topic
+   * @param timestamp
+   * @param partition kafka partition
+   * @param uuid
+   * @return filename in this format: pcap_topic_timestamp_partition_uuid, e.g. pcap_pcap_1494886105667571000_0_pcap-8-1494965816
+   */
   public static String toFilename(String topic, long timestamp, String partition, String uuid)
   {
     return Joiner.on("_").join("pcap"
@@ -163,6 +172,44 @@ public class PcapHelper {
     }
     return null;
   }
+
+  public static byte[] addHeaders(long tsNano, byte[] packet, Endianness endianness) {
+    byte[] ret = new byte[GLOBAL_HEADER_SIZE + PACKET_HEADER_SIZE + packet.length];
+    byte[] globalHeader = getPcapGlobalHeader(endianness);
+    int offset = 0;
+    System.arraycopy(globalHeader, 0, ret, offset, GLOBAL_HEADER_SIZE);
+    offset += globalHeader.length;
+    {
+      boolean swapBytes = swapBytes(endianness);
+      long micros = Long.divideUnsigned(tsNano, 1000);
+      int secs = (int)(micros / 1000000);
+      int usec = (int)(micros % 1000000);
+      int capLen = packet.length;
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(secs):secs);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(usec):usec);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+      {
+        byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+        System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+        offset += Integer.BYTES;
+      }
+    }
+    System.arraycopy(packet, 0, ret, offset, packet.length);
+    return ret;
+  }
+
   public static byte[] addGlobalHeader(byte[] packet, Endianness endianness) {
     byte[] globalHeader = getPcapGlobalHeader(endianness);
     byte[] ret = new byte[packet.length + GLOBAL_HEADER_SIZE];
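To illustrate the new single-pass header helper (a sketch, not from the patch; the timestamp and empty payload are placeholders):

```
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.spout.pcap.Endianness;

public class AddHeadersExample {
  public static void main(String[] args) {
    long tsNano = 1494886105667571000L; // placeholder timestamp in nanoseconds
    byte[] rawPacket = new byte[0];     // placeholder payload
    // Prepends the pcap global header and the per-packet header in one allocation,
    // replacing the addPacketHeader + addGlobalHeader two-step previously used by
    // FromKeyDeserializer.
    byte[] pcapBytes = PcapHelper.addHeaders(tsNano, rawPacket, Endianness.getNativeEndianness());
    System.out.println("pcap-formatted length: " + pcapBytes.length);
  }
}
```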

http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 3d4a4b3..8d40e5f 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -52,6 +52,11 @@ public class PcapJob {
   public static final String START_TS_CONF = "start_ts";
   public static final String END_TS_CONF = "end_ts";
   public static final String WIDTH_CONF = "width";
+
+  public static enum PCAP_COUNTER {
+    MALFORMED_PACKET_COUNT
+  }
+
   public static class PcapPartitioner extends Partitioner<LongWritable, BytesWritable> implements Configurable {
     private Configuration configuration;
     Long start = null;
@@ -110,15 +115,23 @@ public class PcapJob {
         // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
         // objects back to byte arrays, otherwise we could support more than one packet.
         // Note: short-circuit findAny() func on stream
-        boolean send = filteredPacketInfo(value).findAny().isPresent();
+        List<PacketInfo> packetInfos;
+        try {
+          packetInfos = PcapHelper.toPacketInfo(value.copyBytes());
+        } catch(Exception e) {
+          // toPacketInfo is throwing RuntimeExceptions. Attempt to catch and count errors with malformed packets
+          context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1);
+          return;
+        }
+        boolean send = filteredPacketInfo(packetInfos).findAny().isPresent();
         if (send) {
           context.write(key, value);
         }
       }
     }
 
-    private Stream<PacketInfo> filteredPacketInfo(BytesWritable value) throws IOException {
-      return PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter);
+    private Stream<PacketInfo> filteredPacketInfo(List<PacketInfo> packetInfos) throws IOException {
+      return packetInfos.stream().filter(filter);
     }
   }
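
Finally, a sketch (not from the patch; obtaining the completed MapReduce Job handle is out of scope here) of how the new MALFORMED_PACKET_COUNT counter could be inspected after a pcap query job finishes:

```
import org.apache.hadoop.mapreduce.Job;
import org.apache.metron.pcap.mr.PcapJob;

public class MalformedPacketReport {
  /** Returns how many packets the job skipped because they could not be parsed. */
  public static long malformedCount(Job completedJob) throws Exception {
    return completedJob.getCounters()
                       .findCounter(PcapJob.PCAP_COUNTER.MALFORMED_PACKET_COUNT)
                       .getValue();
  }
}
```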