Posted to user@spark.apache.org by Hrishikesh Mishra <sd...@gmail.com> on 2020/02/29 04:05:19 UTC

In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

I have created a sample direct Kafka stream in Spark. The topic has 30
partitions, but all consumers appear to be running on the same executor
machine.

Kafka Manager screenshot.
[image: Screenshot 2020-02-28 at 7.06.49 PM 2.png]

As I understand it, with a direct Kafka stream the driver hands offset
ranges to the executors, and the executors poll Kafka using those offsets.

Kafka Stream

        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hosts>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));

Spark Version: 2.4

Spark Config

        SparkConf conf = new SparkConf().setAppName("StreamingTest");
        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");



Regards,

Hrishi

Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

Posted by Hrishikesh Mishra <sd...@gmail.com>.
Thanks Zhang,

I know *spark.streaming.concurrentJobs* provides parallelism, but it has
adverse effects, as mentioned here: https://stackoverflow.com/a/23533736. I
don't know whether that is still valid with Spark 2.4.




On Fri, Mar 6, 2020 at 7:59 AM Zhang Victor <zh...@outlook.com>
wrote:

> Hi Hrishi.
>
> I tested using multiple Kafka streams.
>
> When the total number of executor cores (executors * cores per executor)
> is greater than the number of topic partitions and
> spark.streaming.concurrentJobs > 1, it is possible to execute jobs
> concurrently.
>
> For example, stream1 -> topicA with 1 partition and stream2 -> topicB
> with 2 partitions.
>
> Then set spark.streaming.concurrentJobs=2.

Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

Posted by Zhang Victor <zh...@outlook.com>.
Hi Hrishi.

I tested using multiple Kafka streams.

When the total number of executor cores (executors * cores per executor) is greater than the number of topic partitions and spark.streaming.concurrentJobs > 1, it is possible to execute jobs concurrently.

For example, stream1 -> topicA with 1 partition and stream2 -> topicB with 2 partitions.

Then set spark.streaming.concurrentJobs=2.
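
The effect of that setting can be illustrated without Spark. As far as I know, Spark Streaming submits each batch job to a fixed-size thread pool whose size comes from spark.streaming.concurrentJobs (default 1). The sketch below is only a plain-Java model of that idea, not Spark code: the class name, the method name and the 300 ms sleep standing in for a batch job are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a fixed thread pool playing the role of the streaming job executor.
public class ConcurrentJobsSketch {

    // Runs `jobs` dummy jobs on a pool of `poolSize` threads and returns the
    // highest number of jobs observed running at the same time.
    public static int peakConcurrency(int poolSize, int jobs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < jobs; i++) {
                futures.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(300); // stand-in for one batch job of one stream
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait until every job has finished
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        // Pool of 1: the jobs of stream1 and stream2 can only run one after the other.
        System.out.println("pool size 1 -> peak " + peakConcurrency(1, 2));
        // Pool of 2: both jobs can be in flight at once.
        System.out.println("pool size 2 -> peak " + peakConcurrency(2, 2));
    }
}
```

With a pool of one thread the peak is always 1, which matches the sequential execution described in this thread. A larger pool only helps if the cluster also has free cores for the extra tasks, which is the executor-cores condition mentioned above.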




________________________________
From: Hrishikesh Mishra <sd...@gmail.com>
Sent: March 4, 2020 0:35
To: Zhang Victor <zh...@outlook.com>
Cc: user@spark.apache.org <us...@spark.apache.org>
Subject: Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

Thanks Zhang.


You are right. The driver is committing offsets to Kafka, which is why a single consumer IP shows up in Kafka Manager. Actually, in one Spark context we are starting multiple Kafka streams, but the driver is executing them sequentially, not in parallel. While debugging this, I came across this issue and suspected that everything was happening in the driver. But now it is clear: I even enabled debug logging on the executors, where KafkaRDD was fetching events from Kafka for the given offsets.


Second, where can I get some insight into why the different Kafka streams of one Spark context are executed sequentially? I found the spark.streaming.concurrentJobs config for running jobs in parallel, but I read on Stack Overflow <https://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming?answertab=active#tab-top> that it has some adverse effects.






Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

Posted by Hrishikesh Mishra <sd...@gmail.com>.
Thanks Zhang.

You are right. The driver is committing offsets to Kafka, which is why a
single consumer IP shows up in Kafka Manager. Actually, in one Spark
context we are starting multiple Kafka streams, but the driver is executing
them sequentially, not in parallel. While debugging this, I came across
this issue and suspected that everything was happening in the driver. But
now it is clear: I even enabled debug logging on the executors, where
KafkaRDD was fetching events from Kafka for the given offsets.


Second, where can I get some insight into why the different Kafka streams
of one Spark context are executed sequentially? I found the
*spark.streaming.concurrentJobs* config for running jobs in parallel, but I
read on Stack Overflow
<https://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming?answertab=active#tab-top>
that it has some adverse effects.






On Tue, Mar 3, 2020 at 8:18 AM Zhang Victor <zh...@outlook.com>
wrote:

> Hi Hrishi.
>
> I guess your code is similar to the following.
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> The offset commit happens on the driver side.
>
> On the driver side, Spark calculates which records should be consumed
> from each topic partition in the current batch, and then the tasks on
> each executor actually consume the corresponding partitions.
>
> You can check whether that IP is the address of the node where the driver
> is running.

Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors

Posted by Zhang Victor <zh...@outlook.com>.
Hi Hrishi.

I guess your code is similar to the following.


stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

The offset commit happens on the driver side.

On the driver side, Spark calculates which records should be consumed from each topic partition in the current batch, and then the tasks on each executor actually consume the corresponding partitions.

You can check whether that IP is the address of the node where the driver is running.
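
The driver/executor split described above can be modelled in a few lines of plain Java. This is only a sketch, not Spark's implementation: the Range class, planBatch and totalRecords are hypothetical names, the 5 second batch interval and the latest offsets are invented for the example, and backpressure is ignored. The only values taken from this thread are the 30 partitions and spark.streaming.kafka.maxRatePerPartition=100.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how per-partition offset ranges are planned for one batch.
public class OffsetRangeSketch {

    // One unit of work: the slice of a partition that a single task would read.
    public static final class Range {
        public final int partition;
        public final long fromOffset;  // inclusive
        public final long untilOffset; // exclusive

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        public long count() {
            return untilOffset - fromOffset;
        }
    }

    // "Driver side": for each partition, pick the offsets this batch will cover,
    // capped at maxRatePerPartition * batchSeconds records.
    public static List<Range> planBatch(long[] current, long[] latest,
                                        long maxRatePerPartition, long batchSeconds) {
        long cap = maxRatePerPartition * batchSeconds;
        List<Range> ranges = new ArrayList<>();
        for (int p = 0; p < current.length; p++) {
            long until = Math.min(latest[p], current[p] + cap);
            ranges.add(new Range(p, current[p], until));
        }
        return ranges;
    }

    // Total number of records covered by a plan.
    public static long totalRecords(List<Range> plan) {
        long total = 0;
        for (Range r : plan) {
            total += r.count();
        }
        return total;
    }

    public static void main(String[] args) {
        int partitions = 30;
        long[] current = new long[partitions]; // all partitions start at offset 0
        long[] latest = new long[partitions];
        for (int p = 0; p < partitions; p++) {
            latest[p] = 1000; // invented backlog per partition
        }
        latest[0] = 120; // one partition with less data than the cap

        List<Range> plan = planBatch(current, latest, 100, 5);
        // "Executor side": each range would become one task that polls Kafka
        // for exactly these offsets.
        System.out.println(plan.size() + " tasks, " + totalRecords(plan) + " records");
    }
}
```

In this model the planning step never reads any records. It only decides the ranges, which is consistent with the point above that the actual fetching happens in the tasks on the executors.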
