Posted to user@storm.apache.org by Jonas P Sandström <jo...@student.chalmers.se> on 2015/04/07 16:54:03 UTC

Partitioning from Storm Trident to Kafka

Hi,


I'm trying to send processed data from Storm Trident to 3 partitions of a Kafka topic.

However, I cannot figure out how to make Trident write to more than one partition when I select the topic with DefaultTopicSelector().

There is a makeState implementation that takes partitionIndex and numPartitions, but I cannot find an example of how to use it or how to set the Map or IMetricsContext.

My question is whether it is possible to use TridentKafkaStateFactory for partitioned output, or whether some other component can solve the problem. The ugly solution would be to create more topics and partition the stream across them.



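@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        // Write every tuple to the single topic named by outputTopic
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        // Use the tuple's "key" field as the Kafka key and its "msg" field as the message
        .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper("key", "msg"));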


public State makeState(Map conf,
                       IMetricsContext metrics,
                       int partitionIndex,
                       int numPartitions)
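As far as I understand Trident, partitionIndex and numPartitions in makeState describe Trident's own parallelism, not Kafka's partitions: Trident calls the factory's makeState once per partition of the persisting step (so with parallelismHint(3) there are three calls, with partitionIndex 0, 1 and 2), and it supplies the Map (topology configuration) and the IMetricsContext itself. Application code does not normally call makeState. The sketch below shows that calling pattern using hypothetical, JDK-only stand-ins; PartitionState, SketchStateFactory, createStates and kafkaPartitionFor are invented for illustration and are not Storm or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; these are NOT the Storm/Trident interfaces.
final class MakeStateSketch {

    // One state object per Trident partition, remembering which partition created it.
    static final class PartitionState {
        final int partitionIndex;
        final int numPartitions;

        PartitionState(int partitionIndex, int numPartitions) {
            this.partitionIndex = partitionIndex;
            this.numPartitions = numPartitions;
        }
    }

    // Mirrors only the two int parameters of makeState(conf, metrics, partitionIndex, numPartitions).
    interface SketchStateFactory {
        PartitionState makeState(int partitionIndex, int numPartitions);
    }

    // Simulates the framework side: the factory is called once per parallel partition,
    // with partitionIndex running from 0 to numPartitions - 1.
    static List<PartitionState> createStates(SketchStateFactory factory, int parallelism) {
        List<PartitionState> states = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            states.add(factory.makeState(i, parallelism));
        }
        return states;
    }

    // Hypothetical policy a custom state could apply: pin each Trident partition to one Kafka partition.
    static int kafkaPartitionFor(int partitionIndex, int kafkaPartitions) {
        return partitionIndex % kafkaPartitions;
    }

    public static void main(String[] args) {
        List<PartitionState> states = createStates(PartitionState::new, 3);
        for (PartitionState s : states) {
            System.out.println("Trident partition " + s.partitionIndex + " of " + s.numPartitions
                    + " -> Kafka partition " + kafkaPartitionFor(s.partitionIndex, 3));
        }
    }
}
```

The point of the simulation is that the two integers arrive from the framework; whether and how they influence the Kafka partition is up to whatever the state does with them.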

Best regards,

Jonas Sandström

Re: Partitioning from Storm Trident to Kafka

Posted by Jonas P Sandström <jo...@student.chalmers.se>.
Hi, thanks for the quick answer.

If I understand correctly, the producer needs to be configured before use:


Properties props = new Properties();
props.put("zookeeper.connect", zookeeperhost);
props.put("metadata.broker.list", brokerlist);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "0");
// If I understood correctly, this makes the producer write to all partitions
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
ProducerConfig config = new ProducerConfig(props);
TridentKafkaState state = new TridentKafkaState();
state.prepare(props);
@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper("key", "msg"));

rePosition.shuffle().parallelismHint(3)
        .partitionPersist(stateFactory, new Fields("key", "msg"), new TridentKafkaUpdater(), new Fields())
        .parallelismHint(3);

However, I don't know how to pass the ProducerConfig to the TridentKafkaStateFactory, or whether you had something else in mind for producing to Kafka.
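On the comment next to the partitioner setting: to my understanding of the Kafka 0.8 producer, kafka.producer.DefaultPartitioner is applied only to messages that carry a key, and it assigns each message to a non-negative hash of the key modulo the number of partitions. Messages would therefore reach all 3 partitions only if the "key" field actually varies between tuples. KeyHashPartitionSketch below is a hypothetical JDK-only illustration of that hash-modulo rule, not Kafka's own class.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical re-implementation of a hash-modulo partitioner; not the Kafka class itself.
final class KeyHashPartitionSketch {

    // Masking the sign bit keeps the result non-negative, even for negative hash codes.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        Map<String, Integer> assignment = new LinkedHashMap<>();
        for (String key : Arrays.asList("a", "b", "c", "a")) {
            assignment.put(key, partitionFor(key, 3));
        }
        // Equal keys always land on the same partition; distinct keys spread out.
        System.out.println(assignment); // {a=1, b=2, c=0}
    }
}
```

With a constant key, every message would hash to the same partition, which is one way to end up writing to a single partition even with a partitioner configured.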



Best regards,

Jonas Sandström

________________________________
From: Andrew Neilson <ar...@gmail.com>
Sent: 7 April 2015 20:23
To: user@storm.apache.org
Subject: Re: Partitioning from Storm Trident to Kafka

You'll need to make sure your Kafka producer is configured to partition the way you are expecting when you write to your topic. By default it will publish to the same partition for 10 minutes at a time, then switch to a new one. It looks like you are trying to pass a partition key to the producer, but the producer needs to be set up to use it.


Re: Partitioning from Storm Trident to Kafka

Posted by Andrew Neilson <ar...@gmail.com>.
You'll need to make sure your Kafka producer is configured to partition the
way you are expecting when you write to your topic. By default it will
publish to the same partition for 10 minutes at a time, then switch to a new
one. It looks like you are trying to pass a partition key to the producer,
but the producer needs to be set up to use it.
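The "same partition for 10 minutes" behaviour can be simulated as follows. The assumption, based on my reading of the Kafka 0.8 producer, is that a message sent without a key goes to a randomly chosen partition that is cached per topic and only re-chosen when the producer refreshes topic metadata, which happens every topic.metadata.refresh.interval.ms (600000 ms, i.e. 10 minutes, by default). StickyPartitionSketch and its constructor parameters are hypothetical; it models the timing only and does not talk to Kafka.

```java
import java.util.Random;

// Hypothetical simulation of sticky partition choice for keyless messages; not Kafka code.
final class StickyPartitionSketch {
    private final Random random;
    private final int numPartitions;
    private final long refreshIntervalMs;
    private int current = -1;      // -1 means no partition chosen yet
    private long chosenAtMs;

    StickyPartitionSketch(int numPartitions, long refreshIntervalMs, long seed) {
        this.numPartitions = numPartitions;
        this.refreshIntervalMs = refreshIntervalMs;
        this.random = new Random(seed);
    }

    // Returns the partition for a keyless message sent at time nowMs.
    // A new random partition is picked only once more than refreshIntervalMs has elapsed.
    int partitionAt(long nowMs) {
        if (current < 0 || nowMs - chosenAtMs > refreshIntervalMs) {
            current = random.nextInt(numPartitions);
            chosenAtMs = nowMs;
        }
        return current;
    }

    public static void main(String[] args) {
        StickyPartitionSketch producer = new StickyPartitionSketch(3, 600_000L, 42L);
        for (long t = 0; t <= 1_800_000L; t += 300_000L) {
            System.out.println("t=" + t / 60_000 + " min -> partition " + producer.partitionAt(t));
        }
    }
}
```

Over a short test run this looks like "everything goes to one partition", which matches the symptom in the original question; supplying a varying key is what lets a key-based partitioner spread the data.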
