You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2016/10/31 17:38:54 UTC

[2/4] incubator-metron git commit: METRON-495: Upgrade Storm to 1.0.x (justinleet via mmiklavc) closes apache/incubator-metron#318

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-parsers/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/resources/log4j.properties b/metron-platform/metron-parsers/src/test/resources/log4j.properties
new file mode 100644
index 0000000..70be8ae
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#
+
+# Root logger option
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index dc3069f..649bddd 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -28,17 +28,17 @@
     </properties>
     <dependencies>
         <dependency>
-            <groupId>org.apache.metron</groupId>
-            <artifactId>metron-common</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>${global_hbase_guava_version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>flux-core</artifactId>
             <version>${global_flux_version}</version>
@@ -47,10 +47,15 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-kafka</artifactId>
             <version>${global_storm_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
             <exclusions>
                 <exclusion>
-                    <artifactId>org.apache.curator</artifactId>
-                    <groupId>curator-client</groupId>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
@@ -76,6 +81,12 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -83,13 +94,38 @@
             <version>${global_hadoop_version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-common</artifactId>
             <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.2</artifactId>
+            <artifactId>kafka_2.10</artifactId>
             <version>${global_kafka_version}</version>
             <exclusions>
                 <exclusion>
@@ -99,6 +135,30 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${global_storm_version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 32f994f..272ac13 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -20,7 +20,7 @@ config:
 
 components:
   - id: "zkHosts"
-    className: "storm.kafka.ZkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
     constructorArgs:
       - "${kafka.zk}"
   - id: "kafkaConfig"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index b2f261e..d9f6831 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.log4j.Logger;
-import storm.kafka.Callback;
-import storm.kafka.EmitContext;
-import storm.kafka.PartitionManager;
+import org.apache.storm.kafka.Callback;
+import org.apache.storm.kafka.EmitContext;
+import org.apache.storm.kafka.PartitionManager;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
index d87e2c0..e4f1113 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
@@ -18,8 +18,8 @@
 
 package org.apache.metron.spout.pcap;
 
-import storm.kafka.Callback;
-import storm.kafka.CallbackKafkaSpout;
+import org.apache.storm.kafka.Callback;
+import org.apache.storm.kafka.CallbackKafkaSpout;
 
 public class KafkaToHDFSSpout extends CallbackKafkaSpout {
   static final long serialVersionUID = 0xDEADBEEFL;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
index 99f9229..8a76548 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
@@ -20,7 +20,7 @@ package org.apache.metron.spout.pcap;
 
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.spout.pcap.scheme.TimestampScheme;
-import storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.BrokerHosts;
 
 public class SpoutConfig extends org.apache.metron.common.spout.kafka.SpoutConfig{
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
index 28aae7a..625cc2d 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
@@ -18,10 +18,10 @@
 
 package org.apache.metron.spout.pcap.scheme;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
@@ -29,8 +29,9 @@ import org.apache.metron.common.utils.timestamp.TimestampConverter;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.spout.pcap.Endianness;
-import storm.kafka.KeyValueScheme;
+import org.apache.storm.kafka.KeyValueScheme;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
@@ -39,15 +40,15 @@ public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
   private TimestampConverter converter = TimestampConverters.MICROSECONDS;
   private static Endianness endianness = Endianness.getNativeEndianness();
   @Override
-  public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
-    Long ts = converter.toNanoseconds(Bytes.toLong(key));
-    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
+  public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
+    Long ts = converter.toNanoseconds(key.asLongBuffer().get());
+    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, Utils.toArray(value), endianness);
     byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
     return new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(globalHeaderized)));
   }
 
   @Override
-  public List<Object> deserialize(byte[] ser) {
+  public List<Object> deserialize(ByteBuffer ser) {
     throw new UnsupportedOperationException("Really only interested in deserializing a key and a value");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
index 9b94e2b..236db0b 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
@@ -18,27 +18,29 @@
 
 package org.apache.metron.spout.pcap.scheme;
 
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
 import org.apache.metron.pcap.PcapHelper;
+import org.apache.storm.utils.Utils;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
 public class FromPacketScheme implements MultiScheme,KeyConvertible {
   private static final Logger LOG = Logger.getLogger(FromPacketScheme.class);
   @Override
-  public Iterable<List<Object>> deserialize(byte[] rawValue) {
-    byte[] value = rawValue;
+  public Iterable<List<Object>> deserialize(ByteBuffer rawValue) {
+    byte[] value = Utils.toByteArray(rawValue);
     Long ts = PcapHelper.getTimestamp(value);
     if(ts != null) {
-      return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(rawValue))));
+      return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(value))));
     }
     else {
       return ImmutableList.of(new Values(Collections.EMPTY_LIST));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
index 3f98c44..2be55a9 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
@@ -18,9 +18,9 @@
 
 package org.apache.metron.spout.pcap.scheme;
 
-import backtype.storm.spout.MultiScheme;
+import org.apache.storm.spout.MultiScheme;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
-import storm.kafka.KeyValueSchemeAsMultiScheme;
+import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
 
 public enum TimestampScheme {
    FROM_KEY( converter -> new KeyValueSchemeAsMultiScheme(new FromKeyScheme().withTimestampConverter(converter)))

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
index e88926b..78aa527 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
@@ -18,7 +18,7 @@
 
 package org.apache.metron.spout.pcap.scheme;
 
-import backtype.storm.spout.MultiScheme;
+import org.apache.storm.spout.MultiScheme;
 import org.apache.metron.common.utils.timestamp.TimestampConverter;
 
 public interface TimestampSchemeCreator {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE b/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
index 0905df5..ed6a252 100644
--- a/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
+++ b/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
@@ -219,7 +219,7 @@ This product bundles jaxb-api 2.2.2, which is available under a "Common Developm
 This product bundles stax-api 1.0-2, which is available under a "Common Development and Distribution License v1.0" license.  For details, see https://docs.oracle.com/javase/7/docs/api/javax/xml/stream/package-summary.html
 This product bundles jopt-simple 3.2, which is available under a "MIT Software License" license.  For details, see http://jopt-simple.sourceforge.net
 This product bundles leveldbjni-all 1.8, which is available under a "BSD Software License" license.  For details, see https://github.com/fusesource/leveldbjni
-This product bundles scala-library 2.9.2, which is available under a "BSD Software License" license.  For details, see http://www.scala-lang.org/
+This product bundles scala-library 2.10.6, which is available under a "BSD Software License" license.  For details, see http://www.scala-lang.org/
 This product bundles slf4j-api 1.7.10, which is available under a "MIT Software License" license.  For details, see http://www.slf4j.org
 This product bundles slf4j-log4j12 1.7.10, which is available under a "MIT Software License" license.  For details, see http://www.slf4j.org
 This product bundles xmlenc 0.52, which is available under a "BSD Software License" license.  For details, see http://xmlenc.sourceforge.net

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 382a638..72d2b9a 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -110,7 +110,6 @@ public class PcapTopologyIntegrationTest {
       byte[] pcapWithHeader = value.copyBytes();
       long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
       {
-
         List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
         for(PacketInfo pi : info) {
           Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
@@ -248,7 +247,6 @@ public class PcapTopologyIntegrationTest {
             .build();
     try {
       runner.start();
-      System.out.println("Components started...");
 
       fluxComponent.submitTopology();
       sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index 6b14712..b0019c1 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -85,7 +85,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.2</artifactId>
+            <artifactId>kafka_2.10</artifactId>
             <version>${global_kafka_version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -120,6 +120,21 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-common</artifactId>
             <version>${global_hbase_version}</version>
@@ -132,6 +147,10 @@
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
index e2d56c8..d9bacf6 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
@@ -45,6 +45,19 @@ public class PcapByteInputStream implements PcapInputStream {
 
   /**
    * Opens pcap file input stream.
+   *
+   * @param pcap
+   *          the byte array to be read
+   *
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapByteInputStream(byte[] pcap) throws IOException {
+    this(pcap, 0, pcap.length);
+  }
+
+  /**
+   * Opens pcap file input stream.
    * 
    * @param pcap
    *          the byte array to be read
@@ -52,8 +65,9 @@ public class PcapByteInputStream implements PcapInputStream {
    * @throws IOException
    *           Signals that an I/O exception has occurred.
    */
-  public PcapByteInputStream(byte[] pcap) throws IOException {
-    is = new DataInputStream(new ByteArrayInputStream(pcap)); // $codepro.audit.disable
+  public PcapByteInputStream(byte[] pcap, int offset, int length) throws IOException {
+
+    is = new DataInputStream(new ByteArrayInputStream(pcap, offset, length )); // $codepro.audit.disable
                                                               // closeWhereCreated
     readGlobalHeader();
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index 78f6fd4..e48824f 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -108,10 +108,14 @@ public class PcapHelper {
     }
   }
 
-  public static Long getTimestamp(byte[] pcap) {
+  public static Long getTimestamp(byte[] pcap ) {
+    return getTimestamp(pcap, 0, pcap.length);
+  }
+
+  public static Long getTimestamp(byte[] pcap, int offset, int length) {
     PcapByteInputStream pcapByteInputStream = null;
     try {
-      pcapByteInputStream = new PcapByteInputStream(pcap);
+      pcapByteInputStream = new PcapByteInputStream(pcap, offset, length);
       PcapPacket packet = pcapByteInputStream.getPacket();
       GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
       PacketHeader packetHeader = packet.getPacketHeader();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
index c07a217..a67cd1b 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.metron.pcap.writer;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
 import org.apache.metron.hbase.writer.HBaseWriter;
 import org.apache.metron.pcap.utils.PcapUtils;
 import org.json.simple.JSONObject;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
new file mode 100644
index 0000000..8e9622c
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Callback extends AutoCloseable, Serializable {
+    List<Object> apply(List<Object> tuple, EmitContext context);
+    void initialize(EmitContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
new file mode 100644
index 0000000..30aee30
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A {@link SpoutOutputCollector} that passes every tuple through a {@link Callback}
+ * before forwarding the callback's result to the wrapped collector.
+ * <p>
+ * Each emit hands the callback a fresh clone of the base {@link EmitContext},
+ * enriched with whatever the emit variant knows: stream id, Kafka partition
+ * (taken from the message id) and target task id.
+ */
+public class CallbackCollector extends SpoutOutputCollector implements Serializable {
+    static final long serialVersionUID = 0xDEADBEEFL;
+    Callback _callback;
+    SpoutOutputCollector _delegate;
+    EmitContext _context;
+
+    /**
+     * @param callback  hook applied to every tuple before it is forwarded
+     * @param collector collector that receives the callback's output
+     * @param context   base context, cloned for every emit
+     */
+    public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
+        super(collector);
+        this._callback = callback;
+        this._delegate = collector;
+        this._context = context;
+    }
+
+    /**
+     * Extracts the Kafka partition number from a spout message id.
+     *
+     * @param messageIdObj a {@code PartitionManager.KafkaMessageId}
+     * @return the partition number carried by the message id
+     * @throws ClassCastException   if the argument is of any other type
+     * @throws NullPointerException if the argument is {@code null}
+     */
+    public static int getPartition(Object messageIdObj) {
+        PartitionManager.KafkaMessageId messageId = (PartitionManager.KafkaMessageId) messageIdObj;
+        return messageId.partition.partition;
+    }
+
+    /**
+     * Builds the per-emit context and runs the callback on the tuple.
+     * Arguments that are {@code null} are simply left out of the context, so an
+     * emit with a null message id does not fail in {@link #getPartition}.
+     *
+     * @param tuple     values the spout asked to emit
+     * @param streamId  target stream, or {@code null} when none was given
+     * @param messageId Kafka message id, or {@code null} when none was given
+     * @param taskId    target task for direct emits, otherwise {@code null}
+     * @return the values produced by the callback
+     */
+    private List<Object> applyCallback(List<Object> tuple, String streamId, Object messageId, Integer taskId) {
+        EmitContext context = _context.cloneContext();
+        if (streamId != null) {
+            context.with(EmitContext.Type.STREAM_ID, streamId);
+        }
+        if (messageId != null) {
+            context.with(EmitContext.Type.PARTITION, getPartition(messageId));
+        }
+        if (taskId != null) {
+            context.with(EmitContext.Type.TASK_ID, taskId);
+        }
+        return _callback.apply(tuple, context);
+    }
+
+    /**
+     * Emits the callback's output to the given stream with the given message id.
+     *
+     * @param streamId  stream to emit to
+     * @param tuple     values handed to the callback
+     * @param messageId Kafka message id; may be {@code null}
+     * @return whatever the wrapped collector's {@code emit} returns
+     */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        return _delegate.emit(streamId, applyCallback(tuple, streamId, messageId, null), messageId);
+    }
+
+    /**
+     * Emits the callback's output to the default stream with the given message id.
+     *
+     * @param tuple     values handed to the callback
+     * @param messageId Kafka message id; may be {@code null}
+     * @return whatever the wrapped collector's {@code emit} returns
+     */
+    @Override
+    public List<Integer> emit(List<Object> tuple, Object messageId) {
+        return _delegate.emit(applyCallback(tuple, null, messageId, null), messageId);
+    }
+
+    /**
+     * Emits the callback's output to the default stream without a message id.
+     *
+     * @param tuple values handed to the callback
+     * @return whatever the wrapped collector's {@code emit} returns
+     */
+    @Override
+    public List<Integer> emit(List<Object> tuple) {
+        return _delegate.emit(applyCallback(tuple, null, null, null));
+    }
+
+    /**
+     * Emits the callback's output to the given stream without a message id.
+     *
+     * @param streamId stream to emit to
+     * @param tuple    values handed to the callback
+     * @return whatever the wrapped collector's {@code emit} returns
+     */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple) {
+        return _delegate.emit(streamId, applyCallback(tuple, streamId, null, null));
+    }
+
+    /**
+     * Emits the callback's output directly to one task on the given stream.
+     *
+     * @param taskId    task that receives the tuple
+     * @param streamId  stream to emit to
+     * @param tuple     values handed to the callback
+     * @param messageId Kafka message id; may be {@code null}
+     */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+        _delegate.emitDirect(taskId, streamId, applyCallback(tuple, streamId, messageId, taskId), messageId);
+    }
+
+    /**
+     * Emits the callback's output directly to one task on the default stream.
+     *
+     * @param taskId    task that receives the tuple
+     * @param tuple     values handed to the callback
+     * @param messageId Kafka message id; may be {@code null}
+     */
+    @Override
+    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
+        _delegate.emitDirect(taskId, applyCallback(tuple, null, messageId, taskId), messageId);
+    }
+
+    /**
+     * Emits the callback's output directly to one task on the given stream,
+     * without a message id.
+     *
+     * @param taskId   task that receives the tuple
+     * @param streamId stream to emit to
+     * @param tuple    values handed to the callback
+     */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+        _delegate.emitDirect(taskId, streamId, applyCallback(tuple, streamId, null, taskId));
+    }
+
+    /**
+     * Emits the callback's output directly to one task on the default stream,
+     * without a message id.
+     *
+     * @param taskId task that receives the tuple
+     * @param tuple  values handed to the callback
+     */
+    @Override
+    public void emitDirect(int taskId, List<Object> tuple) {
+        _delegate.emitDirect(taskId, applyCallback(tuple, null, null, taskId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
new file mode 100644
index 0000000..661fb0b
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.*;
+
+/**
+ * A {@link KafkaSpout} whose emitted tuples are routed through a {@link Callback}.
+ * <p>
+ * Only the callback class is stored at construction. The instance is created by
+ * {@link #initialize}, which {@link #open} calls when it has not run yet, and is
+ * closed together with the spout.
+ */
+public class CallbackKafkaSpout extends KafkaSpout {
+    static final long serialVersionUID = 0xDEADBEEFL;
+    Class<? extends Callback> callbackClazz;
+    Callback _callback;
+    EmitContext _context;
+
+    /**
+     * @param spoutConfig   configuration passed on to {@link KafkaSpout}
+     * @param callbackClass fully qualified name of a {@link Callback} implementation
+     * @throws RuntimeException if the class cannot be found or is not a {@link Callback}
+     */
+    public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
+        this(spoutConfig, toCallbackClass(callbackClass));
+    }
+
+    /**
+     * @param spoutConf configuration passed on to {@link KafkaSpout}
+     * @param callback  callback implementation; needs a no-argument constructor
+     */
+    public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
+        super(spoutConf);
+        callbackClazz = callback;
+    }
+
+    /**
+     * Creates the callback and the base emit context (spout config, topology id
+     * stored under {@code UUID}, and topic), then initializes the callback with it.
+     *
+     * @param context topology context supplying the topology id
+     */
+    public void initialize(TopologyContext context) {
+        _callback = createCallback(callbackClazz);
+        _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
+                                    .with(EmitContext.Type.UUID, context.getStormId())
+                                    .with(EmitContext.Type.TOPIC, _spoutConfig.topic);
+        _callback.initialize(_context);
+    }
+
+    /**
+     * Resolves a class name to a {@link Callback} class, checking the type up front
+     * so that a wrong class name fails here rather than when the spout is opened.
+     */
+    private static Class<? extends Callback> toCallbackClass(String callbackClass) {
+        try {
+            return Class.forName(callbackClass).asSubclass(Callback.class);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(callbackClass + " not found", e);
+        } catch (ClassCastException e) {
+            throw new RuntimeException(callbackClass + " does not implement " + Callback.class.getName(), e);
+        }
+    }
+
+    /**
+     * Instantiates the callback through its no-argument constructor.
+     *
+     * @param callbackClass class to instantiate
+     * @return a new callback instance
+     * @throws RuntimeException if the constructor is missing, inaccessible or fails
+     */
+    protected Callback createCallback(Class<? extends Callback> callbackClass) {
+        try {
+            // Class.newInstance() would rethrow checked constructor exceptions undeclared.
+            return callbackClass.getDeclaredConstructor().newInstance();
+        } catch (InstantiationException e) {
+            throw new RuntimeException("Unable to instantiate callback", e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("Illegal access", e);
+        } catch (ReflectiveOperationException e) {
+            // NoSuchMethodException or InvocationTargetException
+            throw new RuntimeException("Unable to instantiate callback", e);
+        }
+    }
+
+    /**
+     * Opens the underlying spout with the collector wrapped in a
+     * {@link CallbackCollector}; the callback is initialized first if needed.
+     */
+    @Override
+    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        if (_callback == null) {
+            initialize(context);
+        }
+        EmitContext collectorContext = _context.cloneContext()
+                                               .with(EmitContext.Type.OPEN_CONFIG, conf)
+                                               .with(EmitContext.Type.TOPOLOGY_CONTEXT, context);
+        super.open(conf, context, new CallbackCollector(_callback, collector, collectorContext));
+    }
+
+    /**
+     * Closes the spout and then the callback. The callback is closed even when
+     * closing the spout throws, so its resources are not leaked.
+     *
+     * @throws IllegalStateException if the callback fails to close
+     */
+    @Override
+    public void close() {
+        try {
+            super.close();
+        } finally {
+            if (_callback != null) {
+                try {
+                    _callback.close();
+                } catch (Exception e) {
+                    throw new IllegalStateException("Unable to close callback", e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
new file mode 100644
index 0000000..434d884
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.task.TopologyContext;
+
+import java.io.Serializable;
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * A typed bag of values describing the circumstances of a spout emit, keyed by
+ * {@link Type}. Values stored through {@link #with} are checked against the
+ * class declared by their key.
+ */
+public class EmitContext implements Cloneable,Serializable {
+    static final long serialVersionUID = 0xDEADBEEFL;
+
+    /** Keys of the context, each declaring the class its value must have. */
+    public enum Type{
+        STREAM_ID(String.class)
+        ,TOPIC(String.class)
+        ,PARTITION(Integer.class)
+        ,TASK_ID(Integer.class)
+        ,UUID(String.class)
+        ,SPOUT_CONFIG(SpoutConfig.class)
+        ,OPEN_CONFIG(Map.class)
+        ,TOPOLOGY_CONTEXT(TopologyContext.class)
+        ;
+        // Never reassigned after construction, so it is declared final.
+        final Class<?> clazz;
+        Type(Class<?> clazz) {
+            this.clazz = clazz;
+        }
+
+        /** @return the class that values stored under this key must be instances of */
+        public Class<?> clazz() {
+           return clazz;
+        }
+    }
+
+    private EnumMap<Type, Object> _context;
+
+    /** Creates an empty context. */
+    public EmitContext() {
+        this(new EnumMap<>(Type.class));
+    }
+
+    /**
+     * Creates a context backed by the given map. The map is used as is, not copied.
+     *
+     * @param context initial entries
+     */
+    public EmitContext(EnumMap<Type, Object> context) {
+        _context = context;
+    }
+
+    /**
+     * Stores a value under the given key and returns this context for chaining.
+     *
+     * @param t key
+     * @param o value; may be {@code null}
+     * @return this context
+     * @throws ClassCastException if {@code o} is not an instance of {@code t.clazz()}
+     */
+    public <T> EmitContext with(Type t, T o ) {
+        _context.put(t, t.clazz().cast(o));
+        return this;
+    }
+
+    /**
+     * Same as {@link #with(Type, Object)} without returning the context.
+     */
+    public <T> void add(Type t, T o ) {
+        with(t, o);
+    }
+
+    /**
+     * Looks up the value stored under the given key.
+     *
+     * @param t key
+     * @return the stored value, or {@code null} if there is none
+     */
+    @SuppressWarnings("unchecked") // values stored through with() match t.clazz()
+    public <T> T get(Type t) {
+        return (T) _context.get(t);
+    }
+
+    /**
+     * Returns an independent copy of this context without the checked exception
+     * declared by {@link #clone()}.
+     *
+     * @return a copy whose entries can be changed without affecting this context
+     */
+    public EmitContext cloneContext() {
+        try {
+            return (EmitContext)this.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Unable to clone emit context.", e);
+        }
+    }
+
+    /**
+     * Copies this context. The copy is obtained from {@code super.clone()}, so a
+     * subclass gets an instance of its own class, and it receives its own map;
+     * the values inside the map are shared, not copied.
+     *
+     * @return a clone of this instance
+     * @throws CloneNotSupportedException declared for overriding subclasses; this
+     *                                    class is {@link Cloneable}
+     */
+    @Override
+    protected Object clone() throws CloneNotSupportedException {
+        EmitContext copy = (EmitContext) super.clone();
+        copy._context = _context.clone();
+        return copy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
deleted file mode 100644
index 4fc1c55..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Callback extends AutoCloseable, Serializable {
-    List<Object> apply(List<Object> tuple, EmitContext context);
-    void initialize(EmitContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java
deleted file mode 100644
index 6f2f7fa..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class CallbackCollector extends SpoutOutputCollector implements Serializable {
-    static final long serialVersionUID = 0xDEADBEEFL;
-    Callback _callback;
-    SpoutOutputCollector _delegate;
-    EmitContext _context;
-    public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
-        super(collector);
-        this._callback = callback;
-        this._delegate = collector;
-        this._context = context;
-    }
-
-
-    public static int getPartition(Object messageIdObj) {
-        PartitionManager.KafkaMessageId messageId = (PartitionManager.KafkaMessageId) messageIdObj;
-        return messageId.partition.partition;
-    }
-
-    /**
-     * Emits a new tuple to the specified output stream with the given message ID.
-     * When Storm detects that this tuple has been fully processed, or has failed
-     * to be fully processed, the spout will receive an ack or fail callback respectively
-     * with the messageId as long as the messageId was not null. If the messageId was null,
-     * Storm will not track the tuple and no callback will be received. The emitted values must be
-     * immutable.
-     *
-     * @param streamId
-     * @param tuple
-     * @param messageId
-     * @return the list of task ids that this tuple was sent to
-     */
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
-                                                                       .with(EmitContext.Type.STREAM_ID, streamId)
-                                        );
-        return _delegate.emit(streamId, t, messageId);
-    }
-
-    /**
-     * Emits a new tuple to the default output stream with the given message ID.
-     * When Storm detects that this tuple has been fully processed, or has failed
-     * to be fully processed, the spout will receive an ack or fail callback respectively
-     * with the messageId as long as the messageId was not null. If the messageId was null,
-     * Storm will not track the tuple and no callback will be received. The emitted values must be
-     * immutable.
-     *
-     * @param tuple
-     * @param messageId
-     * @return the list of task ids that this tuple was sent to
-     */
-    @Override
-    public List<Integer> emit(List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)));
-        return _delegate.emit(t, messageId);
-    }
-
-    /**
-     * Emits a tuple to the default output stream with a null message id. Storm will
-     * not track this message so ack and fail will never be called for this tuple. The
-     * emitted values must be immutable.
-     *
-     * @param tuple
-     */
-    @Override
-    public List<Integer> emit(List<Object> tuple) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext());
-        return _delegate.emit(t);
-    }
-
-    /**
-     * Emits a tuple to the specified output stream with a null message id. Storm will
-     * not track this message so ack and fail will never be called for this tuple. The
-     * emitted values must be immutable.
-     *
-     * @param streamId
-     * @param tuple
-     */
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId));
-        return _delegate.emit(streamId, t);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the specified output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     *
-     * @param taskId
-     * @param streamId
-     * @param tuple
-     * @param messageId
-     */
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
-                                                                       .with(EmitContext.Type.PARTITION, getPartition(messageId))
-                                                                       .with(EmitContext.Type.TASK_ID, new Integer(taskId))
-                                        );
-        _delegate.emitDirect(taskId, streamId, t, messageId);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the default output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     *
-     * @param taskId
-     * @param tuple
-     * @param messageId
-     */
-    @Override
-    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
-                                                                       .with(EmitContext.Type.TASK_ID, new Integer(taskId))
-                       );
-        _delegate.emitDirect(taskId, t, messageId);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the specified output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     * <p/>
-     * <p> Because no message id is specified, Storm will not track this message
-     * so ack and fail will never be called for this tuple.</p>
-     *
-     * @param taskId
-     * @param streamId
-     * @param tuple
-     */
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
-                                                                       .with(EmitContext.Type.TASK_ID, new Integer(taskId))
-                       );
-        _delegate.emitDirect(taskId, streamId, t);
-    }
-
-    /**
-     * Emits a tuple to the specified task on the default output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message. The emitted values must be
-     * immutable.
-     * <p/>
-     * <p> Because no message id is specified, Storm will not track this message
-     * so ack and fail will never be called for this tuple.</p>
-     *
-     * @param taskId
-     * @param tuple
-     */
-    @Override
-    public void emitDirect(int taskId, List<Object> tuple) {
-
-        List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, new Integer(taskId)));
-        _delegate.emitDirect(taskId, t);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java
deleted file mode 100644
index 3828d21..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import storm.kafka.*;
-
-import java.util.*;
-
-public class CallbackKafkaSpout extends KafkaSpout {
-    static final long serialVersionUID = 0xDEADBEEFL;
-    Class<? extends Callback> callbackClazz;
-    Callback _callback;
-    EmitContext _context;
-    public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
-        this(spoutConfig, toCallbackClass(callbackClass));
-    }
-
-    public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
-        super(spoutConf);
-        callbackClazz = callback;
-    }
-
-    public void initialize() {
-        _callback = createCallback(callbackClazz);
-        _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
-                                    .with(EmitContext.Type.UUID, _uuid)
-                                    .with(EmitContext.Type.TOPIC, _spoutConfig.topic);
-        _callback.initialize(_context);
-    }
-
-
-    private static Class<? extends Callback> toCallbackClass(String callbackClass)  {
-        try{
-            return (Class<? extends Callback>) Callback.class.forName(callbackClass);
-        }
-        catch (ClassNotFoundException e) {
-            throw new RuntimeException(callbackClass + " not found", e);
-        }
-    }
-
-    protected Callback createCallback(Class<? extends Callback> callbackClass)  {
-        try {
-            return callbackClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new RuntimeException("Unable to instantiate callback", e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException("Illegal access", e);
-        }
-    }
-
-    @Override
-    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        if(_callback == null) {
-            initialize();
-        }
-        super.open( conf, context
-                  , new CallbackCollector(_callback, collector
-                                         ,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf)
-                                                                 .with(EmitContext.Type.TOPOLOGY_CONTEXT, context)
-                                         )
-                  );
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        if(_callback != null) {
-            try {
-                _callback.close();
-            } catch (Exception e) {
-                throw new IllegalStateException("Unable to close callback", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java
deleted file mode 100644
index d2d9df7..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.task.TopologyContext;
-
-import java.io.Serializable;
-import java.util.EnumMap;
-import java.util.Map;
-
-public class EmitContext implements Cloneable,Serializable {
-    static final long serialVersionUID = 0xDEADBEEFL;
-
-    public enum Type{
-        STREAM_ID(String.class)
-        ,TOPIC(String.class)
-        ,PARTITION(Integer.class)
-        ,TASK_ID(Integer.class)
-        ,UUID(String.class)
-        ,SPOUT_CONFIG(SpoutConfig.class)
-        ,OPEN_CONFIG(Map.class)
-        ,TOPOLOGY_CONTEXT(TopologyContext.class)
-        ;
-        Class<?> clazz;
-        Type(Class<?> clazz) {
-            this.clazz=  clazz;
-        }
-
-        public Class<?> clazz() {
-           return clazz;
-        }
-    }
-    public EmitContext() {
-        this(new EnumMap<>(Type.class));
-    }
-    public EmitContext(EnumMap<Type, Object> context) {
-        _context = context;
-    }
-    private EnumMap<Type, Object> _context;
-
-    public <T> EmitContext with(Type t, T o ) {
-        _context.put(t, t.clazz().cast(o));
-        return this;
-    }
-    public <T> void add(Type t, T o ) {
-        with(t, o);
-    }
-
-    public <T> T get(Type t) {
-        Object o = _context.get(t);
-        if(o == null) {
-            return null;
-        }
-        else {
-            return (T) o;
-        }
-    }
-
-    public EmitContext cloneContext() {
-        try {
-            return (EmitContext)this.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException("Unable to clone emit context.", e);
-        }
-    }
-
-    /**
-     * Creates and returns a copy of this object.  The precise meaning
-     * of "copy" may depend on the class of the object. The general
-     * intent is that, for any object {@code x}, the expression:
-     * <blockquote>
-     * <pre>
-     * x.clone() != x</pre></blockquote>
-     * will be true, and that the expression:
-     * <blockquote>
-     * <pre>
-     * x.clone().getClass() == x.getClass()</pre></blockquote>
-     * will be {@code true}, but these are not absolute requirements.
-     * While it is typically the case that:
-     * <blockquote>
-     * <pre>
-     * x.clone().equals(x)</pre></blockquote>
-     * will be {@code true}, this is not an absolute requirement.
-     *
-     * By convention, the returned object should be obtained by calling
-     * {@code super.clone}.  If a class and all of its superclasses (except
-     * {@code Object}) obey this convention, it will be the case that
-     * {@code x.clone().getClass() == x.getClass()}.
-     *
-     * By convention, the object returned by this method should be independent
-     * of this object (which is being cloned).  To achieve this independence,
-     * it may be necessary to modify one or more fields of the object returned
-     * by {@code super.clone} before returning it.  Typically, this means
-     * copying any mutable objects that comprise the internal "deep structure"
-     * of the object being cloned and replacing the references to these
-     * objects with references to the copies.  If a class contains only
-     * primitive fields or references to immutable objects, then it is usually
-     * the case that no fields in the object returned by {@code super.clone}
-     * need to be modified.
-     *
-     * The method {@code clone} for class {@code Object} performs a
-     * specific cloning operation. First, if the class of this object does
-     * not implement the interface {@code Cloneable}, then a
-     * {@code CloneNotSupportedException} is thrown. Note that all arrays
-     * are considered to implement the interface {@code Cloneable} and that
-     * the return type of the {@code clone} method of an array type {@code T[]}
-     * is {@code T[]} where T is any reference or primitive type.
-     * Otherwise, this method creates a new instance of the class of this
-     * object and initializes all its fields with exactly the contents of
-     * the corresponding fields of this object, as if by assignment; the
-     * contents of the fields are not themselves cloned. Thus, this method
-     * performs a "shallow copy" of this object, not a "deep copy" operation.
-     *
-     * The class {@code Object} does not itself implement the interface
-     * {@code Cloneable}, so calling the {@code clone} method on an object
-     * whose class is {@code Object} will result in throwing an
-     * exception at run time.
-     *
-     * @return a clone of this instance.
-     * @throws CloneNotSupportedException if the object's class does not
-     *                                    support the {@code Cloneable} interface. Subclasses
-     *                                    that override the {@code clone} method can also
-     *                                    throw this exception to indicate that an instance cannot
-     *                                    be cloned.
-     * @see Cloneable
-     */
-    @Override
-    protected Object clone() throws CloneNotSupportedException {
-        EmitContext context = new EmitContext(_context.clone());
-        return context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml
index 65077e0..d87e860 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -112,7 +112,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.2</artifactId>
+            <artifactId>kafka_2.10</artifactId>
             <version>${global_kafka_version}</version>
             <exclusions>
                 <exclusion>
@@ -122,6 +122,19 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-integration-test</artifactId>
             <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 4441b29..50b11eb 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -17,7 +17,9 @@
  */
 package org.apache.metron.solr.writer;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml
index 1700ee0..04cfe14 100644
--- a/metron-platform/metron-test-utilities/pom.xml
+++ b/metron-platform/metron-test-utilities/pom.xml
@@ -41,11 +41,58 @@
       <version>${global_hbase_guava_version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <version>${global_hadoop_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${global_hadoop_version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-httpclient</artifactId>
+          <groupId>commons-httpclient</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${global_hadoop_version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-httpclient</artifactId>
+          <groupId>commons-httpclient</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <version>${global_hbase_version}</version>
       <exclusions>
         <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
           <artifactId>log4j</artifactId>
           <groupId>log4j</groupId>
         </exclusion>
@@ -69,7 +116,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.9.2</artifactId>
+      <artifactId>kafka_2.10</artifactId>
       <version>${global_kafka_version}</version>
       <exclusions>
         <exclusion>
@@ -81,12 +128,12 @@
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
-      <version>2.7.1</version>
+      <version>${global_curator_version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-      <version>2.7.1</version>
+      <version>${global_curator_version}</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
index a655e14..3faa1e1 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
@@ -17,11 +17,11 @@
  */
 package org.apache.metron.test.bolt;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
index 83681c7..ff9e0cf 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
@@ -20,11 +20,11 @@ package org.apache.metron.test.bolt;
 
 import java.util.Map;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
 
 @SuppressWarnings("serial")
 public class PrintingBolt extends BaseRichBolt {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
index a36d99d..58b6fef 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
@@ -29,13 +29,13 @@ import org.apache.metron.test.converters.BinaryConverters;
 import org.apache.metron.test.converters.IConverter;
 import org.apache.metron.test.filereaders.FileReader;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 
 public class GenericInternalTestSpout extends BaseRichSpout {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
index 1859c7f..53de800 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -31,9 +31,53 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${global_hadoop_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${global_hadoop_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-common</artifactId>
             <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+            </exclusions>
             <scope>provided</scope>
         </dependency>
         <dependency>
@@ -54,6 +98,14 @@
                     <groupId>com.google.guava</groupId>
                     <artifactId>guava</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -74,7 +126,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.2</artifactId>
+            <artifactId>kafka_2.10</artifactId>
             <version>${global_kafka_version}</version>
             <scope>provided</scope>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 897ef42..738ccac 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -18,8 +18,8 @@
 
 package org.apache.metron.writer;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index c6d220e..e323056 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.metron.writer;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index 3b1321b..e3e8150 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -18,7 +18,7 @@
 
 package org.apache.metron.writer;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 161f4bc..a60ddae 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,11 +17,11 @@
  */
 package org.apache.metron.writer.bolt;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index e4d56d9..3af0a93 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -17,7 +17,8 @@
  */
 package org.apache.metron.writer.hdfs;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
index ae0242d..fe9e3e3 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
@@ -18,7 +18,7 @@
 
 package org.apache.metron.writer.hdfs;
 
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 
 import java.util.Map;