You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/02/01 15:56:25 UTC

[2/8] storm git commit: STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client; STORM-2225: change spout config to be simpler; STORM-2228: removed ability to request a single topic go to multiple streams; STORM-2236: Reimplemented manual partit… [subject truncated by the archive]

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index d9818b7..6e59d42 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -18,32 +18,32 @@
 
 package org.apache.storm.kafka.spout.test;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
 
 public class KafkaSpoutTopologyMainNamedTopics {
-    private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+    private static final String TOPIC_2_STREAM = "test_2_stream";
+    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
     private static final String[] TOPICS = new String[]{"test","test1","test2"};
 
 
@@ -88,14 +88,32 @@ public class KafkaSpoutTopologyMainNamedTopics {
 
     protected StormTopology getTopolgyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
+          .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)
+          .shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
+        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
         return tp.createTopology();
     }
 
-    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value());
+        }
+    };
+    
+    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
+                TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
+        trans.forTopic(TOPICS[2], 
+                TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS)
+                .setGroupId("kafkaSpoutTestGroup")
+                .setRetry(getRetryService())
+                .setRecordTranslator(trans)
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
                 .setMaxUncommittedOffsets(250)
@@ -106,30 +124,4 @@ public class KafkaSpoutTopologyMainNamedTopics {
             return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                     TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
     }
-
-    protected Map<String,Object> getKafkaConsumerProps() {
-        Map<String, Object> props = new HashMap<>();
-//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        return props;
-    }
-
-    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
-                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-                .build();
-    }
-
-    protected KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-        final Fields outputFields1 = new Fields("topic", "partition", "offset");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
-                .build();
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index c362a2b..8b967fa 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -18,22 +18,23 @@
 
 package org.apache.storm.kafka.spout.test;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutStream;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.util.regex.Pattern;
+import org.apache.storm.tuple.Values;
 
 public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics {
     private static final String STREAM = "test_wildcard_stream";
-    private static final String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+    private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test[1|2]");
 
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
@@ -41,22 +42,27 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain
 
     protected StormTopology getTopolgyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder());
-    }
-
-    protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() {
-        return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
-    }
-
-    protected KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-        final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN));
-        return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream);
+    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value());
+        }
+    };
+    
+    protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN)
+                .setGroupId("kafkaSpoutTestGroup")
+                .setRetry(getRetryService())
+                .setRecordTranslator(TOPIC_PART_OFF_KEY_VALUE_FUNC,
+                        new Fields("topic", "partition", "offset", "key", "value"), STREAM)
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
deleted file mode 100644
index ca65177..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicTest2TupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.partition(),
-                consumerRecord.offset());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
deleted file mode 100644
index 4c55aa1..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicsTest0Test1TupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.partition(),
-                consumerRecord.offset(),
-                consumerRecord.key(),
-                consumerRecord.value());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
index 19f0452..4c5dba5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
@@ -41,7 +41,7 @@ public class DynamicPartitionConnections {
         }
     }
 
-    Map<Broker, ConnectionInfo> _connections = new HashMap();
+    Map<Broker, ConnectionInfo> _connections = new HashMap<>();
     KafkaConfig _config;
     IBrokerReader _reader;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index d2bd313..4608963 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -61,7 +61,7 @@ public class KafkaSpout extends BaseRichSpout {
     public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
         _collector = collector;
         String topologyInstanceId = context.getStormId();
-        Map stateConf = new HashMap(conf);
+        Map<String, Object> stateConf = new HashMap<>(conf);
         List<String> zkServers = _spoutConfig.zkServers;
         if (zkServers == null) {
             zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 8c22118..f23c873 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -110,7 +110,7 @@ public class KafkaUtils {
         @Override
         public Object getValueAndReset() {
             try {
-                HashMap ret = new HashMap();
+                HashMap<String, Long> ret = new HashMap<>();
                 if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                     Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
                     for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 79e7c3d..db5558d 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -133,7 +133,7 @@ public class PartitionManager {
     }
 
     public Map getMetricsDataMap() {
-        Map ret = new HashMap();
+        Map<String, Object> ret = new HashMap<>();
         ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
         ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
         ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index bdbc44d..628bfc0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -24,7 +24,7 @@ import java.util.*;
 
 public class StaticCoordinator implements PartitionCoordinator {
     Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
-    List<PartitionManager> _allManagers = new ArrayList();
+    List<PartitionManager> _allManagers = new ArrayList<>();
 
     public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
@@ -34,7 +34,7 @@ public class StaticCoordinator implements PartitionCoordinator {
         for (Partition myPartition : myPartitions) {
             _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
         }
-        _allManagers = new ArrayList(_managers.values());
+        _allManagers = new ArrayList<>(_managers.values());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
index 31eaac5..b5bb124 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -51,7 +51,9 @@ import java.util.Properties;
  * This bolt uses 0.8.2 Kafka Producer API.
  * <p/>
  * It works for sending tuples to older Kafka version (0.8.1).
+ * @deprecated Please use the KafkaBolt in storm-kafka-client
  */
+@Deprecated
 public class KafkaBolt<K, V> extends BaseRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
index 3363252..46cc60d 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
@@ -54,7 +54,7 @@ public class DynamicBrokersReaderTest {
     public void setUp() throws Exception {
         server = new TestingServer();
         String connectionString = server.getConnectString();
-        Map conf = new HashMap();
+        Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
         conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
         conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
@@ -64,7 +64,7 @@ public class DynamicBrokersReaderTest {
         zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
         dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
 
-        Map conf2 = new HashMap();
+        Map<String, Object> conf2 = new HashMap<>();
         conf2.putAll(conf);
         conf2.put("kafka.topic.wildcard.match",true);
 
@@ -240,7 +240,7 @@ public class DynamicBrokersReaderTest {
     @Test(expected = NullPointerException.class)
     public void testErrorLogsWhenConfigIsMissing() throws Exception {
         String connectionString = server.getConnectString();
-        Map conf = new HashMap();
+        Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
 //        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
         conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
index 7a6073a..864eaa9 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
@@ -42,7 +42,7 @@ public class TridentKafkaTest {
     public void setup() {
         broker = new KafkaTestBroker();
         simpleConsumer = TestUtils.getKafkaConsumer(broker);
-        TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
+        TridentTupleToKafkaMapper<Object, Object> mapper = new FieldNameBasedTupleToKafkaMapper<Object, Object>("key", "message");
         KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
         state = new TridentKafkaState()
                 .withKafkaTopicSelector(topicSelector)

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 65bf0b4..364da33 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -124,7 +124,7 @@ public class ZkCoordinatorTest {
     }
 
     private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) {
-        List<List<PartitionManager>> partitions = new ArrayList();
+        List<List<PartitionManager>> partitions = new ArrayList<>();
         for (ZkCoordinator coordinator : coordinatorList) {
             partitions.add(coordinator.getMyManagedPartitions());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 180828e..58e52e8 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -52,6 +52,7 @@ import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.HashMap;
 import java.util.Properties;
 import java.util.concurrent.Future;
@@ -287,7 +288,8 @@ public class KafkaBoltTest {
 
     private Tuple generateTestTuple(Object key, Object message) {
         TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<Integer,String>(),
+                new HashMap<String, List<Integer>>(), new HashMap<String, Map<String, Fields>>(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
                 return new Fields("key", "message");
@@ -298,7 +300,8 @@ public class KafkaBoltTest {
 
     private Tuple generateTestTuple(Object message) {
         TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<Integer, String>(),
+                new HashMap<String, List<Integer>>(), new HashMap<String, Map<String, Fields>>(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
                 return new Fields("message");

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
index 840b2d3..a771748 100644
--- a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
+++ b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
  * Collection of unique named fields using in an ITuple
  */
 public class Fields implements Iterable<String>, Serializable {
+    private static final long serialVersionUID = -3377931843059975424L;
     private List<String> _fields;
     private Map<String, Integer> _index = new HashMap<>();
     
@@ -122,5 +123,20 @@ public class Fields implements Iterable<String>, Serializable {
     @Override
     public String toString() {
         return _fields.toString();
-    }    
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) return true;
+        if (other instanceof Fields) {
+            Fields of = (Fields)other;
+            return _fields.equals(of._fields);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return _fields.hashCode();
+    }
 }