You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Robert Joseph Evans (JIRA)" <ji...@apache.org> on 2016/11/29 18:49:58 UTC
[jira] [Created] (STORM-2225) Kafka New API make simple things
simple
Robert Joseph Evans created STORM-2225:
------------------------------------------
Summary: Kafka New API make simple things simple
Key: STORM-2225
URL: https://issues.apache.org/jira/browse/STORM-2225
Project: Apache Storm
Issue Type: Improvement
Components: storm-kafka-client
Affects Versions: 1.0.0, 2.0.0
Reporter: Robert Joseph Evans
Assignee: Robert Joseph Evans
The Kafka spouts in storm-kafka-client use the new API and are very extendable, but doing very simple things take way too many lines of code.
For example to create a KafkaTridentSpoutOpaque you need the following code (from the example).
{code}
private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
newKafkaSpoutConfig(
newKafkaSpoutStreams())));
}
private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
.build();
}
protected Map<String,Object> newKafkaConsumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.partition.fetch.bytes", 200);
return props;
}
protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
.build();
}
protected KafkaSpoutRetryService newRetryService() {
return new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
}
protected KafkaSpoutStreams newKafkaSpoutStreams() {
return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"), new String[]{"test-trident","test-trident-1"}).build();
}
protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
public TopicsTupleBuilder(String... topics) {
super(topics);
}
@Override
public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
return new Values(consumerRecord.value());
}
}
{code}
All of this so I can have a trident spout that reads <String, String> values from "localhost:9092" on the topics "test-trident" and "test-trident-1" and outputting the value as the field "str".
I shouldn't need 50 lines of code for something I can explain in 3 lines of test. It feels like we need to have some better defaults, and less overhead on a lot of these things.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)