Posted to issues@storm.apache.org by "Robert Joseph Evans (JIRA)" <ji...@apache.org> on 2017/02/01 15:57:51 UTC
[jira] [Resolved] (STORM-2225) Kafka New API make simple things simple
[ https://issues.apache.org/jira/browse/STORM-2225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Joseph Evans resolved STORM-2225.
----------------------------------------
Resolution: Fixed
Fix Version/s: 1.1.0, 2.0.0
> Kafka New API make simple things simple
> ---------------------------------------
>
> Key: STORM-2225
> URL: https://issues.apache.org/jira/browse/STORM-2225
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka-client
> Affects Versions: 1.0.0, 2.0.0
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Fix For: 2.0.0, 1.1.0
>
> Time Spent: 24h
> Remaining Estimate: 0h
>
> The Kafka spouts in storm-kafka-client use the new API and are very extensible, but doing very simple things takes way too many lines of code.
> For example, to create a KafkaTridentSpoutOpaque you need the following code (from the example).
> {code}
> private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
>     return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
>             newKafkaSpoutConfig(
>                     newKafkaSpoutStreams())));
> }
> private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
>     return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
>             kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .build();
> }
> protected Map<String,Object> newKafkaConsumerProps() {
>     Map<String, Object> props = new HashMap<>();
>     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
>     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
>     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("max.partition.fetch.bytes", 200);
>     return props;
> }
> protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>             new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
>             .build();
> }
> protected KafkaSpoutRetryService newRetryService() {
>     return new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
>             KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
>             Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
> }
> protected KafkaSpoutStreams newKafkaSpoutStreams() {
>     return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"), new String[]{"test-trident","test-trident-1"}).build();
> }
> protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
>     public TopicsTupleBuilder(String... topics) {
>         super(topics);
>     }
>     @Override
>     public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
>         return new Values(consumerRecord.value());
>     }
> }
> {code}
> All of this so I can have a Trident spout that reads <String, String> values from "localhost:9092" on the topics "test-trident" and "test-trident-1" and outputs the value as the field "str".
> I shouldn't need 50 lines of code for something I can explain in 3 lines of text. It feels like we need better defaults and less overhead on a lot of these things.
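
As a rough illustration of the "better defaults" idea in the description, here is a minimal, self-contained sketch of a builder that asks only for the bootstrap servers and topic names and fills in everything else. It is not the storm-kafka-client API: the class SimpleSpoutConfig, its setters, and the default values chosen (group id, output field name, commit period, uncommitted-offset limit) are all hypothetical and exist only to show the pattern. The property keys are the standard Kafka consumer configuration names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a spout configuration; NOT the real KafkaSpoutConfig.
final class SimpleSpoutConfig {
    private final Map<String, Object> consumerProps;
    private final List<String> topics;
    private final String outputField;
    private final long offsetCommitPeriodMs;
    private final int maxUncommittedOffsets;

    private SimpleSpoutConfig(Builder b) {
        this.consumerProps = Collections.unmodifiableMap(new HashMap<>(b.consumerProps));
        this.topics = Collections.unmodifiableList(new ArrayList<>(b.topics));
        this.outputField = b.outputField;
        this.offsetCommitPeriodMs = b.offsetCommitPeriodMs;
        this.maxUncommittedOffsets = b.maxUncommittedOffsets;
    }

    // The only required inputs: where the brokers are and which topics to read.
    static Builder builder(String bootstrapServers, String... topics) {
        return new Builder(bootstrapServers, topics);
    }

    Map<String, Object> getConsumerProps() { return consumerProps; }
    List<String> getTopics() { return topics; }
    String getOutputField() { return outputField; }
    long getOffsetCommitPeriodMs() { return offsetCommitPeriodMs; }
    int getMaxUncommittedOffsets() { return maxUncommittedOffsets; }

    static final class Builder {
        private static final String STRING_DESERIALIZER =
                "org.apache.kafka.common.serialization.StringDeserializer";

        private final Map<String, Object> consumerProps = new HashMap<>();
        private final List<String> topics;
        // Illustrative defaults only; every one of them can be overridden.
        private String outputField = "value";
        private long offsetCommitPeriodMs = 10_000L;
        private int maxUncommittedOffsets = 250;

        private Builder(String bootstrapServers, String... topics) {
            if (topics == null || topics.length == 0) {
                throw new IllegalArgumentException("at least one topic is required");
            }
            this.topics = Arrays.asList(topics.clone());
            consumerProps.put("bootstrap.servers", bootstrapServers);
            consumerProps.put("group.id", "default-spout-group"); // hypothetical default
            consumerProps.put("key.deserializer", STRING_DESERIALIZER);
            consumerProps.put("value.deserializer", STRING_DESERIALIZER);
        }

        // Escape hatch for any consumer property the defaults do not cover.
        Builder setProp(String key, Object value) {
            consumerProps.put(key, value);
            return this;
        }

        Builder setOutputField(String field) {
            this.outputField = field;
            return this;
        }

        Builder setOffsetCommitPeriodMs(long ms) {
            this.offsetCommitPeriodMs = ms;
            return this;
        }

        Builder setMaxUncommittedOffsets(int max) {
            this.maxUncommittedOffsets = max;
            return this;
        }

        SimpleSpoutConfig build() {
            return new SimpleSpoutConfig(this);
        }
    }
}
```

With a builder shaped like this, the scenario from the description fits in a single statement: SimpleSpoutConfig.builder("127.0.0.1:9092", "test-trident", "test-trident-1").setOutputField("str").build(). Anything beyond the common case, such as "max.partition.fetch.bytes", would go through setProp rather than requiring a hand-built property map.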