You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "Ramanan, Buvana (Nokia - US/Murray Hill)" <bu...@nokia-bell-labs.com> on 2017/10/30 17:53:06 UTC

executors processing tasks sequentially

Hi All,

Using Spark 2.2.0 on YARN cluster.

I am running the Kafka Direct Stream wordcount example code (pasted below my signature). My topic consists of 400 partitions. And the Spark Job tracker page shows 26 executors to process the corresponding 400 tasks.

When I check the execution timeline for each job (=2 sec microbatch worth of records), it shows the tasks to be executed serially by the executors. I attach a screen shot for reference (shows only two out of the 26 executors).

I increased the total-executor-cores to 200, in hopes that it would show me 4 tasks to be processed in parallel by each executor. Still the behavior continues.
Ran the scala wordcount example that uses Direct Kafka Stream (supposedly using kafka010), reading from the same topic. Once again, I see the tasks to be serially executed and not in parallel within an executor.

Can someone please explain why the executor processes the tasks serially? Is it expected? Does it have something to do with YARN?

Thanks,
Buvana

rom __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# This keeps a running count of the total service.instances
def updateFunc(newValues,runningCount):
    if runningCount is None:
        runningCount=0
    return sum(newValues,runningCount)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="Wordcount_python_DS_Kafka")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.map(lambda line: line.split("|")[1]).flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
ssc.awaitTermination()