Posted to commits@metron.apache.org by ce...@apache.org on 2017/03/29 12:04:48 UTC

[2/3] incubator-metron git commit: METRON-793: Migrate to storm-kafka-client kafka spout from storm-kafka closes apache/incubator-metron#486

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index 9ac6d41..43cd7e0 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -18,34 +18,31 @@
 
 package org.apache.metron.spout.pcap;
 
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.log4j.Logger;
 import org.apache.storm.kafka.Callback;
 import org.apache.storm.kafka.EmitContext;
-import org.apache.storm.kafka.PartitionManager;
 
-import javax.annotation.Nullable;
+import javax.xml.bind.DatatypeConverter;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.EnumSet;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * A callback which gets executed as part of the spout to write pcap data to HDFS.
+ */
 public class HDFSWriterCallback implements Callback {
     static final long serialVersionUID = 0xDEADBEEFL;
     private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
 
+    /**
+     * A topic+partition pair.  We split the files up by topic+partition so the writers don't clobber each other.
+     */
     static class Partition {
         String topic;
         int partition;
@@ -83,6 +80,24 @@ public class HDFSWriterCallback implements Callback {
         }
     }
 
+    /**
+     * A static container of thread-local LongWritables and BytesWritables.  This keeps us from having to create so
+     * many objects on the heap.  The Deserializers update these in place for every packet.
+     */
+    private static class KeyValue {
+        static ThreadLocal<LongWritable> key = new ThreadLocal<LongWritable> () {
+            @Override
+            protected LongWritable initialValue() {
+                return new LongWritable();
+            }
+        };
+        static ThreadLocal<BytesWritable> value = new ThreadLocal<BytesWritable> () {
+            @Override
+            protected BytesWritable initialValue() {
+                return new BytesWritable();
+            }
+        };
+    }
     private HDFSWriterConfig config;
     private EmitContext context;
     private Map<Partition, PartitionHDFSWriter> writers = new HashMap<>();
@@ -96,16 +111,36 @@ public class HDFSWriterCallback implements Callback {
         this.config = config;
         return this;
     }
+
     @Override
     public List<Object> apply(List<Object> tuple, EmitContext context) {
-
-        List<Object> keyValue = (List<Object>) tuple.get(0);
-        LongWritable ts = (LongWritable) keyValue.get(0);
-        BytesWritable rawPacket = (BytesWritable)keyValue.get(1);
+        byte[] key = (byte[]) tuple.get(0);
+        byte[] value = (byte[]) tuple.get(1);
+        if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) {
+            if(LOG.isDebugEnabled()) {
+                List<String> debugStatements = new ArrayList<>();
+                if(key != null) {
+                    debugStatements.add("Key length: " + key.length);
+                    debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
+                }
+                else {
+                    debugStatements.add("Key is null!");
+                }
+
+                if(value != null) {
+                    debugStatements.add("Value length: " + value.length);
+                    debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
+                }
+                else {
+                    debugStatements.add("Value is null!");
+                }
+                LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
+            }
+            //the packet could not be deserialized; drop it rather than write stale thread-local data
+            return tuple;
+        }
         try {
             getWriter(new Partition( topic
                                    , context.get(EmitContext.Type.PARTITION))
-                     ).handle(ts, rawPacket);
+                     ).handle(KeyValue.key.get(), KeyValue.value.get());
         } catch (IOException e) {
             LOG.error(e.getMessage(), e);
             //drop?  not sure..
@@ -133,45 +168,15 @@ public class HDFSWriterCallback implements Callback {
     @Override
     public void initialize(EmitContext context) {
         this.context = context;
-        this.topic = context.get(EmitContext.Type.TOPIC);
+        Object topics = context.get(EmitContext.Type.TOPIC);
+        if(topics instanceof List) {
+            this.topic = Joiner.on(",").join((List<String>)topics);
+        }
+        else {
+            this.topic = "" + topics;
+        }
     }
 
-    /**
-     * Closes this resource, relinquishing any underlying resources.
-     * This method is invoked automatically on objects managed by the
-     * {@code try}-with-resources statement.
-     *
-     * <p>While this interface method is declared to throw {@code
-     * Exception}, implementers are <em>strongly</em> encouraged to
-     * declare concrete implementations of the {@code close} method to
-     * throw more specific exceptions, or to throw no exception at all
-     * if the close operation cannot fail.
-     *
-     * <p><em>Implementers of this interface are also strongly advised
-     * to not have the {@code close} method throw {@link
-     * InterruptedException}.</em>
-     *
-     * <p>This exception interacts with a thread's interrupted status,
-     * and runtime misbehavior is likely to occur if an {@code
-     * InterruptedException} is {@linkplain Throwable#addSuppressed
-     * suppressed}.
-     *
-     * <p>More generally, if it would cause problems for an
-     * exception to be suppressed, the {@code AutoCloseable.close}
-     * method should not throw it.
-     *
-     * <p>Note that unlike the {@link Closeable#close close}
-     * method of {@link Closeable}, this {@code close} method
-     * is <em>not</em> required to be idempotent.  In other words,
-     * calling this {@code close} method more than once may have some
-     * visible side effect, unlike {@code Closeable.close} which is
-     * required to have no effect if called more than once.
-     *
-     * <p>However, implementers of this interface are strongly encouraged
-     * to make their {@code close} methods idempotent.
-     *
-     * @throws Exception if this resource cannot be closed
-     */
     @Override
     public void close() throws Exception {
         for(PartitionHDFSWriter writer : writers.values()) {

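The KeyValue holder above is the standard thread-local reuse pattern: each
thread gets one LongWritable/BytesWritable pair and the deserializer
overwrites them in place for every packet.  A minimal standalone sketch of
the same idea (class and field names here are illustrative, not part of the
commit), using Java 8's ThreadLocal.withInitial in place of the anonymous
subclasses:

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;

    public class WritableReuse {
        // One Writable pair per thread, created lazily on first access
        private static final ThreadLocal<LongWritable> KEY =
                ThreadLocal.withInitial(LongWritable::new);
        private static final ThreadLocal<BytesWritable> VALUE =
                ThreadLocal.withInitial(BytesWritable::new);

        public static void main(String[] args) {
            LongWritable ts = KEY.get();        // same instance on every call from this thread
            BytesWritable packet = VALUE.get();
            ts.set(1490789088000000L);          // overwrite in place; no per-packet allocation
            packet.set(new byte[] {0x01, 0x02}, 0, 2);
            System.out.println(ts.get() + " -> " + packet.getLength() + " bytes");
        }
    }
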
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
index b6efbc5..66bb359 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
@@ -20,33 +20,70 @@ package org.apache.metron.spout.pcap;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.spout.pcap.deserializer.Deserializers;
+import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Configure the HDFS writer for PCAP data.
+ */
 public class HDFSWriterConfig implements Serializable {
   static final long serialVersionUID = 0xDEADBEEFL;
   private long numPackets;
   private long maxTimeNS;
   private String outputPath;
   private String zookeeperQuorum;
+  private KeyValueDeserializer deserializer;
 
+  /**
+   * Set the deserializer: the bit of logic that defines how the timestamp and packet are read.
+   * @param deserializer The name of one of the Deserializers in org.apache.metron.spout.pcap.deserializer.Deserializers
+   * @param timestampConverter The name of one of org.apache.metron.common.utils.timestamp.TimestampConverters.  This defines the units of the timestamp.
+   * @return This config, for call chaining.
+   */
+  public HDFSWriterConfig withDeserializer(String deserializer, String timestampConverter) {
+    this.deserializer = Deserializers.create(deserializer, timestampConverter);
+    return this;
+  }
+
+  /**
+   * The output path in HDFS to write to.
+   * @param path The HDFS path to write files under.
+   * @return This config, for call chaining.
+   */
   public HDFSWriterConfig withOutputPath(String path) {
     outputPath = path;
     return this;
   }
 
+  /**
+   * The number of packets to write before a file is rolled.
+   * @param n The packet count threshold.
+   * @return This config, for call chaining.
+   */
   public HDFSWriterConfig withNumPackets(long n) {
     numPackets = n;
     return this;
   }
 
+  /**
+   * The total amount of time (in ms) to write before a file is rolled.
+   * @param t The time threshold in milliseconds; stored internally in nanoseconds.
+   * @return This config, for call chaining.
+   */
   public HDFSWriterConfig withMaxTimeMS(long t) {
     maxTimeNS = TimestampConverters.MILLISECONDS.toNanoseconds(t);
     return this;
   }
 
+  /**
+   * The ZooKeeper quorum to use.
+   * @param zookeeperQuorum The ZooKeeper quorum connect string.
+   * @return This config, for call chaining.
+   */
   public HDFSWriterConfig withZookeeperQuorum(String zookeeperQuorum) {
     this.zookeeperQuorum = zookeeperQuorum;
     return this;
@@ -75,6 +112,10 @@ public class HDFSWriterConfig implements Serializable {
     return  null;
   }
 
+  public KeyValueDeserializer getDeserializer() {
+    return deserializer;
+  }
+
   public String getOutputPath() {
     return outputPath;
   }

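Taken together, the builder methods make configuration a single fluent
chain.  A usage sketch follows; the path, quorum, and thresholds are made-up
values, a no-arg constructor is assumed, and "MICROSECONDS" assumes
TimestampConverters.getConverter accepts the constant's name:

    HDFSWriterConfig config = new HDFSWriterConfig()
            .withDeserializer("FROM_KEY", "MICROSECONDS")  // scheme + timestamp units, resolved by name
            .withOutputPath("/apps/metron/pcap")           // HDFS directory for rolled files
            .withNumPackets(1000)                          // roll after 1000 packets
            .withMaxTimeMS(300000)                         // or after 5 minutes of writing
            .withZookeeperQuorum("node1:2181");
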
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
index e4f1113..ddfd14a 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
@@ -18,14 +18,20 @@
 
 package org.apache.metron.spout.pcap;
 
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.storm.kafka.Callback;
 import org.apache.storm.kafka.CallbackKafkaSpout;
 
-public class KafkaToHDFSSpout extends CallbackKafkaSpout {
+public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> {
   static final long serialVersionUID = 0xDEADBEEFL;
   HDFSWriterConfig config = null;
-  public KafkaToHDFSSpout(SpoutConfig spoutConfig, HDFSWriterConfig config) {
-    super(spoutConfig, HDFSWriterCallback.class);
+  public KafkaToHDFSSpout( SimpleStormKafkaBuilder<byte[], byte[]> spoutConfig
+                         , HDFSWriterConfig config
+                         )
+  {
+    super(spoutConfig
+         , HDFSWriterCallback.class
+         );
     this.config = config;
   }
 

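With SpoutConfig gone, constructing the spout reduces to pairing a
byte[]-keyed, byte[]-valued builder with the writer config.  The helper
below is a hypothetical sketch; the commit does not show how
SimpleStormKafkaBuilder itself is constructed:

    static KafkaToHDFSSpout buildSpout(SimpleStormKafkaBuilder<byte[], byte[]> kafkaBuilder,
                                       HDFSWriterConfig writerConfig) {
        // The spout hands each raw key/value pair to HDFSWriterCallback,
        // which deserializes and writes it via the config's deserializer.
        return new KafkaToHDFSSpout(kafkaBuilder, writerConfig);
    }
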
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
index d99a594..f0ea1eb 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -33,6 +33,9 @@ import org.apache.metron.pcap.PcapHelper;
 import java.io.*;
 import java.util.EnumSet;
 
+/**
+ * Handles the writing of an individual file for a single topic+partition.
+ */
 public class PartitionHDFSWriter implements AutoCloseable, Serializable {
   static final long serialVersionUID = 0xDEADBEEFL;
   private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class);
@@ -42,6 +45,10 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
     void sync(FSDataOutputStream outputStream) throws IOException;
   }
 
+  /*
+  The sync handlers are FileSystem-specific implementations of syncing.  The more often you sync, the more atomic the
+  writing is.  There is a natural tradeoff between syncing often and performance.
+   */
   public static enum SyncHandlers implements SyncHandler{
     DEFAULT(new SyncHandler() {
 
@@ -114,7 +121,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
 
   public void handle(LongWritable ts, BytesWritable value) throws IOException {
     turnoverIfNecessary(ts.get());
-    writer.append(ts, new BytesWritable(value.getBytes()));
+    writer.append(ts, value);
     syncHandler.sync(outputStream);
     numWritten++;
   }

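The sync-handler comment above is the durability knob for these files.  As a
concrete illustration, a handler built on the real FSDataOutputStream API
might force data all the way to disk; the class name below is hypothetical
and the nested SyncHandler interface is assumed to be accessible:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;

    // Hypothetical handler trading throughput for durability
    public class DurableSyncHandler implements PartitionHDFSWriter.SyncHandler {
        @Override
        public void sync(FSDataOutputStream outputStream) throws IOException {
            outputStream.hflush(); // push buffered bytes to the datanode pipeline
            outputStream.hsync();  // ask the datanodes to persist to disk
        }
    }
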
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
deleted file mode 100644
index 8a76548..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.spout.pcap;
-
-import org.apache.metron.common.utils.timestamp.TimestampConverters;
-import org.apache.metron.spout.pcap.scheme.TimestampScheme;
-import org.apache.storm.kafka.BrokerHosts;
-
-public class SpoutConfig extends org.apache.metron.common.spout.kafka.SpoutConfig{
-
-  public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
-    super(hosts, topic, zkRoot, id);
-  }
-
-  public SpoutConfig withTimestampScheme(String scheme, String granularity) {
-    super.scheme = TimestampScheme.getScheme(scheme, TimestampConverters.getConverter(granularity));
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java
new file mode 100644
index 0000000..e139c27
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+
+import java.util.function.Function;
+
+/**
+ * Deserializers take the raw bytes of the Kafka key and value and construct the timestamp and raw packet bytes for PCAP.
+ */
+public enum Deserializers {
+  /**
+   * Extract the timestamp from the key and the raw packet (global-headerless) from the value
+   */
+   FROM_KEY( converter -> new FromKeyDeserializer(converter))
+  /**
+   * Ignore the key and pull the timestamp directly from the packet itself.  Also, assume that the packet carries its global header.
+   */
+  ,FROM_PACKET(converter -> new FromPacketDeserializer());
+  ;
+  Function<TimestampConverter, KeyValueDeserializer> creator;
+  Deserializers(Function<TimestampConverter, KeyValueDeserializer> creator)
+  {
+    this.creator = creator;
+  }
+
+  public static KeyValueDeserializer create(String scheme, TimestampConverter converter) {
+    try {
+      Deserializers ts = Deserializers.valueOf(scheme.toUpperCase());
+      return ts.creator.apply(converter);
+    }
+    catch(IllegalArgumentException iae) {
+      return Deserializers.FROM_KEY.creator.apply(converter);
+    }
+  }
+
+  public static KeyValueDeserializer create(String scheme, String converter) {
+    return create(scheme, TimestampConverters.getConverter(converter));
+  }
+
+}

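Resolving a deserializer is then a single lookup by name, with unknown
scheme names deliberately falling back to FROM_KEY instead of failing.  A
usage sketch (again assuming getConverter accepts the constant name
"MICROSECONDS"):

    import org.apache.metron.spout.pcap.deserializer.Deserializers;
    import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;

    // Resolve a deserializer from topology configuration strings
    KeyValueDeserializer fromPacket = Deserializers.create("FROM_PACKET", "MICROSECONDS");
    // A typo or unknown scheme silently yields the FROM_KEY deserializer
    KeyValueDeserializer fallback = Deserializers.create("NO_SUCH_SCHEME", "MICROSECONDS");
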
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
new file mode 100644
index 0000000..de1e24b
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.spout.pcap.Endianness;
+
+
+/**
+ * Extract the timestamp from the key and the raw packet data from the value.
+ */
+public class FromKeyDeserializer extends KeyValueDeserializer {
+  private static final Logger LOG = Logger.getLogger(FromKeyDeserializer.class);
+  private static Endianness endianness = Endianness.getNativeEndianness();
+
+
+  public FromKeyDeserializer(TimestampConverter converter) {
+    super(converter);
+  }
+
+  @Override
+  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
+    Long ts = converter.toNanoseconds(fromBytes(key));
+    outKey.set(ts);
+    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
+    byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
+    outValue.set(globalHeaderized, 0, globalHeaderized.length);
+    return true;
+  }
+
+  /**
+   * Convert the big-endian byte array representation of a long into a proper long.
+   * @param data The big-endian bytes of the timestamp.
+   * @return The decoded long.
+   */
+  private static long fromBytes(byte[] data) {
+    long value = 0L;
+    int len = data.length;
+
+    for(int i = 0; i < len; ++i) {
+      byte b = data[i];
+      //make room in the long
+      value <<= 8;
+      //drop the byte in
+      value |= (long)(b & 255);
+    }
+
+    return value;
+  }
+}

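Note that fromBytes is a hand-rolled big-endian decode that also tolerates
keys shorter than eight bytes.  For an exactly-8-byte key it is equivalent
to the sketch below (ByteBuffer reads are big-endian by default):

    import java.nio.ByteBuffer;

    final class KeyDecode {
        // Equivalent to fromBytes for an exactly-8-byte big-endian key
        static long fromEightBytes(byte[] key) {
            return ByteBuffer.wrap(key).getLong();
        }
    }
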
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
new file mode 100644
index 0000000..6098904
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.metron.pcap.PcapHelper;
+/**
+ * Extract both the timestamp and the raw data directly from the packet (the value); the key is ignored.
+ */
+public class FromPacketDeserializer extends KeyValueDeserializer {
+  private static final Logger LOG = Logger.getLogger(FromPacketDeserializer.class);
+
+  @Override
+  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
+    Long ts = PcapHelper.getTimestamp(value);
+    if(ts != null) {
+      outKey.set(ts);
+      outValue.set(value, 0, value.length);
+      return true;
+    }
+    else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
new file mode 100644
index 0000000..48bea87
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+
+import java.io.Serializable;
+
+public abstract class KeyValueDeserializer implements Serializable {
+  protected TimestampConverter converter;
+
+  public KeyValueDeserializer() {
+    this(TimestampConverters.MICROSECONDS);
+  }
+
+  public KeyValueDeserializer(TimestampConverter converter) {
+    this.converter = converter;
+  }
+
+  public abstract boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue);
+
+}

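Supporting a new on-the-wire format only requires extending this class.  The
sketch below is hypothetical (no such deserializer exists in this commit)
and assumes the key already carries an 8-byte big-endian nanosecond
timestamp and the value is a ready-to-write packet:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;

    public class NanosecondKeyDeserializer extends KeyValueDeserializer {
        @Override
        public boolean deserializeKeyValue(byte[] key, byte[] value,
                                           LongWritable outKey, BytesWritable outValue) {
            if (key == null || key.length != 8 || value == null) {
                return false; // malformed: the callback drops the packet
            }
            outKey.set(ByteBuffer.wrap(key).getLong());
            outValue.set(value, 0, value.length);
            return true;
        }
    }
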
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
deleted file mode 100644
index 625cc2d..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.spout.pcap.scheme;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-import org.apache.metron.common.utils.timestamp.TimestampConverter;
-import org.apache.metron.common.utils.timestamp.TimestampConverters;
-import org.apache.metron.pcap.PcapHelper;
-import org.apache.metron.spout.pcap.Endianness;
-import org.apache.storm.kafka.KeyValueScheme;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
-  private static final Logger LOG = Logger.getLogger(FromKeyScheme.class);
-
-  private TimestampConverter converter = TimestampConverters.MICROSECONDS;
-  private static Endianness endianness = Endianness.getNativeEndianness();
-  @Override
-  public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
-    Long ts = converter.toNanoseconds(key.asLongBuffer().get());
-    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, Utils.toArray(value), endianness);
-    byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
-    return new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(globalHeaderized)));
-  }
-
-  @Override
-  public List<Object> deserialize(ByteBuffer ser) {
-    throw new UnsupportedOperationException("Really only interested in deserializing a key and a value");
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(TimestampScheme.KV_FIELD);
-  }
-
-  @Override
-  public FromKeyScheme withTimestampConverter(TimestampConverter converter) {
-    try {
-      this.converter = converter;
-    }
-    catch(IllegalArgumentException iae) {
-      LOG.error(iae.getMessage(), iae);
-    }
-    return this;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
deleted file mode 100644
index 236db0b..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.spout.pcap.scheme;
-
-import org.apache.storm.spout.MultiScheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-import org.apache.metron.common.utils.timestamp.TimestampConverter;
-import org.apache.metron.pcap.PcapHelper;
-import org.apache.storm.utils.Utils;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class FromPacketScheme implements MultiScheme,KeyConvertible {
-  private static final Logger LOG = Logger.getLogger(FromPacketScheme.class);
-  @Override
-  public Iterable<List<Object>> deserialize(ByteBuffer rawValue) {
-    byte[] value = Utils.toByteArray(rawValue);
-    Long ts = PcapHelper.getTimestamp(value);
-    if(ts != null) {
-      return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(value))));
-    }
-    else {
-      return ImmutableList.of(new Values(Collections.EMPTY_LIST));
-    }
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(TimestampScheme.KV_FIELD);
-  }
-
-
-  @Override
-  public FromPacketScheme withTimestampConverter(TimestampConverter converter) {
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
deleted file mode 100644
index 54e52e8..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.spout.pcap.scheme;
-
-
-import org.apache.metron.common.utils.timestamp.TimestampConverter;
-
-public interface KeyConvertible {
-  KeyConvertible withTimestampConverter(TimestampConverter converter);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
deleted file mode 100644
index 2be55a9..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.spout.pcap.scheme;
-
-import org.apache.storm.spout.MultiScheme;
-import org.apache.metron.common.utils.timestamp.TimestampConverter;
-import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
-
-public enum TimestampScheme {
-   FROM_KEY( converter -> new KeyValueSchemeAsMultiScheme(new FromKeyScheme().withTimestampConverter(converter)))
-  ,FROM_PACKET(converter -> new FromPacketScheme().withTimestampConverter(converter));
-  ;
-  public static final String KV_FIELD = "kv";
-  TimestampSchemeCreator creator;
-  TimestampScheme(TimestampSchemeCreator creator)
-  {
-    this.creator = creator;
-  }
-
-  public static MultiScheme getScheme(String scheme, TimestampConverter converter) {
-    try {
-      TimestampScheme ts = TimestampScheme.valueOf(scheme.toUpperCase());
-      return ts.creator.create(converter);
-    }
-    catch(IllegalArgumentException iae) {
-      return TimestampScheme.FROM_KEY.creator.create(converter);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
deleted file mode 100644
index 78aa527..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.spout.pcap.scheme;
-
-import org.apache.storm.spout.MultiScheme;
-import org.apache.metron.common.utils.timestamp.TimestampConverter;
-
-public interface TimestampSchemeCreator {
-  MultiScheme create(TimestampConverter converter);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 00cc62d..8b292d7 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -49,7 +49,7 @@ import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
 import org.apache.metron.spout.pcap.Endianness;
-import org.apache.metron.spout.pcap.scheme.TimestampScheme;
+import org.apache.metron.spout.pcap.deserializer.Deserializers;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
@@ -136,7 +136,7 @@ public class PcapTopologyIntegrationTest {
       @Nullable
       @Override
       public Void apply(@Nullable Properties input) {
-        input.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_PACKET.toString());
+        input.setProperty("kafka.pcap.ts_scheme", Deserializers.FROM_PACKET.toString());
         return null;
       }
     }, (kafkaComponent, pcapEntries) -> kafkaComponent.writeMessages( KAFKA_TOPIC
@@ -153,7 +153,7 @@ public class PcapTopologyIntegrationTest {
       @Nullable
       @Override
       public Void apply(@Nullable Properties input) {
-        input.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_KEY.toString());
+        input.setProperty("kafka.pcap.ts_scheme", Deserializers.FROM_KEY.toString());
         return null;
       }
     }, new SendEntries() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index 0c20ed8..07fdd46 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -116,15 +116,9 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-            <version>${global_storm_version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>org.apache.curator</artifactId>
-                    <groupId>curator-client</groupId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-storm-kafka</artifactId>
+            <version>${project.parent.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
deleted file mode 100644
index 8e9622c..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Callback extends AutoCloseable, Serializable {
-    List<Object> apply(List<Object> tuple, EmitContext context);
-    void initialize(EmitContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
deleted file mode 100644
index 01b2f5c..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class CallbackCollector extends SpoutOutputCollector implements Serializable {
-    static final long serialVersionUID = 0xDEADBEEFL;
-    Callback _callback;
-    SpoutOutputCollector _delegate;
-    EmitContext _context;
-    public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
-        super(collector);
-        this._callback = callback;
-        this._delegate = collector;
-        this._context = context;
-    }
-
-
-    public static int getPartition(Object messageIdObj) {
-        PartitionManager.KafkaMessageId messageId = (PartitionManager.KafkaMessageId) messageIdObj;
-        return messageId.partition.partition;
-    }
-
-    /**
-     * Emits a new tuple to the specified output stream with the given message ID.
-     * When Storm detects that this tuple has been fully processed, or has failed
-     * to be fully processed, the spout will receive an ack or fail callback respectively
-     * with the messageId as long as the messageId was not null. If the messageId was null,
-     * Storm will not track the tuple and no callback will be received. The emitted values must be
-     * immutable.
-     *
-     * @param streamId
-     * @param tuple
-     * @param messageId
-     * @return the list of task ids that this tuple was sent to
-     */
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
-                                                                       .with(EmitContext.Type.STREAM_ID, streamId)
-                                        );
-        return _delegate.emit(streamId, t, messageId);
-    }
-
-    /**
-     * Emits a new tuple to the default output stream with the given message ID.
-     * When Storm detects that this tuple has been fully processed, or has failed
-     * to be fully processed, the spout will receive an ack or fail callback respectively
-     * with the messageId as long as the messageId was not null. If the messageId was null,
-     * Storm will not track the tuple and no callback will be received. The emitted values must be
-     * immutable.
-     *
-     * @param tuple
-     * @param messageId
-     * @return the list of task ids that this tuple was sent to
-     */
-    @Override
-    public List<Integer> emit(List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)));
-        return _delegate.emit(t, messageId);
-    }
-
-    /**
-     * Emits a tuple to the default output stream with a null message id. Storm will
-     * not track this message so ack and fail will never be called for this tuple. The
-     * emitted values must be immutable.
-     *
-     * @param tuple
-     */
-    @Override
-    public List<Integer> emit(List<Object> tuple) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext());
-        return _delegate.emit(t);
-    }
-
-    /**
-     * Emits a tuple to the specified output stream with a null message id. Storm will
-     * not track this message so ack and fail will never be called for this tuple. The
-     * emitted values must be immutable.
-     *
-     * @param streamId
-     * @param tuple
-     */
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId));
-        return _delegate.emit(streamId, t);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the specified output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     *
-     * @param taskId
-     * @param streamId
-     * @param tuple
-     * @param messageId
-     */
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
-                                                                       .with(EmitContext.Type.PARTITION, getPartition(messageId))
-                                                                       .with(EmitContext.Type.TASK_ID, taskId)
-                                        );
-        _delegate.emitDirect(taskId, streamId, t, messageId);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the default output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     *
-     * @param taskId
-     * @param tuple
-     * @param messageId
-     */
-    @Override
-    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
-                                                                       .with(EmitContext.Type.TASK_ID, taskId)
-                       );
-        _delegate.emitDirect(taskId, t, messageId);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the specified output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     *
-     * <p> Because no message id is specified, Storm will not track this message
-     * so ack and fail will never be called for this tuple.
-     *
-     * @param taskId
-     * @param streamId
-     * @param tuple
-     */
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
-                                                                       .with(EmitContext.Type.TASK_ID, taskId)
-                       );
-        _delegate.emitDirect(taskId, streamId, t);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the default output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     *
-     * <p> Because no message id is specified, Storm will not track this message
-     * so ack and fail will never be called for this tuple.
-     *
-     * @param taskId
-     * @param tuple
-     */
-    @Override
-    public void emitDirect(int taskId, List<Object> tuple) {
-
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, taskId));
-        _delegate.emitDirect(taskId, t);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
deleted file mode 100644
index 21f675b..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.*;
-
-public class CallbackKafkaSpout extends KafkaSpout {
-    static final long serialVersionUID = 0xDEADBEEFL;
-    Class<? extends Callback> callbackClazz;
-    Callback _callback;
-    EmitContext _context;
-    public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
-        this(spoutConfig, toCallbackClass(callbackClass));
-    }
-
-    public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
-        super(spoutConf);
-        callbackClazz = callback;
-    }
-
-    public void initialize(TopologyContext context) {
-        _callback = createCallback(callbackClazz);
-        _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
-                                    .with(EmitContext.Type.UUID, context.getStormId())
-                                    .with(EmitContext.Type.TOPIC, _spoutConfig.topic);
-        _callback.initialize(_context);
-    }
-
-
-    private static Class<? extends Callback> toCallbackClass(String callbackClass)  {
-        try{
-            return (Class<? extends Callback>) Callback.class.forName(callbackClass);
-        }
-        catch (ClassNotFoundException e) {
-            throw new RuntimeException(callbackClass + " not found", e);
-        }
-    }
-
-    protected Callback createCallback(Class<? extends Callback> callbackClass)  {
-        try {
-            return callbackClass.getConstructor().newInstance();
-        } catch (InstantiationException | NoSuchMethodException | InvocationTargetException e) {
-            throw new RuntimeException("Unable to instantiate callback", e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException("Illegal access", e);
-        }
-    }
-
-    @Override
-    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        if(_callback == null) {
-            initialize(context);
-        }
-        super.open( conf, context
-                  , new CallbackCollector(_callback, collector
-                                         ,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf)
-                                                                 .with(EmitContext.Type.TOPOLOGY_CONTEXT, context)
-                                         )
-                  );
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        if(_callback != null) {
-            try {
-                _callback.close();
-            } catch (Exception e) {
-                throw new IllegalStateException("Unable to close callback", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
deleted file mode 100644
index 434d884..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka;
-
-import org.apache.storm.task.TopologyContext;
-
-import java.io.Serializable;
-import java.util.EnumMap;
-import java.util.Map;
-
-public class EmitContext implements Cloneable,Serializable {
-    static final long serialVersionUID = 0xDEADBEEFL;
-
-    public enum Type{
-        STREAM_ID(String.class)
-        ,TOPIC(String.class)
-        ,PARTITION(Integer.class)
-        ,TASK_ID(Integer.class)
-        ,UUID(String.class)
-        ,SPOUT_CONFIG(SpoutConfig.class)
-        ,OPEN_CONFIG(Map.class)
-        ,TOPOLOGY_CONTEXT(TopologyContext.class)
-        ;
-        Class<?> clazz;
-        Type(Class<?> clazz) {
-            this.clazz=  clazz;
-        }
-
-        public Class<?> clazz() {
-           return clazz;
-        }
-    }
-    public EmitContext() {
-        this(new EnumMap<>(Type.class));
-    }
-    public EmitContext(EnumMap<Type, Object> context) {
-        _context = context;
-    }
-    private EnumMap<Type, Object> _context;
-
-    public <T> EmitContext with(Type t, T o ) {
-        _context.put(t, t.clazz().cast(o));
-        return this;
-    }
-    public <T> void add(Type t, T o ) {
-        with(t, o);
-    }
-
-    public <T> T get(Type t) {
-        Object o = _context.get(t);
-        if(o == null) {
-            return null;
-        }
-        else {
-            return (T) o;
-        }
-    }
-
-    public EmitContext cloneContext() {
-        try {
-            return (EmitContext)this.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException("Unable to clone emit context.", e);
-        }
-    }
-
-    /**
-     * Creates and returns a copy of this object.  The precise meaning
-     * of "copy" may depend on the class of the object. The general
-     * intent is that, for any object {@code x}, the expression:
-     * <blockquote>
-     * <pre>
-     * x.clone() != x</pre></blockquote>
-     * will be true, and that the expression:
-     * <blockquote>
-     * <pre>
-     * x.clone().getClass() == x.getClass()</pre></blockquote>
-     * will be {@code true}, but these are not absolute requirements.
-     * While it is typically the case that:
-     * <blockquote>
-     * <pre>
-     * x.clone().equals(x)</pre></blockquote>
-     * will be {@code true}, this is not an absolute requirement.
-     *
-     * By convention, the returned object should be obtained by calling
-     * {@code super.clone}.  If a class and all of its superclasses (except
-     * {@code Object}) obey this convention, it will be the case that
-     * {@code x.clone().getClass() == x.getClass()}.
-     *
-     * By convention, the object returned by this method should be independent
-     * of this object (which is being cloned).  To achieve this independence,
-     * it may be necessary to modify one or more fields of the object returned
-     * by {@code super.clone} before returning it.  Typically, this means
-     * copying any mutable objects that comprise the internal "deep structure"
-     * of the object being cloned and replacing the references to these
-     * objects with references to the copies.  If a class contains only
-     * primitive fields or references to immutable objects, then it is usually
-     * the case that no fields in the object returned by {@code super.clone}
-     * need to be modified.
-     *
-     * The method {@code clone} for class {@code Object} performs a
-     * specific cloning operation. First, if the class of this object does
-     * not implement the interface {@code Cloneable}, then a
-     * {@code CloneNotSupportedException} is thrown. Note that all arrays
-     * are considered to implement the interface {@code Cloneable} and that
-     * the return type of the {@code clone} method of an array type {@code T[]}
-     * is {@code T[]} where T is any reference or primitive type.
-     * Otherwise, this method creates a new instance of the class of this
-     * object and initializes all its fields with exactly the contents of
-     * the corresponding fields of this object, as if by assignment; the
-     * contents of the fields are not themselves cloned. Thus, this method
-     * performs a "shallow copy" of this object, not a "deep copy" operation.
-     *
-     * The class {@code Object} does not itself implement the interface
-     * {@code Cloneable}, so calling the {@code clone} method on an object
-     * whose class is {@code Object} will result in throwing an
-     * exception at run time.
-     *
-     * @return a clone of this instance.
-     * @throws CloneNotSupportedException if the object's class does not
-     *                                    support the {@code Cloneable} interface. Subclasses
-     *                                    that override the {@code clone} method can also
-     *                                    throw this exception to indicate that an instance cannot
-     *                                    be cloned.
-     * @see Cloneable
-     */
-    @Override
-    protected Object clone() throws CloneNotSupportedException {
-        EmitContext context = new EmitContext(_context.clone());
-        return context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties
index d8dd25f..35f368c 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -22,7 +22,8 @@ indexing.executors=0
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
-kafka.start=WHERE_I_LEFT_OFF
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start=UNCOMMITTED_EARLIEST
 
 ##### Indexing #####
 index.input.topic=indexing

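The new kafka.start values map directly onto storm-kafka-client's
KafkaSpoutConfig.FirstPollOffsetStrategy enum. As a minimal sketch of how such a
property value resolves at runtime (the variable name startValue is illustrative,
not from the patch):

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class OffsetStrategyExample {
      public static void main(String[] args) {
        // e.g. the value read for kafka.start
        String startValue = "UNCOMMITTED_EARLIEST";
        // valueOf rejects anything outside the four strategies listed above
        KafkaSpoutConfig.FirstPollOffsetStrategy strategy =
            KafkaSpoutConfig.FirstPollOffsetStrategy.valueOf(startValue);
        System.out.println("Resolved strategy: " + strategy);
      }
    }
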
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
new file mode 100644
index 0000000..62b844c
--- /dev/null
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software distributed
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.3.1</version>
+    </parent>
+    <artifactId>metron-storm-kafka</artifactId>
+    <name>metron-storm-kafka</name>
+    <description>Components that extend the Storm/Kafka spout</description>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka-client</artifactId>
+            <version>${global_storm_kafka_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+
+    <reporting>
+        <plugins>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
+
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <configuration>
+                  <targetJdk>${global_java_version}</targetJdk>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <build>
+        <plugins>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
new file mode 100644
index 0000000..bf5250b
--- /dev/null
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.storm.kafka.flux;
+
+import com.google.common.base.Joiner;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.storm.kafka.spout.*;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * This is a convenience layer on top of the KafkaSpoutConfig.Builder available in storm-kafka-client.
+ * The justification for this class is three-fold.  First, there are a lot of moving parts, so a simplified
+ * approach to constructing spouts is useful.  Second, and perhaps more importantly, the Builder pattern
+ * is decidedly unfriendly to use inside of Flux.  Third, we can make things a bit friendlier by requiring only
+ * zookeeper and automatically figuring out the brokers for the bootstrap server.
+ *
+ * @param <K> The kafka key type
+ * @param <V> The kafka value type
+ */
+public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
+  final static String STREAM = "default";
+
+  /**
+   * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.
+   */
+  public enum FieldsConfiguration {
+    KEY("key", record -> record.key()),
+    VALUE("value", record -> record.value()),
+    PARTITION("partition", record -> record.partition()),
+    TOPIC("topic", record -> record.topic())
+    ;
+    String fieldName;
+    Function<ConsumerRecord,Object> recordExtractor;
+
+    FieldsConfiguration(String fieldName, Function<ConsumerRecord,Object> recordExtractor) {
+      this.recordExtractor = recordExtractor;
+      this.fieldName = fieldName;
+    }
+
+    /**
+     * Return a list of the enums from their string representations.
+     * @param configs The enum names, case-insensitive
+     * @return The corresponding FieldsConfiguration values
+     */
+    public static List<FieldsConfiguration> toList(String... configs) {
+      List<FieldsConfiguration> ret = new ArrayList<>();
+      for(String config : configs) {
+        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
+      }
+      return ret;
+    }
+
+    /**
+     * Return a list of the enums from their string representations.
+     * @param configs The enum names, case-insensitive
+     * @return The corresponding FieldsConfiguration values
+     */
+    public static List<FieldsConfiguration> toList(List<String> configs) {
+      List<FieldsConfiguration> ret = new ArrayList<>();
+      for(String config : configs) {
+        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
+      }
+      return ret;
+    }
+
+    /**
+     * Construct a Fields object from an iterable of enums.  These fields are the fields
+     * exposed in the Storm tuple emitted from the spout.
+     * @param configs The fields to expose
+     * @return The Fields object backing the tuples emitted from the spout
+     */
+    public static Fields getFields(Iterable<FieldsConfiguration> configs) {
+      List<String> fields = new ArrayList<>();
+      for(FieldsConfiguration config : configs) {
+        fields.add(config.fieldName);
+      }
+      return new Fields(fields);
+    }
+  }
+
+  /**
+   * Build a tuple given the fields and the topic.  We want to use our FieldsConfiguration enum
+   * to define what this tuple looks like.
+   * @param <K> The key type in kafka
+   * @param <V> The value type in kafka
+   */
+  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    private List<FieldsConfiguration> configurations;
+    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
+      super(topic);
+      this.configurations = configurations;
+    }
+
+    /**
+     * Builds the values for a single tuple from the ConsumerRecord passed as a parameter
+     *
+     * @param consumerRecord the record whose contents are used to build the tuple
+     * @return the list of values comprising the tuple
+     */
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+      Values ret = new Values();
+      for(FieldsConfiguration config : configurations) {
+        ret.add(config.recordExtractor.apply(consumerRecord));
+      }
+      return ret;
+    }
+  }
+
+  private String topic;
+
+  /**
+   * Create an object with the specified properties.  This will expose the fields "key" and "value."
+   * @param kafkaProps The kafka consumer properties
+   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
+   * @param zkQuorum The zookeeper quorum.  We will use this to pull the list of brokers.
+   */
+  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
+                                , String topic
+                                , String zkQuorum
+                                )
+  {
+    this(kafkaProps, topic, zkQuorum, Arrays.asList("key", "value"));
+  }
+
+  /**
+   * Create an object with the specified properties, exposing the specified fields.
+   * @param kafkaProps The kafka consumer properties
+   * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns.
+   * @param zkQuorum The zookeeper quorum.  We will use this to pull the list of brokers.
+   * @param fieldsConfiguration The fields to expose in the emitted storm tuple.
+   */
+  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
+                                , String topic
+                                , String zkQuorum
+                                , List<String> fieldsConfiguration
+                                )
+  {
+    super( modifyKafkaProps(kafkaProps, zkQuorum)
+         , createStreams(fieldsConfiguration, topic)
+         , createTuplesBuilder(fieldsConfiguration, topic)
+         );
+    this.topic = topic;
+  }
+
+  /**
+   * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
+   * @return The kafka topic
+   */
+  public String getTopic() {
+    return topic;
+  }
+
+  /**
+   * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields.  Also, configure the spout
+   * using a Map that configures both kafka and the spout (see the properties in SpoutConfiguration).
+   * @param topic The kafka topic
+   * @param zkQuorum The zookeeper quorum, used to look up the brokers
+   * @param fieldsConfiguration The fields to expose in the emitted storm tuple
+   * @param kafkaProps The aforementioned map, containing both kafka and spout properties
+   * @return A configured StormKafkaSpout
+   */
+  public static <K, V> StormKafkaSpout<K, V> create( String topic
+                                                   , String zkQuorum
+                                                   , List<String> fieldsConfiguration
+                                                   , Map<String, Object> kafkaProps
+  )
+  {
+    Map<String, Object> spoutConfig = SpoutConfiguration.separate(kafkaProps);
+
+    SimpleStormKafkaBuilder<K, V> builder = new SimpleStormKafkaBuilder<>(kafkaProps, topic, zkQuorum, fieldsConfiguration);
+    SpoutConfiguration.configure(builder, spoutConfig);
+    return new StormKafkaSpout<>(builder);
+  }
+
+  private static Map<String, Object> modifyKafkaProps(Map<String, Object> props, String zkQuorum) {
+    try {
+      if(!props.containsKey(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS)) {
+        //this isn't a putIfAbsent because we only want to contact zookeeper for the brokers when the property is absent.
+        List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum);
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, Joiner.on(",").join(brokers));
+      }
+      props.putIfAbsent(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
+      props.putIfAbsent(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
+
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to retrieve brokers from zookeeper: " + e.getMessage(), e);
+    }
+    return props;
+  }
+
+  private static <K,V> KafkaSpoutTuplesBuilder<K, V> createTuplesBuilder(List<String> config, String topic) {
+    TupleBuilder<K, V> tb =  new TupleBuilder<K, V>(topic, FieldsConfiguration.toList(config));
+    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(tb).build();
+  }
+
+
+  private static KafkaSpoutStreams createStreams(List<String> config, String topic) {
+    final Fields fields = FieldsConfiguration.getFields(FieldsConfiguration.toList(config));
+    return new KafkaSpoutStreamsNamedTopics.Builder(fields, STREAM, new String[] { topic} ).build();
+  }
+
+}

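For context, a minimal sketch of how the create() helper above is meant to be
used (the topic, quorum, and group id values are illustrative; note that create()
will contact zookeeper for the broker list when bootstrap.servers is absent):

    import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
    import org.apache.metron.storm.kafka.flux.StormKafkaSpout;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class SpoutCreationExample {
      public static void main(String[] args) {
        // kafka consumer properties; the key/value deserializers default to
        // ByteArrayDeserializer, hence the byte[] type parameters below
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put("group.id", "example-group");

        // expose the "key" and "value" fields in the emitted tuple, matching
        // the default used by the three-argument constructor
        StormKafkaSpout<byte[], byte[]> spout = SimpleStormKafkaBuilder.create(
            "indexing", "node1:2181", Arrays.asList("key", "value"), kafkaProps);
      }
    }
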
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
new file mode 100644
index 0000000..6c0f148
--- /dev/null
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.storm.kafka.flux;
+
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * To enable the configuration of spouts with a single map containing both kafka properties and spout properties,
+ * this enum exists to name the spout-specific configurations and provide utility functions to split the kafka bits
+ * from the spout-specific bits of the configuration.
+ */
+public enum SpoutConfiguration {
+  /**
+   * The poll timeout for the kafka consumer in milliseconds
+   */
+  POLL_TIMEOUT_MS("spout.pollTimeoutMs"
+                 , container -> container.builder.setPollTimeoutMs(ConversionUtils.convert(container.value, Long.class))
+                 )
+  /**
+   * The offset strategy to use.  This can be one of
+   *  * EARLIEST,
+   *  * LATEST,
+   *  * UNCOMMITTED_EARLIEST,
+   *  * UNCOMMITTED_LATEST
+   */
+  ,FIRST_POLL_OFFSET_STRATEGY("spout.firstPollOffsetStrategy"
+                 , container -> container.builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.valueOf(container.value.toString()))
+                 )
+  /**
+   * The maximum number of retries
+   */
+  ,MAX_RETRIES("spout.maxRetries"
+                 , container -> container.builder.setMaxRetries(ConversionUtils.convert(container.value, Integer.class))
+                 )
+  /**
+   * The maximum amount of uncommitted offsets
+   */
+  ,MAX_UNCOMMITTED_OFFSETS("spout.maxUncommittedOffsets"
+                 , container -> container.builder.setMaxUncommittedOffsets(ConversionUtils.convert(container.value, Integer.class))
+                 )
+  /**
+   * The offset commit period in milliseconds
+   */
+  ,OFFSET_COMMIT_PERIOD_MS("spout.offsetCommitPeriodMs"
+                 , container -> container.builder.setOffsetCommitPeriodMs(ConversionUtils.convert(container.value, Long.class))
+                 )
+  ;
+  private static class Container {
+    Map<String, Object> config;
+    KafkaSpoutConfig.Builder builder;
+    Object value;
+    public Container(Map<String, Object> config, KafkaSpoutConfig.Builder builder, Object value) {
+      this.config = config;
+      this.builder = builder;
+      this.value = value;
+    }
+  }
+  Consumer<Container> consumer;
+  public String key;
+  SpoutConfiguration(String key, Consumer<Container> consumer) {
+    this.consumer = consumer;
+    this.key = key;
+  }
+
+  /**
+   * Split the spout-specific configuration from this Map.  NOTE: This mutates the parameter and removes the spout-specific config.
+   * @param config The combined kafka and spout configuration; mutated in place
+   * @return The spout-specific configuration
+   */
+  public static Map<String, Object> separate(Map<String, Object> config) {
+    Map<String, Object> ret = new HashMap<>();
+    for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
+      if(config.containsKey(spoutConfig.key)) {
+        Object val = config.get(spoutConfig.key);
+        config.remove(spoutConfig.key);
+        ret.put(spoutConfig.key, val);
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Configure a builder from a spout-specific configuration (e.g. the output of separate()).
+   * @param builder The KafkaSpoutConfig.Builder to configure
+   * @param config The spout-specific configuration
+   * @param <K> The kafka key type
+   * @param <V> The kafka value type
+   * @return The configured builder
+   */
+  public static <K, V> KafkaSpoutConfig.Builder configure( KafkaSpoutConfig.Builder<K, V> builder
+                                                         , Map<String, Object> config
+                                                         )
+  {
+    for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
+      if(config.containsKey(spoutConfig.key)) {
+        Container container = new Container(config, builder, config.get(spoutConfig.key));
+        spoutConfig.consumer.accept(container);
+      }
+    }
+    return builder;
+  }
+
+  /**
+   * List all of the spout-specific and kafka configuration options.
+   * @return The configuration option keys
+   */
+  public static List<String> allOptions() {
+    List<String> ret = new ArrayList<>();
+    for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
+      ret.add(spoutConfig.key);
+    }
+    ret.add(KafkaSpoutConfig.Consumer.GROUP_ID);
+    ret.add(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS);
+    ret.add(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT);
+    return ret;
+  }
+}

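A short sketch of the intended kafka/spout property split (the keys and values in
the map are illustrative):

    import org.apache.metron.storm.kafka.flux.SpoutConfiguration;

    import java.util.HashMap;
    import java.util.Map;

    public class SeparateExample {
      public static void main(String[] args) {
        // one map carrying both kafka consumer properties and spout.* properties
        Map<String, Object> config = new HashMap<>();
        config.put("group.id", "example-group");                             // kafka
        config.put("spout.pollTimeoutMs", 200);                              // spout
        config.put("spout.firstPollOffsetStrategy", "UNCOMMITTED_EARLIEST"); // spout

        // separate() mutates config, removing and returning the spout.* entries;
        // configure(builder, spoutConfig) would then apply them to a builder,
        // exactly as SimpleStormKafkaBuilder.create does
        Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
        System.out.println("kafka props: " + config);
        System.out.println("spout props: " + spoutConfig);
      }
    }
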
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
new file mode 100644
index 0000000..030348f
--- /dev/null
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.storm.kafka.flux;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.log4j.Logger;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * A thin wrapper atop the KafkaSpout that allows us to pass in the Builder rather than the KafkaSpoutConfig.
+ * This enables a simplified interface for this spout that is suitable for use in Flux.
+ * @param <K> The kafka key type
+ * @param <V> The kafka value type
+ */
+public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> {
+  private static final Logger LOG = Logger.getLogger(StormKafkaSpout.class);
+  protected KafkaSpoutConfig<K,V> _spoutConfig;
+  protected String _topic;
+  public StormKafkaSpout(SimpleStormKafkaBuilder<K,V> builder) {
+    super(builder.build());
+    this._topic = builder.getTopic();
+    this._spoutConfig = builder.build();
+  }
+
+  @Override
+  public void deactivate() {
+    try {
+      super.deactivate();
+    }
+    catch(WakeupException we) {
+      //see https://issues.apache.org/jira/browse/STORM-2184
+      LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      super.close();
+    }
+    catch(WakeupException we) {
+      //see https://issues.apache.org/jira/browse/STORM-2184
+      LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we);
+    }
+  }
+}
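
Finally, a sketch of wiring the resulting spout into a topology (the component id
and parallelism are illustrative; StormKafkaSpout is a KafkaSpout and therefore an
IRichSpout that TopologyBuilder accepts):

    import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
    import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
    import org.apache.storm.topology.TopologyBuilder;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class TopologyWiringExample {
      public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "pcap-example");
        StormKafkaSpout<byte[], byte[]> spout = SimpleStormKafkaBuilder.create(
            "pcap", "node1:2181", Arrays.asList("key", "value"), props);

        TopologyBuilder builder = new TopologyBuilder();
        // downstream bolts subscribe to the "default" stream, which carries
        // the "key" and "value" fields declared by the builder
        builder.setSpout("kafkaSpout", spout, 1);
      }
    }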