You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/01 19:09:22 UTC

[38/50] [abbrv] storm git commit: STORM-1352. Trident should support writing to multiple Kafka clusters.

STORM-1352. Trident should support writing to multiple Kafka clusters.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1c52735
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1c52735
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1c52735

Branch: refs/heads/STORM-1040
Commit: c1c52735b9bace12a64e9a8c3190b292604b1120
Parents: 20a864d
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Nov 25 11:30:22 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Nov 25 13:56:35 2015 -0800

----------------------------------------------------------------------
 .../starter/trident/TridentKafkaWordCount.java  | 15 ++++----
 external/storm-kafka/README.md                  | 37 ++++++++++----------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     | 13 ++-----
 .../storm/kafka/trident/TridentKafkaState.java  | 10 ++----
 .../kafka/trident/TridentKafkaStateFactory.java | 10 ++++--
 .../src/test/storm/kafka/TestUtils.java         |  8 ++---
 .../src/test/storm/kafka/TridentKafkaTest.java  | 13 +++----
 .../test/storm/kafka/TridentKafkaTopology.java  | 33 +++++++----------
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  6 ++--
 9 files changed, 58 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
index 813841a..bd8ecba 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
@@ -149,14 +149,14 @@ public class TridentKafkaWordCount {
      *
      * @return the storm topology
      */
-    public StormTopology buildProducerTopology() {
+    public StormTopology buildProducerTopology(Properties prop) {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("spout", new RandomSentenceSpout(), 2);
         /**
          * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
          * so that this gets written out as the message in the kafka topic.
          */
-        KafkaBolt bolt = new KafkaBolt()
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
                 .withTopicSelector(new DefaultTopicSelector("test"))
                 .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
         builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
@@ -169,16 +169,13 @@ public class TridentKafkaWordCount {
      *
      * @return the topology config
      */
-    public Config getProducerConfig() {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
+    public Properties getProducerConfig() {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
-        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        return conf;
+        return props;
     }
 
     /**
@@ -214,8 +211,10 @@ public class TridentKafkaWordCount {
         // submit the consumer topology.
         cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
 
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
         // submit the producer topology.
-        cluster.submitTopology("kafkaBolt", wordCount.getProducerConfig(), wordCount.buildProducerTopology());
+        cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
 
         // keep querying the word counts for a minute.
         for (int i = 0; i < 60; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 1d3678b..2fe930e 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -226,9 +226,8 @@ You can return a null and the message will be ignored. If you have one static to
 DefaultTopicSelector.java and set the name of the topic in the constructor.
 
 ### Specifying Kafka producer properties
-You can provide all the produce properties , see http://kafka.apache.org/documentation.html#newproducerconfigs
-section "Important configuration properties for the producer", in your Storm topology config by setting the properties
-map with key kafka.broker.properties.
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
+section "Important configuration properties for the producer" for more details.
 
 ###Using wildcard kafka topic match
 You can do a wildcard topic match by adding the following config
@@ -238,7 +237,7 @@ You can do a wildcard topic match by adding the following config
 
 ```
 
-After this you can specifiy a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
+After this you can specify a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
 
 
 ###Putting it all together
@@ -256,19 +255,20 @@ For the bolt :
         );
         spout.setCycle(true);
         builder.setSpout("spout", spout, 5);
-        KafkaBolt bolt = new KafkaBolt()
-                .withTopicSelector(new DefaultTopicSelector("test"))
-                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
-        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-
-        Config conf = new Config();
         //set producer properties.
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("acks", "1");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+
+        KafkaBolt bolt = new KafkaBolt()
+                .withProducerProperties(props)
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+        Config conf = new Config();
 
         StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
@@ -288,19 +288,20 @@ For Trident:
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
-        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
-        Config conf = new Config();
         //set producer properties.
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("acks", "1");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withProducerProperties(props)
+                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+        Config conf = new Config();
         StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
 ```
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 2cca826..1ebe142 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -57,7 +57,6 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
 
     public static final String TOPIC = "topic";
-    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
 
     private KafkaProducer<K, V> producer;
     private OutputCollector collector;
@@ -73,8 +72,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private boolean fireAndForget = false;
     private boolean async = true;
 
-    public KafkaBolt() {
-    }
+    public KafkaBolt() {}
 
     public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
         this.mapper = mapper;
@@ -103,14 +101,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
             this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
         }
 
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
-        Properties properties = new Properties();
-        if(configMap!= null)
-            properties.putAll(configMap);
-        if(boltSpecfiedProperties != null)
-            properties.putAll(boltSpecfiedProperties);
-
-        producer = new KafkaProducer<K, V>(properties);
+        producer = new KafkaProducer<>(boltSpecfiedProperties);
         this.collector = collector;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 1ed61b1..84b6a6a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -32,7 +32,6 @@ import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -40,8 +39,6 @@ import java.util.concurrent.Future;
 public class TridentKafkaState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
 
-    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
-
     private KafkaProducer producer;
     private OutputCollector collector;
 
@@ -68,13 +65,10 @@ public class TridentKafkaState implements State {
         LOG.debug("commit is Noop.");
     }
 
-    public void prepare(Map stormConf) {
+    public void prepare(Properties options) {
         Validate.notNull(mapper, "mapper can not be null");
         Validate.notNull(topicSelector, "topicSelector can not be null");
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
-        Properties properties = new Properties();
-        properties.putAll(configMap);
-        producer = new KafkaProducer(properties);
+        producer = new KafkaProducer(options);
     }
 
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
index adca69e..a5d9d42 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -26,6 +26,7 @@ import storm.trident.state.State;
 import storm.trident.state.StateFactory;
 
 import java.util.Map;
+import java.util.Properties;
 
 public class TridentKafkaStateFactory implements StateFactory {
 
@@ -33,7 +34,7 @@ public class TridentKafkaStateFactory implements StateFactory {
 
     private TridentTupleToKafkaMapper mapper;
     private KafkaTopicSelector topicSelector;
-
+    private Properties producerProperties = new Properties();
 
     public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
         this.mapper = mapper;
@@ -45,13 +46,18 @@ public class TridentKafkaStateFactory implements StateFactory {
         return this;
     }
 
+    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
     @Override
     public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
         TridentKafkaState state = new TridentKafkaState()
                 .withKafkaTopicSelector(this.topicSelector)
                 .withTridentTupleToKafkaMapper(this.mapper);
-        state.prepare(conf);
+        state.prepare(producerProperties);
         return state;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
index 839b691..3e69160 100644
--- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
@@ -73,17 +73,13 @@ public class TestUtils {
         return new StaticHosts(globalPartitionInformation);
     }
 
-    public static Config getConfig(String brokerConnectionString) {
-        Config config = new Config();
+    public static Properties getProducerProperties(String brokerConnectionString) {
         Properties props = new Properties();
         props.put("bootstrap.servers", brokerConnectionString);
         props.put("acks", "1");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        config.put(KafkaBolt.TOPIC, TOPIC);
-
-        return config;
+        return props;
     }
 
     public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
index d8a5e24..8213b07 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
@@ -17,7 +17,6 @@
  */
 package storm.kafka;
 
-import backtype.storm.Config;
 import backtype.storm.tuple.Fields;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.junit.After;
@@ -37,22 +36,18 @@ import java.util.List;
 public class TridentKafkaTest {
     private KafkaTestBroker broker;
     private TridentKafkaState state;
-    private Config config;
     private SimpleConsumer simpleConsumer;
-    private TridentTupleToKafkaMapper mapper;
-    private KafkaTopicSelector topicSelector;
 
     @Before
     public void setup() {
         broker = new KafkaTestBroker();
         simpleConsumer = TestUtils.getKafkaConsumer(broker);
-        config = TestUtils.getConfig(broker.getBrokerConnectionString());
-        mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
-        topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
+        TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
+        KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
         state = new TridentKafkaState()
                 .withKafkaTopicSelector(topicSelector)
                 .withTridentTupleToKafkaMapper(mapper);
-        state.prepare(config);
+        state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString()));
     }
 
     @Test
@@ -71,7 +66,7 @@ public class TridentKafkaTest {
     }
 
     private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) {
-        List<TridentTuple> batch = new ArrayList<TridentTuple>();
+        List<TridentTuple> batch = new ArrayList<>();
         for(int i =0 ; i < batchsize; i++) {
             batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
index cade6df..b9e25e4 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
@@ -22,7 +22,7 @@ import backtype.storm.LocalCluster;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import storm.kafka.trident.TridentKafkaState;
+import com.google.common.collect.ImmutableMap;
 import storm.kafka.trident.TridentKafkaStateFactory;
 import storm.kafka.trident.TridentKafkaUpdater;
 import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
@@ -31,14 +31,11 @@ import storm.trident.Stream;
 import storm.trident.TridentTopology;
 import storm.trident.testing.FixedBatchSpout;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
-
 public class TridentKafkaTopology {
 
-    private static StormTopology buildTopology() {
+    private static StormTopology buildTopology(String brokerConnectionString) {
         Fields fields = new Fields("word", "count");
         FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                 new Values("storm", "1"),
@@ -51,9 +48,16 @@ public class TridentKafkaTopology {
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
         TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+            .withProducerProperties(props)
+            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
         stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
 
         return topology.build();
@@ -77,24 +81,11 @@ public class TridentKafkaTopology {
             System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
         }
 
-        Config conf = getConfig(args[0]);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("wordCounter", conf, buildTopology());
+        cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
         Thread.sleep(60 * 1000);
         cluster.killTopology("wordCounter");
 
         cluster.shutdown();
     }
-
-    private  static Config getConfig(String brokerConnectionString) {
-        Config conf = new Config();
-        Properties props = new Properties();
-        props.put("bootstrap.servers", brokerConnectionString);
-        props.put("acks", "1");
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
-        return conf;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c1c52735/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 75c24d7..87daab0 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -183,21 +183,19 @@ public class KafkaBoltTest {
     }
 
     private KafkaBolt generateStringSerializerBolt() {
-        KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
         props.put("acks", "1");
         props.put("bootstrap.servers", broker.getBrokerConnectionString());
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("metadata.fetch.timeout.ms", 1000);
-        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(false);
         return bolt;
     }
 
     private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
-        KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
         props.put("acks", "1");
         props.put("bootstrap.servers", broker.getBrokerConnectionString());
@@ -205,7 +203,7 @@ public class KafkaBoltTest {
         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put("metadata.fetch.timeout.ms", 1000);
         props.put("linger.ms", 0);
-        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(async);
         bolt.setFireAndForget(fireAndForget);