You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/01 19:09:22 UTC
[38/50] [abbrv] storm git commit: STORM-1352. Trident should support
writing to multiple Kafka clusters.
STORM-1352. Trident should support writing to multiple Kafka clusters.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1c52735
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1c52735
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1c52735
Branch: refs/heads/STORM-1040
Commit: c1c52735b9bace12a64e9a8c3190b292604b1120
Parents: 20a864d
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Nov 25 11:30:22 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Nov 25 13:56:35 2015 -0800
----------------------------------------------------------------------
.../starter/trident/TridentKafkaWordCount.java | 15 ++++----
external/storm-kafka/README.md | 37 ++++++++++----------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 13 ++-----
.../storm/kafka/trident/TridentKafkaState.java | 10 ++----
.../kafka/trident/TridentKafkaStateFactory.java | 10 ++++--
.../src/test/storm/kafka/TestUtils.java | 8 ++---
.../src/test/storm/kafka/TridentKafkaTest.java | 13 +++----
.../test/storm/kafka/TridentKafkaTopology.java | 33 +++++++----------
.../test/storm/kafka/bolt/KafkaBoltTest.java | 6 ++--
9 files changed, 58 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
index 813841a..bd8ecba 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
@@ -149,14 +149,14 @@ public class TridentKafkaWordCount {
*
* @return the storm topology
*/
- public StormTopology buildProducerTopology() {
+ public StormTopology buildProducerTopology(Properties prop) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 2);
/**
* The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
* so that this gets written out as the message in the kafka topic.
*/
- KafkaBolt bolt = new KafkaBolt()
+ KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
@@ -169,16 +169,13 @@ public class TridentKafkaWordCount {
*
* @return the topology config
*/
- public Config getProducerConfig() {
- Config conf = new Config();
- conf.setMaxSpoutPending(20);
+ public Properties getProducerConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
- conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
- return conf;
+ return props;
}
/**
@@ -214,8 +211,10 @@ public class TridentKafkaWordCount {
// submit the consumer topology.
cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
// submit the producer topology.
- cluster.submitTopology("kafkaBolt", wordCount.getProducerConfig(), wordCount.buildProducerTopology());
+ cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
// keep querying the word counts for a minute.
for (int i = 0; i < 60; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 1d3678b..2fe930e 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -226,9 +226,8 @@ You can return a null and the message will be ignored. If you have one static to
DefaultTopicSelector.java and set the name of the topic in the constructor.
### Specifying Kafka producer properties
-You can provide all the produce properties , see http://kafka.apache.org/documentation.html#newproducerconfigs
-section "Important configuration properties for the producer", in your Storm topology config by setting the properties
-map with key kafka.broker.properties.
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
+section "Important configuration properties for the producer" for more details.
###Using wildcard kafka topic match
You can do a wildcard topic match by adding the following config
@@ -238,7 +237,7 @@ You can do a wildcard topic match by adding the following config
```
-After this you can specifiy a wildcard topic for matching e.g. clickstream.*.log. This will match all streams matching clickstream.my.log, clickstream.cart.log etc
+After this you can specify a wildcard topic for matching e.g. clickstream.*.log. This will match all streams matching clickstream.my.log, clickstream.cart.log etc
###Putting it all together
@@ -256,19 +255,20 @@ For the bolt :
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
- KafkaBolt bolt = new KafkaBolt()
- .withTopicSelector(new DefaultTopicSelector("test"))
- .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
- builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-
- Config conf = new Config();
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+
+ KafkaBolt bolt = new KafkaBolt()
+ .withProducerProperties(props)
+ .withTopicSelector(new DefaultTopicSelector("test"))
+ .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+ builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+ Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
```
@@ -288,19 +288,20 @@ For Trident:
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
- TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
- .withKafkaTopicSelector(new DefaultTopicSelector("test"))
- .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
- stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
- Config conf = new Config();
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+
+ TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+ .withProducerProperties(props)
+ .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+ .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+ stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+ Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
```
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 2cca826..1ebe142 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -57,7 +57,6 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
public static final String TOPIC = "topic";
- public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
private KafkaProducer<K, V> producer;
private OutputCollector collector;
@@ -73,8 +72,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
private boolean fireAndForget = false;
private boolean async = true;
- public KafkaBolt() {
- }
+ public KafkaBolt() {}
public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
this.mapper = mapper;
@@ -103,14 +101,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
}
- Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
- Properties properties = new Properties();
- if(configMap!= null)
- properties.putAll(configMap);
- if(boltSpecfiedProperties != null)
- properties.putAll(boltSpecfiedProperties);
-
- producer = new KafkaProducer<K, V>(properties);
+ producer = new KafkaProducer<>(boltSpecfiedProperties);
this.collector = collector;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 1ed61b1..84b6a6a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -32,7 +32,6 @@ import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -40,8 +39,6 @@ import java.util.concurrent.Future;
public class TridentKafkaState implements State {
private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
- public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
-
private KafkaProducer producer;
private OutputCollector collector;
@@ -68,13 +65,10 @@ public class TridentKafkaState implements State {
LOG.debug("commit is Noop.");
}
- public void prepare(Map stormConf) {
+ public void prepare(Properties options) {
Validate.notNull(mapper, "mapper can not be null");
Validate.notNull(topicSelector, "topicSelector can not be null");
- Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
- Properties properties = new Properties();
- properties.putAll(configMap);
- producer = new KafkaProducer(properties);
+ producer = new KafkaProducer(options);
}
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
index adca69e..a5d9d42 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -26,6 +26,7 @@ import storm.trident.state.State;
import storm.trident.state.StateFactory;
import java.util.Map;
+import java.util.Properties;
public class TridentKafkaStateFactory implements StateFactory {
@@ -33,7 +34,7 @@ public class TridentKafkaStateFactory implements StateFactory {
private TridentTupleToKafkaMapper mapper;
private KafkaTopicSelector topicSelector;
-
+ private Properties producerProperties = new Properties();
public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
this.mapper = mapper;
@@ -45,13 +46,18 @@ public class TridentKafkaStateFactory implements StateFactory {
return this;
}
+ public TridentKafkaStateFactory withProducerProperties(Properties props) {
+ this.producerProperties = props;
+ return this;
+ }
+
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
TridentKafkaState state = new TridentKafkaState()
.withKafkaTopicSelector(this.topicSelector)
.withTridentTupleToKafkaMapper(this.mapper);
- state.prepare(conf);
+ state.prepare(producerProperties);
return state;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
index 839b691..3e69160 100644
--- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
@@ -73,17 +73,13 @@ public class TestUtils {
return new StaticHosts(globalPartitionInformation);
}
- public static Config getConfig(String brokerConnectionString) {
- Config config = new Config();
+ public static Properties getProducerProperties(String brokerConnectionString) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerConnectionString);
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
- config.put(KafkaBolt.TOPIC, TOPIC);
-
- return config;
+ return props;
}
public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
index d8a5e24..8213b07 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
@@ -17,7 +17,6 @@
*/
package storm.kafka;
-import backtype.storm.Config;
import backtype.storm.tuple.Fields;
import kafka.javaapi.consumer.SimpleConsumer;
import org.junit.After;
@@ -37,22 +36,18 @@ import java.util.List;
public class TridentKafkaTest {
private KafkaTestBroker broker;
private TridentKafkaState state;
- private Config config;
private SimpleConsumer simpleConsumer;
- private TridentTupleToKafkaMapper mapper;
- private KafkaTopicSelector topicSelector;
@Before
public void setup() {
broker = new KafkaTestBroker();
simpleConsumer = TestUtils.getKafkaConsumer(broker);
- config = TestUtils.getConfig(broker.getBrokerConnectionString());
- mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
- topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
+ TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
+ KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
state = new TridentKafkaState()
.withKafkaTopicSelector(topicSelector)
.withTridentTupleToKafkaMapper(mapper);
- state.prepare(config);
+ state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString()));
}
@Test
@@ -71,7 +66,7 @@ public class TridentKafkaTest {
}
private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) {
- List<TridentTuple> batch = new ArrayList<TridentTuple>();
+ List<TridentTuple> batch = new ArrayList<>();
for(int i =0 ; i < batchsize; i++) {
batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
index cade6df..b9e25e4 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
@@ -22,7 +22,7 @@ import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import storm.kafka.trident.TridentKafkaState;
+import com.google.common.collect.ImmutableMap;
import storm.kafka.trident.TridentKafkaStateFactory;
import storm.kafka.trident.TridentKafkaUpdater;
import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
@@ -31,14 +31,11 @@ import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
-
public class TridentKafkaTopology {
- private static StormTopology buildTopology() {
+ private static StormTopology buildTopology(String brokerConnectionString) {
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
@@ -51,9 +48,16 @@ public class TridentKafkaTopology {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
+ Properties props = new Properties();
+ props.put("bootstrap.servers", brokerConnectionString);
+ props.put("acks", "1");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
- .withKafkaTopicSelector(new DefaultTopicSelector("test"))
- .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+ .withProducerProperties(props)
+ .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+ .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
return topology.build();
@@ -77,24 +81,11 @@ public class TridentKafkaTopology {
System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
}
- Config conf = getConfig(args[0]);
LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology());
+ cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
Thread.sleep(60 * 1000);
cluster.killTopology("wordCounter");
cluster.shutdown();
}
-
- private static Config getConfig(String brokerConnectionString) {
- Config conf = new Config();
- Properties props = new Properties();
- props.put("bootstrap.servers", brokerConnectionString);
- props.put("acks", "1");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
- return conf;
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 75c24d7..87daab0 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -183,21 +183,19 @@ public class KafkaBoltTest {
}
private KafkaBolt generateStringSerializerBolt() {
- KafkaBolt bolt = new KafkaBolt();
Properties props = new Properties();
props.put("acks", "1");
props.put("bootstrap.servers", broker.getBrokerConnectionString());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("metadata.fetch.timeout.ms", 1000);
- config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+ KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
bolt.prepare(config, null, new OutputCollector(collector));
bolt.setAsync(false);
return bolt;
}
private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
- KafkaBolt bolt = new KafkaBolt();
Properties props = new Properties();
props.put("acks", "1");
props.put("bootstrap.servers", broker.getBrokerConnectionString());
@@ -205,7 +203,7 @@ public class KafkaBoltTest {
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("metadata.fetch.timeout.ms", 1000);
props.put("linger.ms", 0);
- config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+ KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
bolt.prepare(config, null, new OutputCollector(collector));
bolt.setAsync(async);
bolt.setFireAndForget(fireAndForget);