Posted to user@spark.apache.org by Rachana Srivastava <Ra...@markmonitor.com> on 2016/09/14 19:33:38 UTC

Not all KafkaReceivers processing the data Why?

Hello all,

I have created a Kafka topic with 5 partitions, and I am using the createStream receiver API as shown below. But somehow only one receiver is getting the input data; the rest of the receivers are not processing anything. Can you please help?

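import java.util.ArrayList;
import java.util.List;

import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// jssc (a JavaStreamingContext), contextVal, KAFKA_ZOOKEEPER, KAFKA_GROUP_ID, sparkStreamCount and
// kafkaTopicMap (topic -> number of consumer threads per receiver) are defined elsewhere.
JavaPairDStream<String, String> messages = null;

if (sparkStreamCount > 0) {
    // We create an input DStream for each partition of the topic, unify those streams,
    // and then repartition the unified stream (the repartition step is not shown here).
    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        // All receivers share one consumer group, so Kafka divides the partitions among them.
        kafkaStreams.add(KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER),
                contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }
    messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    messages = KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER),
            contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}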
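All sparkStreamCount receivers in this code call createStream with the same KAFKA_GROUP_ID, so they form one consumer group and Kafka hands each receiver a share of the partitions rather than a full copy of the topic. The sketch below models that split with range-style assignment, which is assumed here to approximate the old high-level consumer's default. It also assumes one consumer thread per receiver (a kafkaTopicMap value of 1) and numbers consumers 0..n-1 instead of using Kafka's sorted consumer-thread IDs. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of range-style partition assignment inside one consumer group.
// Assumptions: one consumer thread per receiver; consumers are numbered 0..n-1.
final class RangeAssignmentSketch {

    // Returns consumer index -> partitions owned by that consumer.
    static Map<Integer, List<Integer>> assign(int numPartitions, int numConsumers) {
        Map<Integer, List<Integer>> owned = new TreeMap<>();
        int perConsumer = numPartitions / numConsumers;
        int consumersWithExtra = numPartitions % numConsumers;
        for (int c = 0; c < numConsumers; c++) {
            // The first `consumersWithExtra` consumers each take one extra partition.
            int start = perConsumer * c + Math.min(c, consumersWithExtra);
            int count = perConsumer + (c < consumersWithExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            owned.put(c, parts);
        }
        return owned;
    }

    // Counts consumers that own at least one partition that actually receives data.
    static int busyConsumers(Map<Integer, List<Integer>> owned, Set<Integer> partitionsWithData) {
        int busy = 0;
        for (List<Integer> parts : owned.values()) {
            for (int p : parts) {
                if (partitionsWithData.contains(p)) {
                    busy++;
                    break;
                }
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> owned = assign(5, 5);
        System.out.println(owned); // prints {0=[0], 1=[1], 2=[2], 3=[3], 4=[4]}
        // Hypothetical case: the producer wrote only to partition 3.
        System.out.println(busyConsumers(owned, Collections.singleton(3))); // prints 1
    }
}
```

With 5 receivers and 5 partitions, each receiver owns exactly one partition in this model, so if the producer writes everything into one partition, only that partition's receiver has anything to process.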

Re: Not all KafkaReceivers processing the data Why?

Posted by Jeff Nadler <jn...@srcginc.com>.
Sure, the partitions exist, but is there data in all of them? Try the
Kafka offset checker:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group <my-consumer-group> --topic <my-topic-name>
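A small sketch for reading the offset checker's output: it assumes the rows are whitespace-separated with the columns Group, Topic, Pid, Offset, logSize, Lag and Owner (verify this against your Kafka version), and it lists the partitions whose logSize is 0, i.e. partitions that have never been written to. The class name and the sample rows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that summarises ConsumerOffsetChecker output.
// Assumed row format (whitespace-separated): Group Topic Pid Offset logSize Lag Owner
final class OffsetCheckerSummary {

    // Returns the partition ids (Pid column) whose logSize is 0, i.e. never written to.
    static List<Integer> emptyPartitions(List<String> lines) {
        List<Integer> empty = new ArrayList<>();
        for (String line : lines) {
            String[] cols = line.trim().split("\\s+");
            // Skip blank lines, the header row and anything shorter than a data row.
            if (cols.length < 6 || cols[0].equals("Group")) {
                continue;
            }
            int pid = Integer.parseInt(cols[2]);
            long logSize = Long.parseLong(cols[4]);
            if (logSize == 0) {
                empty.add(pid);
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        // Made-up rows for illustration only; they are not output from this thread.
        List<String> sample = Arrays.asList(
                "Group   Topic        Pid Offset logSize Lag Owner",
                "mygroup CEQReceiver  0   0      0       0   none",
                "mygroup CEQReceiver  1   4200   4200    0   none",
                "mygroup CEQReceiver  2   0      0       0   none");
        System.out.println("Partitions with no data: " + emptyPartitions(sample)); // prints [0, 2]
    }
}
```

Running the checker twice a few seconds apart and comparing logSize per partition would also show which partitions are still receiving new data.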

On Wed, Sep 14, 2016 at 1:00 PM, <Ra...@thomsonreuters.com>
wrote:

> Sure, thanks. I have removed dev. I do see all the partitions created
> correctly on the Kafka side.
>
> Topic:CEQReceiver       PartitionCount:5        ReplicationFactor:1     Configs:
>         Topic: CEQReceiver      Partition: 0    Leader: 90      Replicas: 90    Isr: 90
>         Topic: CEQReceiver      Partition: 1    Leader: 86      Replicas: 86    Isr: 86
>         Topic: CEQReceiver      Partition: 2    Leader: 87      Replicas: 87    Isr: 87
>         Topic: CEQReceiver      Partition: 3    Leader: 88      Replicas: 88    Isr: 88
>         Topic: CEQReceiver      Partition: 4    Leader: 89      Replicas: 89    Isr: 89
>
>
> *From:* Jeff Nadler [mailto:jnadler@srcginc.com]
> *Sent:* Wednesday, September 14, 2016 12:46 PM
> *To:* Rachana Srivastava
> *Cc:* user@spark.apache.org; dev@spark.apache.org
> *Subject:* Re: Not all KafkaReceivers processing the data Why?
>
>
>
> Have you checked your Kafka brokers to be certain that data is going to
> all 5 partitions?    We use something very similar (but in Scala) and have
> no problems.
>
>
>
> Also you might not get the best response blasting both user+dev lists like
> this.   Normally you'd want to use 'user' only.
>
>
>
> -Jeff

Re: Not all KafkaReceivers processing the data Why?

Posted by Jeff Nadler <jn...@srcginc.com>.
Have you checked your Kafka brokers to be certain that data is going to all
5 partitions?    We use something very similar (but in Scala) and have no
problems.

Also you might not get the best response blasting both user+dev lists like
this.   Normally you'd want to use 'user' only.

-Jeff

Re: Not all KafkaReceivers processing the data Why?

Posted by Jeremy Smith <je...@acorns.com>.
Take a look at how the messages are actually distributed across the
partitions. If the message keys have low cardinality, you might get poor
distribution (e.g. all the messages actually end up in only two of the five
partitions, leading to what you see in Spark).
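To illustrate the low-cardinality effect, the sketch below assigns keys to partitions by hashing. It uses String.hashCode with Math.floorMod as a simplified stand-in, not Kafka's actual partitioner (the newer Java producer, for instance, applies murmur2 to the serialized key), so real placements will differ; the point is only that N distinct keys can occupy at most N partitions. The names are hypothetical.

```java
import java.util.Arrays;

// Simplified stand-in for key-hash partitioning: partition = floorMod(hash(key), numPartitions).
// Assumption: String.hashCode() is used purely for illustration, not Kafka's real hash function.
final class KeySkewSketch {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts messages per partition when keys cycle through `distinctKeys` different values.
    static int[] distribution(int distinctKeys, int messages, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            String key = "key-" + (i % distinctKeys);
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    // Number of partitions that received at least one message.
    static int nonEmpty(int[] counts) {
        int n = 0;
        for (int c : counts) {
            if (c > 0) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        int[] lowCardinality = distribution(2, 1000, 5);
        int[] highCardinality = distribution(1000, 1000, 5);
        System.out.println(Arrays.toString(lowCardinality) + " non-empty=" + nonEmpty(lowCardinality));
        System.out.println(Arrays.toString(highCardinality) + " non-empty=" + nonEmpty(highCardinality));
    }
}
```

In this sketch, 2 distinct keys fill at most 2 of the 5 partitions no matter how many messages are sent, while 1000 distinct keys spread the messages over all 5.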

If you take a look at the Kafka data directories, you can probably get an
idea of the distribution by just examining the sizes of each partition.
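A sketch of that size check, assuming the usual on-disk layout in which each directory listed in the broker's log.dirs setting holds one sub-directory per partition named <topic>-<partition>, containing segment files that end in .log. Because the topic description earlier in the thread shows ReplicationFactor:1 with a different leader for each partition, each partition's data should live on only one broker, so this would need to run on every broker (86 to 90). The class name is hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper. Assumed layout: <log-dir>/<topic>-<partition>/<segment>.log
final class PartitionDirSizes {

    // Sums the sizes of the .log segment files for each partition directory of `topic`.
    static Map<String, Long> logBytesPerPartition(Path logDir, String topic) throws IOException {
        Map<String, Long> sizes = new TreeMap<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(logDir, topic + "-*")) {
            for (Path dir : dirs) {
                String name = dir.getFileName().toString();
                // Keep only "<topic>-<digits>" directories so a topic named "<topic>-other" is not counted.
                if (!Files.isDirectory(dir) || !name.substring(topic.length() + 1).matches("\\d+")) {
                    continue;
                }
                long total = 0;
                try (DirectoryStream<Path> segments = Files.newDirectoryStream(dir, "*.log")) {
                    for (Path segment : segments) {
                        total += Files.size(segment);
                    }
                }
                sizes.put(name, total);
            }
        }
        return sizes;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: PartitionDirSizes <log-dir> <topic>");
            return;
        }
        logBytesPerPartition(Paths.get(args[0]), args[1])
                .forEach((partition, bytes) -> System.out.println(partition + "\t" + bytes + " bytes"));
    }
}
```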

Jeremy