You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2016/10/31 17:38:54 UTC
[2/4] incubator-metron git commit: METRON-495: Upgrade Storm to 1.0.x
(justinleet via mmiklavc) closes apache/incubator-metron#318
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-parsers/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/resources/log4j.properties b/metron-platform/metron-parsers/src/test/resources/log4j.properties
new file mode 100644
index 0000000..70be8ae
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+# Root logger option
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index dc3069f..649bddd 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -28,17 +28,17 @@
</properties>
<dependencies>
<dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${global_hbase_guava_version}</version>
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
@@ -47,10 +47,15 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${global_storm_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
<exclusions>
<exclusion>
- <artifactId>org.apache.curator</artifactId>
- <groupId>curator-client</groupId>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -76,6 +81,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -83,13 +94,38 @@
<version>${global_hadoop_version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
+ <artifactId>kafka_2.10</artifactId>
<version>${global_kafka_version}</version>
<exclusions>
<exclusion>
@@ -99,6 +135,30 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${global_kafka_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${global_kafka_version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${global_storm_version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 32f994f..272ac13 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -20,7 +20,7 @@ config:
components:
- id: "zkHosts"
- className: "storm.kafka.ZkHosts"
+ className: "org.apache.storm.kafka.ZkHosts"
constructorArgs:
- "${kafka.zk}"
- id: "kafkaConfig"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index b2f261e..d9f6831 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Logger;
-import storm.kafka.Callback;
-import storm.kafka.EmitContext;
-import storm.kafka.PartitionManager;
+import org.apache.storm.kafka.Callback;
+import org.apache.storm.kafka.EmitContext;
+import org.apache.storm.kafka.PartitionManager;
import javax.annotation.Nullable;
import java.io.Closeable;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
index d87e2c0..e4f1113 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
@@ -18,8 +18,8 @@
package org.apache.metron.spout.pcap;
-import storm.kafka.Callback;
-import storm.kafka.CallbackKafkaSpout;
+import org.apache.storm.kafka.Callback;
+import org.apache.storm.kafka.CallbackKafkaSpout;
public class KafkaToHDFSSpout extends CallbackKafkaSpout {
static final long serialVersionUID = 0xDEADBEEFL;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
index 99f9229..8a76548 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
@@ -20,7 +20,7 @@ package org.apache.metron.spout.pcap;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.spout.pcap.scheme.TimestampScheme;
-import storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.BrokerHosts;
public class SpoutConfig extends org.apache.metron.common.spout.kafka.SpoutConfig{
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
index 28aae7a..625cc2d 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
@@ -18,10 +18,10 @@
package org.apache.metron.spout.pcap.scheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
@@ -29,8 +29,9 @@ import org.apache.metron.common.utils.timestamp.TimestampConverter;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.spout.pcap.Endianness;
-import storm.kafka.KeyValueScheme;
+import org.apache.storm.kafka.KeyValueScheme;
+import java.nio.ByteBuffer;
import java.util.List;
public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
@@ -39,15 +40,15 @@ public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
private TimestampConverter converter = TimestampConverters.MICROSECONDS;
private static Endianness endianness = Endianness.getNativeEndianness();
@Override
- public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
- Long ts = converter.toNanoseconds(Bytes.toLong(key));
- byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
+ public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
+ Long ts = converter.toNanoseconds(key.asLongBuffer().get());
+ byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, Utils.toArray(value), endianness);
byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
return new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(globalHeaderized)));
}
@Override
- public List<Object> deserialize(byte[] ser) {
+ public List<Object> deserialize(ByteBuffer ser) {
throw new UnsupportedOperationException("Really only interested in deserializing a key and a value");
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
index 9b94e2b..236db0b 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
@@ -18,27 +18,29 @@
package org.apache.metron.spout.pcap.scheme;
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
import org.apache.metron.common.utils.timestamp.TimestampConverter;
import org.apache.metron.pcap.PcapHelper;
+import org.apache.storm.utils.Utils;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
public class FromPacketScheme implements MultiScheme,KeyConvertible {
private static final Logger LOG = Logger.getLogger(FromPacketScheme.class);
@Override
- public Iterable<List<Object>> deserialize(byte[] rawValue) {
- byte[] value = rawValue;
+ public Iterable<List<Object>> deserialize(ByteBuffer rawValue) {
+ byte[] value = Utils.toByteArray(rawValue);
Long ts = PcapHelper.getTimestamp(value);
if(ts != null) {
- return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(rawValue))));
+ return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(value))));
}
else {
return ImmutableList.of(new Values(Collections.EMPTY_LIST));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
index 3f98c44..2be55a9 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
@@ -18,9 +18,9 @@
package org.apache.metron.spout.pcap.scheme;
-import backtype.storm.spout.MultiScheme;
+import org.apache.storm.spout.MultiScheme;
import org.apache.metron.common.utils.timestamp.TimestampConverter;
-import storm.kafka.KeyValueSchemeAsMultiScheme;
+import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
public enum TimestampScheme {
FROM_KEY( converter -> new KeyValueSchemeAsMultiScheme(new FromKeyScheme().withTimestampConverter(converter)))
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
index e88926b..78aa527 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
@@ -18,7 +18,7 @@
package org.apache.metron.spout.pcap.scheme;
-import backtype.storm.spout.MultiScheme;
+import org.apache.storm.spout.MultiScheme;
import org.apache.metron.common.utils.timestamp.TimestampConverter;
public interface TimestampSchemeCreator {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE b/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
index 0905df5..ed6a252 100644
--- a/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
+++ b/metron-platform/metron-pcap-backend/src/main/resources/META-INF/LICENSE
@@ -219,7 +219,7 @@ This product bundles jaxb-api 2.2.2, which is available under a "Common Developm
This product bundles stax-api 1.0-2, which is available under a "Common Development and Distribution License v1.0" license. For details, see https://docs.oracle.com/javase/7/docs/api/javax/xml/stream/package-summary.html
This product bundles jopt-simple 3.2, which is available under a "MIT Software License" license. For details, see http://jopt-simple.sourceforge.net
This product bundles leveldbjni-all 1.8, which is available under a "BSD Software License" license. For details, see https://github.com/fusesource/leveldbjni
-This product bundles scala-library 2.9.2, which is available under a "BSD Software License" license. For details, see http://www.scala-lang.org/
+This product bundles scala-library 2.10.6, which is available under a "BSD Software License" license. For details, see http://www.scala-lang.org/
This product bundles slf4j-api 1.7.10, which is available under a "MIT Software License" license. For details, see http://www.slf4j.org
This product bundles slf4j-log4j12 1.7.10, which is available under a "MIT Software License" license. For details, see http://www.slf4j.org
This product bundles xmlenc 0.52, which is available under a "BSD Software License" license. For details, see http://xmlenc.sourceforge.net
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 382a638..72d2b9a 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -110,7 +110,6 @@ public class PcapTopologyIntegrationTest {
byte[] pcapWithHeader = value.copyBytes();
long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
{
-
List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader);
for(PacketInfo pi : info) {
Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
@@ -248,7 +247,6 @@ public class PcapTopologyIntegrationTest {
.build();
try {
runner.start();
- System.out.println("Components started...");
fluxComponent.submitTopology();
sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index 6b14712..b0019c1 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -85,7 +85,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
+ <artifactId>kafka_2.10</artifactId>
<version>${global_kafka_version}</version>
<scope>provided</scope>
<exclusions>
@@ -120,6 +120,21 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${global_hbase_version}</version>
@@ -132,6 +147,10 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
index e2d56c8..d9bacf6 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
@@ -45,6 +45,19 @@ public class PcapByteInputStream implements PcapInputStream {
/**
* Opens pcap file input stream.
+ *
+ * @param pcap
+ * the byte array to be read
+ *
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
+ */
+ public PcapByteInputStream(byte[] pcap) throws IOException {
+ this(pcap, 0, pcap.length);
+ }
+
+ /**
+ * Opens pcap file input stream.
*
* @param pcap
* the byte array to be read
@@ -52,8 +65,9 @@ public class PcapByteInputStream implements PcapInputStream {
* @throws IOException
* Signals that an I/O exception has occurred.
*/
- public PcapByteInputStream(byte[] pcap) throws IOException {
- is = new DataInputStream(new ByteArrayInputStream(pcap)); // $codepro.audit.disable
+ public PcapByteInputStream(byte[] pcap, int offset, int length) throws IOException {
+
+ is = new DataInputStream(new ByteArrayInputStream(pcap, offset, length )); // $codepro.audit.disable
// closeWhereCreated
readGlobalHeader();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index 78f6fd4..e48824f 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -108,10 +108,14 @@ public class PcapHelper {
}
}
- public static Long getTimestamp(byte[] pcap) {
+ public static Long getTimestamp(byte[] pcap ) {
+ return getTimestamp(pcap, 0, pcap.length);
+ }
+
+ public static Long getTimestamp(byte[] pcap, int offset, int length) {
PcapByteInputStream pcapByteInputStream = null;
try {
- pcapByteInputStream = new PcapByteInputStream(pcap);
+ pcapByteInputStream = new PcapByteInputStream(pcap, offset, length);
PcapPacket packet = pcapByteInputStream.getPacket();
GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
PacketHeader packetHeader = packet.getPacketHeader();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
index c07a217..a67cd1b 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
@@ -17,7 +17,7 @@
*/
package org.apache.metron.pcap.writer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
import org.apache.metron.hbase.writer.HBaseWriter;
import org.apache.metron.pcap.utils.PcapUtils;
import org.json.simple.JSONObject;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
new file mode 100644
index 0000000..8e9622c
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Callback extends AutoCloseable, Serializable {
+ List<Object> apply(List<Object> tuple, EmitContext context);
+ void initialize(EmitContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
new file mode 100644
index 0000000..30aee30
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class CallbackCollector extends SpoutOutputCollector implements Serializable {
+ static final long serialVersionUID = 0xDEADBEEFL;
+ Callback _callback;
+ SpoutOutputCollector _delegate;
+ EmitContext _context;
+ public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
+ super(collector);
+ this._callback = callback;
+ this._delegate = collector;
+ this._context = context;
+ }
+
+
+ public static int getPartition(Object messageIdObj) {
+ PartitionManager.KafkaMessageId messageId = (PartitionManager.KafkaMessageId) messageIdObj;
+ return messageId.partition.partition;
+ }
+
+ /**
+ * Emits a new tuple to the specified output stream with the given message ID.
+ * When Storm detects that this tuple has been fully processed, or has failed
+ * to be fully processed, the spout will receive an ack or fail callback respectively
+ * with the messageId as long as the messageId was not null. If the messageId was null,
+ * Storm will not track the tuple and no callback will be received. The emitted values must be
+ * immutable.
+ *
+ * @param streamId
+ * @param tuple
+ * @param messageId
+ * @return the list of task ids that this tuple was sent to
+ */
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
+ .with(EmitContext.Type.STREAM_ID, streamId)
+ );
+ return _delegate.emit(streamId, t, messageId);
+ }
+
+ /**
+ * Emits a new tuple to the default output stream with the given message ID.
+ * When Storm detects that this tuple has been fully processed, or has failed
+ * to be fully processed, the spout will receive an ack or fail callback respectively
+ * with the messageId as long as the messageId was not null. If the messageId was null,
+ * Storm will not track the tuple and no callback will be received. The emitted values must be
+ * immutable.
+ *
+ * @param tuple
+ * @param messageId
+ * @return the list of task ids that this tuple was sent to
+ */
+ @Override
+ public List<Integer> emit(List<Object> tuple, Object messageId) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)));
+ return _delegate.emit(t, messageId);
+ }
+
+ /**
+ * Emits a tuple to the default output stream with a null message id. Storm will
+ * not track this message so ack and fail will never be called for this tuple. The
+ * emitted values must be immutable.
+ *
+ * @param tuple
+ */
+ @Override
+ public List<Integer> emit(List<Object> tuple) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext());
+ return _delegate.emit(t);
+ }
+
+ /**
+ * Emits a tuple to the specified output stream with a null message id. Storm will
+ * not track this message so ack and fail will never be called for this tuple. The
+ * emitted values must be immutable.
+ *
+ * @param streamId
+ * @param tuple
+ */
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId));
+ return _delegate.emit(streamId, t);
+ }
+
+ /**
+ * Emits a tuple to the specified task on the specified output stream. This output
+ * stream must have been declared as a direct stream, and the specified task must
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
+ *
+ * @param taskId
+ * @param streamId
+ * @param tuple
+ * @param messageId
+ */
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
+ .with(EmitContext.Type.PARTITION, getPartition(messageId))
+ .with(EmitContext.Type.TASK_ID, new Integer(taskId))
+ );
+ _delegate.emitDirect(taskId, streamId, t, messageId);
+ }
+
+ /**
+ * Emits a tuple to the specified task on the default output stream. This output
+ * stream must have been declared as a direct stream, and the specified task must
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
+ *
+ * @param taskId
+ * @param tuple
+ * @param messageId
+ */
+ @Override
+ public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
+ .with(EmitContext.Type.TASK_ID, new Integer(taskId))
+ );
+ _delegate.emitDirect(taskId, t, messageId);
+ }
+
+ /**
+ * Emits a tuple to the specified task on the specified output stream. This output
+ * stream must have been declared as a direct stream, and the specified task must
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
+ * <p/>
+ * <p> Because no message id is specified, Storm will not track this message
+ * so ack and fail will never be called for this tuple.</p>
+ *
+ * @param taskId
+ * @param streamId
+ * @param tuple
+ */
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
+ .with(EmitContext.Type.TASK_ID, new Integer(taskId))
+ );
+ _delegate.emitDirect(taskId, streamId, t);
+ }
+
+ /**
+ * Emits a tuple to the specified task on the default output stream. This output
+ * stream must have been declared as a direct stream, and the specified task must
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
+ * <p/>
+ * <p> Because no message id is specified, Storm will not track this message
+ * so ack and fail will never be called for this tuple.</p>
+ *
+ * @param taskId
+ * @param tuple
+ */
+ @Override
+ public void emitDirect(int taskId, List<Object> tuple) {
+
+ List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, new Integer(taskId)));
+ _delegate.emitDirect(taskId, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
new file mode 100644
index 0000000..661fb0b
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.*;
+
+/**
+ * A {@link KafkaSpout} whose output collector is wrapped in a {@link CallbackCollector}, so that a
+ * user-supplied {@link Callback} is handed the tuples the spout emits.
+ */
+public class CallbackKafkaSpout extends KafkaSpout {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  Class<? extends Callback> callbackClazz;
+  Callback _callback;
+  EmitContext _context;
+
+  /**
+   * @param spoutConfig   configuration handed to the parent {@link KafkaSpout}
+   * @param callbackClass fully qualified name of the {@link Callback} implementation to use
+   * @throws RuntimeException   if the named class cannot be found
+   * @throws ClassCastException if the named class does not implement {@link Callback}
+   */
+  public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
+    this(spoutConfig, toCallbackClass(callbackClass));
+  }
+
+  /**
+   * @param spoutConf configuration handed to the parent {@link KafkaSpout}
+   * @param callback  callback type; instantiated through its no-argument constructor
+   */
+  public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
+    super(spoutConf);
+    callbackClazz = callback;
+  }
+
+  /**
+   * Creates the callback and initializes it with a base context holding the spout config, the
+   * topology's Storm id (stored under {@code UUID}) and the Kafka topic.
+   *
+   * @param context topology context the Storm id is read from
+   */
+  public void initialize(TopologyContext context) {
+    _callback = createCallback(callbackClazz);
+    _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
+                                .with(EmitContext.Type.UUID, context.getStormId())
+                                .with(EmitContext.Type.TOPIC, _spoutConfig.topic);
+    _callback.initialize(_context);
+  }
+
+  /** Resolves a class name to a {@link Callback} type, rejecting classes that are not callbacks. */
+  private static Class<? extends Callback> toCallbackClass(String callbackClass) {
+    try {
+      // asSubclass is a checked narrowing: a class of the wrong type fails here, at construction
+      // time, instead of slipping through an unchecked cast and failing when the spout is opened.
+      return Class.forName(callbackClass).asSubclass(Callback.class);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(callbackClass + " not found", e);
+    }
+  }
+
+  /** Instantiates the callback reflectively; subclasses may override to build it differently. */
+  protected Callback createCallback(Class<? extends Callback> callbackClass) {
+    try {
+      return callbackClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException("Unable to instantiate callback", e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Illegal access", e);
+    }
+  }
+
+  /**
+   * Lazily initializes the callback, then opens the parent spout with a collector that routes
+   * emits through the callback.
+   */
+  @Override
+  public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+    if (_callback == null) {
+      initialize(context);
+    }
+    // The collector gets its own copy of the base context, extended with the open-time arguments.
+    EmitContext collectorContext = _context.cloneContext()
+                                           .with(EmitContext.Type.OPEN_CONFIG, conf)
+                                           .with(EmitContext.Type.TOPOLOGY_CONTEXT, context);
+    super.open(conf, context, new CallbackCollector(_callback, collector, collectorContext));
+  }
+
+  /**
+   * Closes the parent spout first, then the callback if one was created.
+   *
+   * @throws IllegalStateException if the callback fails to close
+   */
+  @Override
+  public void close() {
+    super.close();
+    if (_callback != null) {
+      try {
+        _callback.close();
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to close callback", e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
new file mode 100644
index 0000000..434d884
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.task.TopologyContext;
+
+import java.io.Serializable;
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * A typed bag of contextual values passed to a {@link Callback}.
+ *
+ * <p>Entries are keyed by {@link Type}; each key declares the class its value must be an instance
+ * of, and {@link #with} enforces that when the value is stored.</p>
+ */
+public class EmitContext implements Cloneable, Serializable {
+  static final long serialVersionUID = 0xDEADBEEFL;
+
+  /** The keys a context may hold, each paired with the class its value must be an instance of. */
+  public enum Type {
+    STREAM_ID(String.class),
+    TOPIC(String.class),
+    PARTITION(Integer.class),
+    TASK_ID(Integer.class),
+    UUID(String.class),
+    SPOUT_CONFIG(SpoutConfig.class),
+    OPEN_CONFIG(Map.class),
+    TOPOLOGY_CONTEXT(TopologyContext.class);
+
+    // Assigned once per constant; final so the expected value type can never be swapped out.
+    final Class<?> clazz;
+
+    Type(Class<?> clazz) {
+      this.clazz = clazz;
+    }
+
+    /** @return the class that values stored under this key must be an instance of */
+    public Class<?> clazz() {
+      return clazz;
+    }
+  }
+
+  // Backing store; the reference is never reassigned, and clone() copies it rather than sharing it.
+  private final EnumMap<Type, Object> _context;
+
+  /** Creates an empty context. */
+  public EmitContext() {
+    this(new EnumMap<>(Type.class));
+  }
+
+  /**
+   * Creates a context backed directly by the given map. The map is not copied, so later changes
+   * to it are visible through this context.
+   *
+   * @param context the map to use as backing store
+   */
+  public EmitContext(EnumMap<Type, Object> context) {
+    _context = context;
+  }
+
+  /**
+   * Stores a value under the given key and returns this context so calls can be chained.
+   *
+   * @param t the key
+   * @param o the value; may be {@code null}
+   * @return this context
+   * @throws ClassCastException if {@code o} is non-null and not an instance of {@code t.clazz()}
+   */
+  public <T> EmitContext with(Type t, T o ) {
+    _context.put(t, t.clazz().cast(o));
+    return this;
+  }
+
+  /**
+   * Same as {@link #with(Type, Object)}, for callers that do not need the chained return value.
+   *
+   * @throws ClassCastException if {@code o} is non-null and not an instance of {@code t.clazz()}
+   */
+  public <T> void add(Type t, T o ) {
+    with(t, o);
+  }
+
+  /**
+   * Looks up the value stored under the given key.
+   *
+   * @param t the key
+   * @return the stored value, or {@code null} if the key is absent or mapped to {@code null}
+   */
+  // The cast is unchecked: T is whatever the caller asks for. Values added through with() were
+  // verified against t.clazz(), so requesting that type is safe for them.
+  @SuppressWarnings("unchecked")
+  public <T> T get(Type t) {
+    return (T) _context.get(t);
+  }
+
+  /**
+   * Convenience wrapper around {@link #clone()} that returns the concrete type and turns the
+   * checked exception into an unchecked one.
+   *
+   * @return a copy of this context
+   * @throws RuntimeException if cloning is not supported
+   */
+  public EmitContext cloneContext() {
+    try {
+      return (EmitContext)this.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException("Unable to clone emit context.", e);
+    }
+  }
+
+  /**
+   * Returns a new {@code EmitContext} backed by a shallow copy of this context's map: adding or
+   * replacing entries on the copy does not affect this instance, but the stored values themselves
+   * are shared between the two.
+   *
+   * <p>The copy is built with the constructor rather than {@code super.clone()}, so the result is
+   * always exactly an {@code EmitContext}.</p>
+   *
+   * @return a copy of this context
+   */
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return new EmitContext(_context.clone());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
deleted file mode 100644
index 4fc1c55..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Callback extends AutoCloseable, Serializable {
- List<Object> apply(List<Object> tuple, EmitContext context);
- void initialize(EmitContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java
deleted file mode 100644
index 6f2f7fa..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackCollector.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class CallbackCollector extends SpoutOutputCollector implements Serializable {
- static final long serialVersionUID = 0xDEADBEEFL;
- Callback _callback;
- SpoutOutputCollector _delegate;
- EmitContext _context;
- public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
- super(collector);
- this._callback = callback;
- this._delegate = collector;
- this._context = context;
- }
-
-
- public static int getPartition(Object messageIdObj) {
- PartitionManager.KafkaMessageId messageId = (PartitionManager.KafkaMessageId) messageIdObj;
- return messageId.partition.partition;
- }
-
- /**
- * Emits a new tuple to the specified output stream with the given message ID.
- * When Storm detects that this tuple has been fully processed, or has failed
- * to be fully processed, the spout will receive an ack or fail callback respectively
- * with the messageId as long as the messageId was not null. If the messageId was null,
- * Storm will not track the tuple and no callback will be received. The emitted values must be
- * immutable.
- *
- * @param streamId
- * @param tuple
- * @param messageId
- * @return the list of task ids that this tuple was sent to
- */
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
- .with(EmitContext.Type.STREAM_ID, streamId)
- );
- return _delegate.emit(streamId, t, messageId);
- }
-
- /**
- * Emits a new tuple to the default output stream with the given message ID.
- * When Storm detects that this tuple has been fully processed, or has failed
- * to be fully processed, the spout will receive an ack or fail callback respectively
- * with the messageId as long as the messageId was not null. If the messageId was null,
- * Storm will not track the tuple and no callback will be received. The emitted values must be
- * immutable.
- *
- * @param tuple
- * @param messageId
- * @return the list of task ids that this tuple was sent to
- */
- @Override
- public List<Integer> emit(List<Object> tuple, Object messageId) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)));
- return _delegate.emit(t, messageId);
- }
-
- /**
- * Emits a tuple to the default output stream with a null message id. Storm will
- * not track this message so ack and fail will never be called for this tuple. The
- * emitted values must be immutable.
- *
- * @param tuple
- */
- @Override
- public List<Integer> emit(List<Object> tuple) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext());
- return _delegate.emit(t);
- }
-
- /**
- * Emits a tuple to the specified output stream with a null message id. Storm will
- * not track this message so ack and fail will never be called for this tuple. The
- * emitted values must be immutable.
- *
- * @param streamId
- * @param tuple
- */
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId));
- return _delegate.emit(streamId, t);
- }
-
- /**
- * Emits a tuple to the specified task on the specified output stream. This output
- * stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message. The emitted values must be
- * immutable.
- *
- * @param taskId
- * @param streamId
- * @param tuple
- * @param messageId
- */
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
- .with(EmitContext.Type.PARTITION, getPartition(messageId))
- .with(EmitContext.Type.TASK_ID, new Integer(taskId))
- );
- _delegate.emitDirect(taskId, streamId, t, messageId);
- }
-
- /**
- * Emits a tuple to the specified task on the default output stream. This output
- * stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message. The emitted values must be
- * immutable.
- *
- * @param taskId
- * @param tuple
- * @param messageId
- */
- @Override
- public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))
- .with(EmitContext.Type.TASK_ID, new Integer(taskId))
- );
- _delegate.emitDirect(taskId, t, messageId);
- }
-
- /**
- * Emits a tuple to the specified task on the specified output stream. This output
- * stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message. The emitted values must be
- * immutable.
- * <p/>
- * <p> Because no message id is specified, Storm will not track this message
- * so ack and fail will never be called for this tuple.</p>
- *
- * @param taskId
- * @param streamId
- * @param tuple
- */
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)
- .with(EmitContext.Type.TASK_ID, new Integer(taskId))
- );
- _delegate.emitDirect(taskId, streamId, t);
- }
-
- /**
- * Emits a tuple to the specified task on the default output stream. This output
- * stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message. The emitted values must be
- * immutable.
- * <p/>
- * <p> Because no message id is specified, Storm will not track this message
- * so ack and fail will never be called for this tuple.</p>
- *
- * @param taskId
- * @param tuple
- */
- @Override
- public void emitDirect(int taskId, List<Object> tuple) {
-
- List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, new Integer(taskId)));
- _delegate.emitDirect(taskId, t);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java
deleted file mode 100644
index 3828d21..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/CallbackKafkaSpout.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import storm.kafka.*;
-
-import java.util.*;
-
-public class CallbackKafkaSpout extends KafkaSpout {
- static final long serialVersionUID = 0xDEADBEEFL;
- Class<? extends Callback> callbackClazz;
- Callback _callback;
- EmitContext _context;
- public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
- this(spoutConfig, toCallbackClass(callbackClass));
- }
-
- public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
- super(spoutConf);
- callbackClazz = callback;
- }
-
- public void initialize() {
- _callback = createCallback(callbackClazz);
- _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
- .with(EmitContext.Type.UUID, _uuid)
- .with(EmitContext.Type.TOPIC, _spoutConfig.topic);
- _callback.initialize(_context);
- }
-
-
- private static Class<? extends Callback> toCallbackClass(String callbackClass) {
- try{
- return (Class<? extends Callback>) Callback.class.forName(callbackClass);
- }
- catch (ClassNotFoundException e) {
- throw new RuntimeException(callbackClass + " not found", e);
- }
- }
-
- protected Callback createCallback(Class<? extends Callback> callbackClass) {
- try {
- return callbackClass.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException("Unable to instantiate callback", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Illegal access", e);
- }
- }
-
- @Override
- public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
- if(_callback == null) {
- initialize();
- }
- super.open( conf, context
- , new CallbackCollector(_callback, collector
- ,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf)
- .with(EmitContext.Type.TOPOLOGY_CONTEXT, context)
- )
- );
- }
-
- @Override
- public void close() {
- super.close();
- if(_callback != null) {
- try {
- _callback.close();
- } catch (Exception e) {
- throw new IllegalStateException("Unable to close callback", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java
deleted file mode 100644
index d2d9df7..0000000
--- a/metron-platform/metron-pcap/src/main/java/storm/kafka/EmitContext.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.task.TopologyContext;
-
-import java.io.Serializable;
-import java.util.EnumMap;
-import java.util.Map;
-
-public class EmitContext implements Cloneable,Serializable {
- static final long serialVersionUID = 0xDEADBEEFL;
-
- public enum Type{
- STREAM_ID(String.class)
- ,TOPIC(String.class)
- ,PARTITION(Integer.class)
- ,TASK_ID(Integer.class)
- ,UUID(String.class)
- ,SPOUT_CONFIG(SpoutConfig.class)
- ,OPEN_CONFIG(Map.class)
- ,TOPOLOGY_CONTEXT(TopologyContext.class)
- ;
- Class<?> clazz;
- Type(Class<?> clazz) {
- this.clazz= clazz;
- }
-
- public Class<?> clazz() {
- return clazz;
- }
- }
- public EmitContext() {
- this(new EnumMap<>(Type.class));
- }
- public EmitContext(EnumMap<Type, Object> context) {
- _context = context;
- }
- private EnumMap<Type, Object> _context;
-
- public <T> EmitContext with(Type t, T o ) {
- _context.put(t, t.clazz().cast(o));
- return this;
- }
- public <T> void add(Type t, T o ) {
- with(t, o);
- }
-
- public <T> T get(Type t) {
- Object o = _context.get(t);
- if(o == null) {
- return null;
- }
- else {
- return (T) o;
- }
- }
-
- public EmitContext cloneContext() {
- try {
- return (EmitContext)this.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException("Unable to clone emit context.", e);
- }
- }
-
- /**
- * Creates and returns a copy of this object. The precise meaning
- * of "copy" may depend on the class of the object. The general
- * intent is that, for any object {@code x}, the expression:
- * <blockquote>
- * <pre>
- * x.clone() != x</pre></blockquote>
- * will be true, and that the expression:
- * <blockquote>
- * <pre>
- * x.clone().getClass() == x.getClass()</pre></blockquote>
- * will be {@code true}, but these are not absolute requirements.
- * While it is typically the case that:
- * <blockquote>
- * <pre>
- * x.clone().equals(x)</pre></blockquote>
- * will be {@code true}, this is not an absolute requirement.
- *
- * By convention, the returned object should be obtained by calling
- * {@code super.clone}. If a class and all of its superclasses (except
- * {@code Object}) obey this convention, it will be the case that
- * {@code x.clone().getClass() == x.getClass()}.
- *
- * By convention, the object returned by this method should be independent
- * of this object (which is being cloned). To achieve this independence,
- * it may be necessary to modify one or more fields of the object returned
- * by {@code super.clone} before returning it. Typically, this means
- * copying any mutable objects that comprise the internal "deep structure"
- * of the object being cloned and replacing the references to these
- * objects with references to the copies. If a class contains only
- * primitive fields or references to immutable objects, then it is usually
- * the case that no fields in the object returned by {@code super.clone}
- * need to be modified.
- *
- * The method {@code clone} for class {@code Object} performs a
- * specific cloning operation. First, if the class of this object does
- * not implement the interface {@code Cloneable}, then a
- * {@code CloneNotSupportedException} is thrown. Note that all arrays
- * are considered to implement the interface {@code Cloneable} and that
- * the return type of the {@code clone} method of an array type {@code T[]}
- * is {@code T[]} where T is any reference or primitive type.
- * Otherwise, this method creates a new instance of the class of this
- * object and initializes all its fields with exactly the contents of
- * the corresponding fields of this object, as if by assignment; the
- * contents of the fields are not themselves cloned. Thus, this method
- * performs a "shallow copy" of this object, not a "deep copy" operation.
- *
- * The class {@code Object} does not itself implement the interface
- * {@code Cloneable}, so calling the {@code clone} method on an object
- * whose class is {@code Object} will result in throwing an
- * exception at run time.
- *
- * @return a clone of this instance.
- * @throws CloneNotSupportedException if the object's class does not
- * support the {@code Cloneable} interface. Subclasses
- * that override the {@code clone} method can also
- * throw this exception to indicate that an instance cannot
- * be cloned.
- * @see Cloneable
- */
- @Override
- protected Object clone() throws CloneNotSupportedException {
- EmitContext context = new EmitContext(_context.clone());
- return context;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml
index 65077e0..d87e860 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -112,7 +112,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
+ <artifactId>kafka_2.10</artifactId>
<version>${global_kafka_version}</version>
<exclusions>
<exclusion>
@@ -122,6 +122,19 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${global_kafka_version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 4441b29..50b11eb 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -17,7 +17,9 @@
*/
package org.apache.metron.solr.writer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml
index 1700ee0..04cfe14 100644
--- a/metron-platform/metron-test-utilities/pom.xml
+++ b/metron-platform/metron-test-utilities/pom.xml
@@ -41,11 +41,58 @@
<version>${global_hbase_guava_version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${global_hadoop_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${global_hbase_version}</version>
<exclusions>
<exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
@@ -69,7 +116,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
+ <artifactId>kafka_2.10</artifactId>
<version>${global_kafka_version}</version>
<exclusions>
<exclusion>
@@ -81,12 +128,12 @@
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
- <version>2.7.1</version>
+ <version>${global_curator_version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
- <version>2.7.1</version>
+ <version>${global_curator_version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
index a655e14..3faa1e1 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
@@ -17,11 +17,11 @@
*/
package org.apache.metron.test.bolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import org.apache.curator.framework.CuratorFramework;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
index 83681c7..ff9e0cf 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/PrintingBolt.java
@@ -20,11 +20,11 @@ package org.apache.metron.test.bolt;
import java.util.Map;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
@SuppressWarnings("serial")
public class PrintingBolt extends BaseRichBolt {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
index a36d99d..58b6fef 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
@@ -29,13 +29,13 @@ import org.apache.metron.test.converters.BinaryConverters;
import org.apache.metron.test.converters.IConverter;
import org.apache.metron.test.filereaders.FileReader;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
public class GenericInternalTestSpout extends BaseRichSpout {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
index 1859c7f..53de800 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -31,9 +31,53 @@
</properties>
<dependencies>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${global_hadoop_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${global_hadoop_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ </exclusions>
<scope>provided</scope>
</dependency>
<dependency>
@@ -54,6 +98,14 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -74,7 +126,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
+ <artifactId>kafka_2.10</artifactId>
<version>${global_kafka_version}</version>
<scope>provided</scope>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 897ef42..738ccac 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -18,8 +18,8 @@
package org.apache.metron.writer;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
import com.google.common.collect.Iterables;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index c6d220e..e323056 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -17,7 +17,7 @@
*/
package org.apache.metron.writer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index 3b1321b..e3e8150 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -18,7 +18,7 @@
package org.apache.metron.writer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
import com.google.common.collect.Iterables;
import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 161f4bc..a60ddae 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,11 +17,11 @@
*/
package org.apache.metron.writer.bolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index e4d56d9..3af0a93 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -17,7 +17,8 @@
*/
package org.apache.metron.writer.hdfs;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e3170502/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
index ae0242d..fe9e3e3 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
@@ -18,7 +18,7 @@
package org.apache.metron.writer.hdfs;
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import java.util.Map;