You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/02/01 15:55:38 UTC
[2/7] storm git commit: STORM-1997: copy state/bolt from storm-kafka
to storm-kafka-client STORM-2225: change spout config to be simpler.
STORM-2228: removed ability to request a single topic go to multiple streams
STORM-2236: Reimplemented manual partition management [subject truncated in the archive]
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 4cf4824..2aeeb95 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -18,32 +18,29 @@
package org.apache.storm.kafka.spout.test;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
public class KafkaSpoutTopologyMainNamedTopics {
- private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+ private static final String TOPIC_2_STREAM = "test_2_stream";
+ private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
private static final String[] TOPICS = new String[]{"test","test1","test2"};
@@ -87,14 +84,25 @@ public class KafkaSpoutTopologyMainNamedTopics {
protected StormTopology getTopolgyKafkaSpout() {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
- tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
- tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
+ tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
+ .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM)
+ .shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
+ tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
return tp.createTopology();
}
- protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+ protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+ ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
+ (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
+ new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
+ trans.forTopic(TOPICS[2],
+ (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
+ new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
+ return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS)
+ .setGroupId("kafkaSpoutTestGroup")
+ .setRetry(getRetryService())
+ .setRecordTranslator(trans)
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
@@ -105,30 +113,4 @@ public class KafkaSpoutTopologyMainNamedTopics {
return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
}
-
- protected Map<String,Object> getKafkaConsumerProps() {
- Map<String, Object> props = new HashMap<>();
-// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
- props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
- props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
- props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
- return props;
- }
-
- protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
- return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
- new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
- new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
- .build();
- }
-
- protected KafkaSpoutStreams getKafkaSpoutStreams() {
- final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
- final Fields outputFields1 = new Fields("topic", "partition", "offset");
- return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream
- .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream
- .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream
- .build();
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index c362a2b..d0376e6 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -18,22 +18,20 @@
package org.apache.storm.kafka.spout.test;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.regex.Pattern;
+
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutStream;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-
-import java.util.regex.Pattern;
+import org.apache.storm.tuple.Values;
public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics {
private static final String STREAM = "test_wildcard_stream";
- private static final String TOPIC_WILDCARD_PATTERN = "test[1|2]";
+ private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test[1|2]");
public static void main(String[] args) throws Exception {
new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
@@ -41,22 +39,20 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain
protected StormTopology getTopolgyKafkaSpout() {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
return tp.createTopology();
}
- protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
- return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder());
- }
-
- protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() {
- return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
- }
-
- protected KafkaSpoutStreams getKafkaSpoutStreams() {
- final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
- final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN));
- return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream);
+ protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
+ return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN)
+ .setGroupId("kafkaSpoutTestGroup")
+ .setRetry(getRetryService())
+ .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
+ new Fields("topic", "partition", "offset", "key", "value"), STREAM)
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
deleted file mode 100644
index ca65177..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
- /**
- * @param topics list of topics that use this implementation to build tuples
- */
- public TopicTest2TupleBuilder(String... topics) {
- super(topics);
- }
-
- @Override
- public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
- return new Values(consumerRecord.topic(),
- consumerRecord.partition(),
- consumerRecord.offset());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
deleted file mode 100644
index 4c55aa1..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
- /**
- * @param topics list of topics that use this implementation to build tuples
- */
- public TopicsTest0Test1TupleBuilder(String... topics) {
- super(topics);
- }
-
- @Override
- public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
- return new Values(consumerRecord.topic(),
- consumerRecord.partition(),
- consumerRecord.offset(),
- consumerRecord.key(),
- consumerRecord.value());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
index 19f0452..4c5dba5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
@@ -41,7 +41,7 @@ public class DynamicPartitionConnections {
}
}
- Map<Broker, ConnectionInfo> _connections = new HashMap();
+ Map<Broker, ConnectionInfo> _connections = new HashMap<>();
KafkaConfig _config;
IBrokerReader _reader;
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index d2bd313..4608963 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -61,7 +61,7 @@ public class KafkaSpout extends BaseRichSpout {
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
String topologyInstanceId = context.getStormId();
- Map stateConf = new HashMap(conf);
+ Map<String, Object> stateConf = new HashMap<>(conf);
List<String> zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 53961d4..2072df3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -114,7 +114,7 @@ public class KafkaUtils {
@Override
public Object getValueAndReset() {
try {
- HashMap ret = new HashMap();
+ HashMap<String, Long> ret = new HashMap<>();
if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) {
@@ -195,7 +195,7 @@ public class KafkaUtils {
int partitionId = partition.partition;
FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
- clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
+ clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
FetchResponse fetchResponse;
try {
fetchResponse = consumer.fetch(fetchRequest);
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 793d227..8d608d9 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -133,7 +133,7 @@ public class PartitionManager {
}
public Map getMetricsDataMap() {
- Map ret = new HashMap();
+ Map<String, Object> ret = new HashMap<>();
ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index bdbc44d..628bfc0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -24,7 +24,7 @@ import java.util.*;
public class StaticCoordinator implements PartitionCoordinator {
Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
- List<PartitionManager> _allManagers = new ArrayList();
+ List<PartitionManager> _allManagers = new ArrayList<>();
public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
@@ -34,7 +34,7 @@ public class StaticCoordinator implements PartitionCoordinator {
for (Partition myPartition : myPartitions) {
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
}
- _allManagers = new ArrayList(_managers.values());
+ _allManagers = new ArrayList<>(_managers.values());
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
index 31eaac5..b5bb124 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -51,7 +51,9 @@ import java.util.Properties;
* This bolt uses 0.8.2 Kafka Producer API.
* <p/>
* It works for sending tuples to older Kafka version (0.8.1).
+ * @deprecated Please use the KafkaBolt in storm-kafka-client
*/
+@Deprecated
public class KafkaBolt<K, V> extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
index 3363252..46cc60d 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
@@ -54,7 +54,7 @@ public class DynamicBrokersReaderTest {
public void setUp() throws Exception {
server = new TestingServer();
String connectionString = server.getConnectString();
- Map conf = new HashMap();
+ Map<String, Object> conf = new HashMap<>();
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
@@ -64,7 +64,7 @@ public class DynamicBrokersReaderTest {
zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
- Map conf2 = new HashMap();
+ Map<String, Object> conf2 = new HashMap<>();
conf2.putAll(conf);
conf2.put("kafka.topic.wildcard.match",true);
@@ -240,7 +240,7 @@ public class DynamicBrokersReaderTest {
@Test(expected = NullPointerException.class)
public void testErrorLogsWhenConfigIsMissing() throws Exception {
String connectionString = server.getConnectString();
- Map conf = new HashMap();
+ Map<String, Object> conf = new HashMap<>();
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
// conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
index 7a6073a..864eaa9 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
@@ -42,7 +42,7 @@ public class TridentKafkaTest {
public void setup() {
broker = new KafkaTestBroker();
simpleConsumer = TestUtils.getKafkaConsumer(broker);
- TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
+ TridentTupleToKafkaMapper<Object, Object> mapper = new FieldNameBasedTupleToKafkaMapper<Object, Object>("key", "message");
KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
state = new TridentKafkaState()
.withKafkaTopicSelector(topicSelector)
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 65bf0b4..364da33 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -124,7 +124,7 @@ public class ZkCoordinatorTest {
}
private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) {
- List<List<PartitionManager>> partitions = new ArrayList();
+ List<List<PartitionManager>> partitions = new ArrayList<>();
for (ZkCoordinator coordinator : coordinatorList) {
partitions.add(coordinator.getMyManagedPartitions());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 180828e..cccbce0 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -287,7 +287,7 @@ public class KafkaBoltTest {
private Tuple generateTestTuple(Object key, Object message) {
TopologyBuilder builder = new TopologyBuilder();
- GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+ GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("key", "message");
@@ -298,7 +298,7 @@ public class KafkaBoltTest {
private Tuple generateTestTuple(Object message) {
TopologyBuilder builder = new TopologyBuilder();
- GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+ GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("message");
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
index 840b2d3..a771748 100644
--- a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
+++ b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
* Collection of unique named fields using in an ITuple
*/
public class Fields implements Iterable<String>, Serializable {
+ private static final long serialVersionUID = -3377931843059975424L;
private List<String> _fields;
private Map<String, Integer> _index = new HashMap<>();
@@ -122,5 +123,20 @@ public class Fields implements Iterable<String>, Serializable {
@Override
public String toString() {
return _fields.toString();
- }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (other instanceof Fields) {
+ Fields of = (Fields)other;
+ return _fields.equals(of._fields);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return _fields.hashCode();
+ }
}