Posted to user@storm.apache.org by Jonas P Sandström <jo...@student.chalmers.se> on 2015/04/07 16:54:03 UTC
Partitioning from Storm Trident to Kafka
Hi,
I'm trying to send processed data from Storm Trident to three partitions of a Kafka topic.
However, I cannot figure out how to make Trident write to more than one partition when the topic is selected with DefaultTopicSelector().
There is a makeState implementation that takes partitionIndex and numPartitions, but I cannot find an example of how to use it
or how to set the Map or IMetricsContext.
My question is whether TridentKafkaStateFactory can be used for partitioned output, or whether some other component
can solve the problem. The ugly solution would be to create more topics and partition the stream that way.
@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper("key", "msg"));

public State makeState(Map conf,
                       IMetricsContext metrics,
                       int partitionIndex,
                       int numPartitions)
Best regards,
Jonas Sandström
Re: Partitioning from Storm Trident to Kafka
Posted by Jonas P Sandström <jo...@student.chalmers.se>.
Hi, thanks for the quick answer.
If I understand correctly, the producer needs to be configured before use:
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperhost);
props.put("metadata.broker.list", brokerlist);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "0");
// If I understood correctly, this will produce to all partitions.
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
ProducerConfig config = new ProducerConfig(props);
TridentKafkaState state = new TridentKafkaState();
state.prepare(props);
@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper("key", "msg"));
rePosition.shuffle()
        .parallelismHint(3)
        .partitionPersist(stateFactory, new Fields("key", "msg"), new TridentKafkaUpdater(), new Fields())
        .parallelismHint(3);
However, I don't know how to apply the ProducerConfig to the TridentKafkaStateFactory.
Or did you have something else in mind for producing to Kafka?
Best regards,
Jonas Sandström
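One possible answer to the question above, sketched with plain java.util types so that it runs without Storm on the classpath. As far as the storm-kafka README of that era describes it, TridentKafkaState does not take a hand-built ProducerConfig; it reads the producer settings from the topology configuration under the key "kafka.broker.properties" (the constant TridentKafkaState.KAFKA_BROKER_PROPERTIES). Treat that key as an assumption to verify against your storm-kafka version. The class ProducerConfSketch and its two helper methods are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class ProducerConfSketch {
    // Assumed key; believed to match TridentKafkaState.KAFKA_BROKER_PROPERTIES.
    static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";

    // Hypothetical helper: the producer settings from the message above.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        return props;
    }

    // Hypothetical helper: a plain map standing in for backtype.storm.Config,
    // which is itself a HashMap<String, Object>.
    static Map<String, Object> topologyConf(String brokerList) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(KAFKA_BROKER_PROPERTIES, producerProps(brokerList));
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = topologyConf("localhost:9092");
        System.out.println(conf.get(KAFKA_BROKER_PROPERTIES));
    }
}
```

With real Storm, the same entry would presumably go on the Config object that is submitted with the topology, and the manual ProducerConfig and state.prepare(props) calls would no longer be needed.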
________________________________
From: Andrew Neilson <ar...@gmail.com>
Sent: 7 April 2015 20:23
To: user@storm.apache.org
Subject: Re: Partitioning from Storm Trident to Kafka
You'll need to make sure your Kafka producer is configured to partition the way you are expecting when you write to your topic. By default it will publish to the same partition for 10 minutes at a time then switch to a new one. It looks like you are trying to pass a partition key to the producer but the producer needs to be set up to use it.
Re: Partitioning from Storm Trident to Kafka
Posted by Andrew Neilson <ar...@gmail.com>.
You'll need to make sure your Kafka producer is configured to partition the
way you are expecting when you write to your topic. By default it will
publish to the same partition for 10 minutes at a time then switch to a new
one. It looks like you are trying to pass a partition key to the producer
but the producer needs to be set up to use it.
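
To illustrate the point about partition keys: a keyed producer typically picks the partition by hashing the message key and taking the result modulo the partition count, so messages only spread across partitions when the keys vary. The following is a minimal stand-alone sketch of that idea, not Kafka's actual partitioner code; the class name KeyHashPartitionSketch and the sample keys are made up for illustration.

```java
import java.util.Arrays;

class KeyHashPartitionSketch {
    // Maps a key to a partition in [0, numPartitions) by hash-modulo.
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        int[] counts = new int[numPartitions];
        // Count how many of 300 distinct sample keys land on each partition.
        for (int i = 0; i < 300; i++) {
            counts[partitionFor("key-" + i, numPartitions)]++;
        }
        System.out.println(Arrays.toString(counts));
    }
}
```

Under this scheme a constant key sends every message to the same partition, which may be worth checking if the "key" field in the tuples never changes.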