Posted to user@spark.apache.org by Bill Jay <bi...@gmail.com> on 2014/07/10 01:05:18 UTC

Number of executors change during job running

Hi all,

I have a Spark Streaming job running on YARN. It consumes data from Kafka
and groups the data by a certain field. The data size is 480k lines per
minute, and the batch size is 1 minute.

For some batches, the program sometimes takes more than 3 minutes to finish
the groupBy operation, which seems slow to me. I allocated 300 workers and
specified 300 as the partition number for groupBy. When I checked the slow
stage *"combineByKey at ShuffledDStream.scala:42",* there were sometimes only
2 executors allocated for this stage. However, during other batches there can
be several hundred executors for the same stage, which means the number of
executors for the same operation changes.

Does anyone know how Spark allocates the number of executors to different
stages, and how to make the tasks run more efficiently? Thanks!

Bill
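
For reference, a minimal sketch of the kind of pipeline described in the
question, with the partition count passed explicitly to groupByKey. This
assumes the receiver-based KafkaUtils.createStream API from Spark 1.x; the
ZooKeeper quorum, topic map, and extractField helper are illustrative
placeholders, not the actual job:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._   // pair-DStream ops on Spark < 1.3
    import org.apache.spark.streaming.kafka.KafkaUtils

    object GroupBySketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-groupby-sketch")
        val ssc = new StreamingContext(conf, Seconds(60))   // 1-minute batches

        // Receiver-based Kafka stream of (key, message) pairs.
        val lines = KafkaUtils.createStream(
          ssc, "zk1:2181,zk2:2181,zk3:2181", "my-group", Map("topic1" -> 4))

        // Group by an application-specific field, asking for 300 partitions
        // explicitly so the shuffle stage is planned with 300 tasks.
        val grouped = lines
          .map { case (_, line) => (extractField(line), line) }
          .groupByKey(300)

        grouped.count().print()

        ssc.start()
        ssc.awaitTermination()
      }

      // Illustrative stand-in for the field used for grouping in the real job.
      def extractField(line: String): String = line.split(",")(0)
    }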

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Praveen,

I did not change the total number of executors. I specified 300 executors
when I submitted the job. However, for some stages the number of executors
actually used is very small, leading to long running times even for a small
data set. That means not all executors were used in those stages.

When I looked at the running times of the individual executors, most of them
finished very quickly while a very few ran for a long time, which drives up
the overall running time. Another thing I noticed is that the number of
completed tasks is sometimes larger than the number of total tasks; that is,
the job appears to still be running in some stages even though all of their
tasks have finished. These are the two behaviors I observed that may be
related to the abnormal running times.

Bill


On Thu, Jul 10, 2014 at 11:26 PM, Praveen Seluka <ps...@qubole.com> wrote:

> If I understand correctly, you could not change the number of executors at
> runtime right(correct me if am wrong) - its defined when we start the
> application and fixed. Do you mean number of tasks?
>
>
> On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Can you try setting the number-of-partitions in all the shuffle-based
>> DStream operations, explicitly. It may be the case that the default
>> parallelism (that is, spark.default.parallelism) is probably not being
>> respected.
>>
>> Regarding the unusual delay, I would look at the task details of that
>> stage in the Spark web ui. It will show break of time for each task,
>> including GC times, etc. That might give some indication.
>>
>> TD
>>
>>
>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> I set default parallelism as 300 in my configuration file. Sometimes
>>> there are more executors in a job. However, it is still slow. And I further
>>> observed that most executors take less than 20 seconds but two of them take
>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>> lines with only 4 fields). I am not sure why the group by operation takes
>>> more then 3 minutes.  Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> Are you specifying the number of reducers in all the DStream.****ByKey
>>>> operations? If the reduce by key is not set, then the number of reducers
>>>> used in the stages can keep changing across batches.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>>> per minute where the batch size is 1 minute.
>>>>>
>>>>> For some batches, the program sometimes take more than 3 minute to
>>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there
>>>>> are sometimes 2 executors allocated for this stage. However, during other
>>>>> batches, the executors can be several hundred for the same stage, which
>>>>> means the number of executors for the same operations change.
>>>>>
>>>>> Does anyone know how Spark allocate the number of executors for
>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>
>>>>> Bill
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Praveen Seluka <ps...@qubole.com>.
If I understand correctly, you cannot change the number of executors at
runtime (correct me if I am wrong); it is defined when we start the
application and stays fixed. Do you mean the number of tasks?


On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das <ta...@gmail.com>
wrote:

> Can you try setting the number-of-partitions in all the shuffle-based
> DStream operations, explicitly. It may be the case that the default
> parallelism (that is, spark.default.parallelism) is probably not being
> respected.
>
> Regarding the unusual delay, I would look at the task details of that
> stage in the Spark web ui. It will show break of time for each task,
> including GC times, etc. That might give some indication.
>
> TD
>
>
> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> I set default parallelism as 300 in my configuration file. Sometimes
>> there are more executors in a job. However, it is still slow. And I further
>> observed that most executors take less than 20 seconds but two of them take
>> much longer such as 2 minutes. The data size is very small (less than 480k
>> lines with only 4 fields). I am not sure why the group by operation takes
>> more then 3 minutes.  Thanks!
>>
>> Bill
>>
>>
>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> Are you specifying the number of reducers in all the DStream.****ByKey
>>> operations? If the reduce by key is not set, then the number of reducers
>>> used in the stages can keep changing across batches.
>>>
>>> TD
>>>
>>>
>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a Spark streaming job running on yarn. It consume data from
>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>> per minute where the batch size is 1 minute.
>>>>
>>>> For some batches, the program sometimes take more than 3 minute to
>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there are
>>>> sometimes 2 executors allocated for this stage. However, during other
>>>> batches, the executors can be several hundred for the same stage, which
>>>> means the number of executors for the same operations change.
>>>>
>>>> Does anyone know how Spark allocate the number of executors for
>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>
>>>> Bill
>>>>
>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Vikash Pareek <vi...@infoobjects.com>.
Hi Bill,

You can try a direct stream and increase the number of partitions on the
Kafka topics; the input DStream will then have one partition per Kafka
partition without any repartitioning.

Can you also share your event timeline chart from the Spark UI? You need to
tune your configuration to match the computation, and the Spark UI will give
a deeper understanding of the problem.

Thanks!
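
For example, a sketch only, assuming the direct Kafka API introduced in
Spark 1.3 (spark-streaming-kafka for Kafka 0.8); broker addresses and topic
names are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectStreamSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("direct-kafka-sketch"), Seconds(60))

        // With the direct (receiver-less) API the input DStream gets one Spark
        // partition per Kafka partition, so adding partitions to the topics
        // raises the parallelism of the first stage without a repartition.
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val topics = Set("topic1", "topic2", "topic3", "topic4")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }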





Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

I have tried the repartition method. The reduce stage first had 2 executors
and then around 85. I specified repartition(300), and each executor was given
2 cores when I submitted the job. This shows that repartition does help bring
in more executors. However, the running time was still around 50 seconds even
though I only did a simple groupBy operation, so I think the repartition
itself may account for part of the running time. Given that the input source
is Kafka, is there a way to make the program even faster? Thanks!


On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Can you give me a screen shot of the stages page in the web ui, the spark
> logs, and the code that is causing this behavior. This seems quite weird to
> me.
>
> TD
>
>
> On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> It seems repartition does not necessarily force Spark to distribute the
>> data into different executors. I have launched a new job which uses
>> repartition right after I received data from Kafka. For the first two
>> batches, the reduce stage used more than 80 executors. Starting from the
>> third batch, there were always only 2 executors in the reduce task
>> (combineByKey). Even with the first batch which used more than 80
>> executors, it took 2.4 mins to finish the reduce stage for a very small
>> amount of data.
>>
>> Bill
>>
>>
>> On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> After using repartition(300), how many executors did it run on? By the
>>> way, repartitions(300) means it will divide the shuffled data into 300
>>> partitions. Since there are many cores on each of the 300
>>> machines/executors, these partitions (each requiring a core) may not be
>>> spread all 300 executors. Hence, if you really want spread it all 300
>>> executors, you may have to bump up the partitions even more. However,
>>> increasing the partitions to too high may not be beneficial, and you will
>>> have play around with the number to figure out sweet spot that reduces the
>>> time to process the stage / time to process the whole batch.
>>>
>>> TD
>>>
>>>
>>> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> Do you mean that the data is not shuffled until the reduce stage? That
>>>> means groupBy still only uses 2 machines?
>>>>
>>>> I think I used repartition(300) after I read the data from Kafka into
>>>> DStream. It seems that it did not guarantee that the map or reduce stages
>>>> will be run on 300 machines. I am currently trying to initiate 100 DStream
>>>> from KafkaUtils.createDStream and union them. Now the reduce stages had
>>>> around 80 machines for all the batches. However, this method will introduce
>>>> many dstreams. It will be good if we can control the number of executors in
>>>> the groupBy operation because the calculation needs to be finished within 1
>>>> minute for different size of input data based on our production need.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>>>> tathagata.das1565@gmail.com> wrote:
>>>>
>>>>> Aah, I get it now. That is because the input data streams is
>>>>> replicated on two machines, so by locality the data is processed on those
>>>>> two machines. So the "map" stage on the data uses 2 executors, but the
>>>>> "reduce" stage, (after groupByKey) the saveAsTextFiles would use 300 tasks.
>>>>> And the default parallelism takes into affect only when the data is
>>>>> explicitly shuffled around.
>>>>>
>>>>> You can fix this by explicitly repartitioning the data.
>>>>>
>>>>> inputDStream.repartition(partitions)
>>>>>
>>>>> This is covered in the streaming tuning guide
>>>>> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
>>>>> .
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I just ran another job that only received data from Kafka, did some
>>>>>> filtering, and then save as text files in HDFS. There was no reducing work
>>>>>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>>>>>> stage was also 2 although I specified 300 executors in the job submission.
>>>>>> As a result, the simple save file action took more than 2 minutes. Do you
>>>>>> have any idea how Spark determined the number of executors
>>>>>> for different stages?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bill.jaypeterson@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Tathagata,
>>>>>>>
>>>>>>> Below is my main function. I omit some filtering and data conversion
>>>>>>> functions. These functions are just a one-to-one mapping, which may not
>>>>>>> possible increase running time. The only reduce function I have here is
>>>>>>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>>>>>>> have 240k lines each minute. And the other two topics have less than 30k
>>>>>>> lines per minute. The batch size is one minute and I specified 300
>>>>>>> executors in my spark-submit script. The default parallelism is 300.
>>>>>>>
>>>>>>>
>>>>>>>     val partition = 300
>>>>>>>     val zkQuorum = "zk1,zk2,zk3"
>>>>>>>     val group = "my-group-" + currentTime.toString
>>>>>>>     val topics = "topic1,topic2,topic3,topic4"
>>>>>>>     val numThreads = 4
>>>>>>>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>>>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>>>>     val lines = lines1
>>>>>>>     lines.cache()
>>>>>>>     // Parse each line as JSON and keep only well-formed records.
>>>>>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>>>>>     val mapData = jsonData.filter(_.isDefined)
>>>>>>>       .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>>     val validMapData = mapData.filter(isValidData(_))
>>>>>>>     val fields = validMapData.map(data => (data("id").toString,
>>>>>>>       timestampToUTCUnix(data("time").toString),
>>>>>>>       timestampToUTCUnix(data("local_time").toString),
>>>>>>>       data("id2").toString, data("id3").toString,
>>>>>>>       data("log_type").toString, data("sub_log_type").toString))
>>>>>>>     // Drop records whose two timestamps differ by more than an hour.
>>>>>>>     val timeDiff = 3600L
>>>>>>>     val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
>>>>>>>     val watchTimeFields = filteredFields.map(fields =>
>>>>>>>       (fields._1, fields._2, fields._4, fields._5, fields._7))
>>>>>>>     val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
>>>>>>>     // The only shuffle: group by the third field into 300 partitions.
>>>>>>>     val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1))
>>>>>>>       .groupByKey(partition)
>>>>>>>     val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
>>>>>>>     programDuidNum.saveAsTextFiles(hadoopOutput + "result")
>>>>>>>
>>>>>>> I have been working on this for several days. No findings why there
>>>>>>> are always 2 executors for the groupBy stage. Thanks a lot!
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>
>>>>>>>> Can you show us the program that you are running. If you are
>>>>>>>> setting number of partitions in the XYZ-ByKey operation as 300, then there
>>>>>>>> should be 300 tasks for that stage, distributed on the 50 executors are
>>>>>>>> allocated to your context. However the data distribution may be skewed in
>>>>>>>> which case, you can use a repartition operation to redistributed the data
>>>>>>>> more evenly (both DStream and RDD have repartition).
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <
>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Tathagata,
>>>>>>>>>
>>>>>>>>> I also tried to use the number of partitions as parameters to the
>>>>>>>>> functions such as groupByKey. It seems the numbers of executors is around
>>>>>>>>> 50 instead of 300, which is the number of the executors I specified in
>>>>>>>>> submission script. Moreover, the running time of different executors is
>>>>>>>>> skewed. The ideal case is that Spark can distribute the data into 300
>>>>>>>>> executors evenly so that the computation can be efficiently finished. I am
>>>>>>>>> not sure how to achieve this.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can you try setting the number-of-partitions in all the
>>>>>>>>>> shuffle-based DStream operations, explicitly. It may be the case that the
>>>>>>>>>> default parallelism (that is, spark.default.parallelism) is probably not
>>>>>>>>>> being respected.
>>>>>>>>>>
>>>>>>>>>> Regarding the unusual delay, I would look at the task details of
>>>>>>>>>> that stage in the Spark web ui. It will show break of time for each task,
>>>>>>>>>> including GC times, etc. That might give some indication.
>>>>>>>>>>
>>>>>>>>>> TD
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <
>>>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Tathagata,
>>>>>>>>>>>
>>>>>>>>>>> I set default parallelism as 300 in my configuration file.
>>>>>>>>>>> Sometimes there are more executors in a job. However, it is still slow. And
>>>>>>>>>>> I further observed that most executors take less than 20 seconds but two of
>>>>>>>>>>> them take much longer such as 2 minutes. The data size is very small (less
>>>>>>>>>>> than 480k lines with only 4 fields). I am not sure why the group by
>>>>>>>>>>> operation takes more then 3 minutes.  Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Bill
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>>>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>>>>>>>
>>>>>>>>>>>> TD
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a Spark streaming job running on yarn. It consume data
>>>>>>>>>>>>> from Kafka and group the data by a certain field. The data size is 480k
>>>>>>>>>>>>> lines per minute where the batch size is 1 minute.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For some batches, the program sometimes take more than 3
>>>>>>>>>>>>> minute to finish the groupBy operation, which seems slow to me. I allocated
>>>>>>>>>>>>> 300 workers and specify 300 as the partition number for groupby. When I
>>>>>>>>>>>>> checked the slow stage *"combineByKey at
>>>>>>>>>>>>> ShuffledDStream.scala:42",* there are sometimes 2 executors
>>>>>>>>>>>>> allocated for this stage. However, during other batches, the executors can
>>>>>>>>>>>>> be several hundred for the same stage, which means the number of executors
>>>>>>>>>>>>> for the same operations change.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does anyone know how Spark allocate the number of executors
>>>>>>>>>>>>> for different stages and how to increase the efficiency for task? Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Tathagata Das <ta...@gmail.com>.
Can you give me a screenshot of the stages page in the web UI, the Spark
logs, and the code that is causing this behavior? This seems quite weird to
me.

TD


On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay <bi...@gmail.com>
wrote:

> Hi Tathagata,
>
> It seems repartition does not necessarily force Spark to distribute the
> data into different executors. I have launched a new job which uses
> repartition right after I received data from Kafka. For the first two
> batches, the reduce stage used more than 80 executors. Starting from the
> third batch, there were always only 2 executors in the reduce task
> (combineByKey). Even with the first batch which used more than 80
> executors, it took 2.4 mins to finish the reduce stage for a very small
> amount of data.
>
> Bill
>
>
> On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> After using repartition(300), how many executors did it run on? By the
>> way, repartitions(300) means it will divide the shuffled data into 300
>> partitions. Since there are many cores on each of the 300
>> machines/executors, these partitions (each requiring a core) may not be
>> spread all 300 executors. Hence, if you really want spread it all 300
>> executors, you may have to bump up the partitions even more. However,
>> increasing the partitions to too high may not be beneficial, and you will
>> have play around with the number to figure out sweet spot that reduces the
>> time to process the stage / time to process the whole batch.
>>
>> TD
>>
>>
>> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Do you mean that the data is not shuffled until the reduce stage? That
>>> means groupBy still only uses 2 machines?
>>>
>>> I think I used repartition(300) after I read the data from Kafka into
>>> DStream. It seems that it did not guarantee that the map or reduce stages
>>> will be run on 300 machines. I am currently trying to initiate 100 DStream
>>> from KafkaUtils.createDStream and union them. Now the reduce stages had
>>> around 80 machines for all the batches. However, this method will introduce
>>> many dstreams. It will be good if we can control the number of executors in
>>> the groupBy operation because the calculation needs to be finished within 1
>>> minute for different size of input data based on our production need.
>>>
>>> Thanks!
>>>
>>>
>>> Bill
>>>
>>>
>>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> Aah, I get it now. That is because the input data streams is replicated
>>>> on two machines, so by locality the data is processed on those two
>>>> machines. So the "map" stage on the data uses 2 executors, but the "reduce"
>>>> stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
>>>> default parallelism takes into affect only when the data is explicitly
>>>> shuffled around.
>>>>
>>>> You can fix this by explicitly repartitioning the data.
>>>>
>>>> inputDStream.repartition(partitions)
>>>>
>>>> This is covered in the streaming tuning guide
>>>> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
>>>> .
>>>>
>>>> TD
>>>>
>>>>
>>>>
>>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I just ran another job that only received data from Kafka, did some
>>>>> filtering, and then save as text files in HDFS. There was no reducing work
>>>>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>>>>> stage was also 2 although I specified 300 executors in the job submission.
>>>>> As a result, the simple save file action took more than 2 minutes. Do you
>>>>> have any idea how Spark determined the number of executors
>>>>> for different stages?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Bill
>>>>>
>>>>>
>>>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> Below is my main function. I omit some filtering and data conversion
>>>>>> functions. These functions are just a one-to-one mapping, which may not
>>>>>> possible increase running time. The only reduce function I have here is
>>>>>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>>>>>> have 240k lines each minute. And the other two topics have less than 30k
>>>>>> lines per minute. The batch size is one minute and I specified 300
>>>>>> executors in my spark-submit script. The default parallelism is 300.
>>>>>>
>>>>>>
>>>>>>     val partition = 300
>>>>>>     val zkQuorum = "zk1,zk2,zk3"
>>>>>>     val group = "my-group-" + currentTime.toString
>>>>>>     val topics = "topic1,topic2,topic3,topic4"
>>>>>>     val numThreads = 4
>>>>>>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>>>     val lines = lines1
>>>>>>     lines.cache()
>>>>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>>>>     val mapData = jsonData.filter(_.isDefined)
>>>>>>
>>>>>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>     val validMapData = mapData.filter(isValidData(_))
>>>>>>     val fields = validMapData.map(data => (data("id").toString,
>>>>>> timestampToUTCUnix(data("time").toString),
>>>>>>
>>>>>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>>>>>                                            data("id3").toString,
>>>>>> data("log_type").toString, data("sub_log_type").toString))
>>>>>>     val timeDiff = 3600L
>>>>>>     val filteredFields = fields.filter(field => abs(field._2 -
>>>>>> field._3) <= timeDiff)
>>>>>>
>>>>>>     val watchTimeFields = filteredFields.map(fields => (fields._1,
>>>>>> fields._2, fields._4, fields._5, fields._7))
>>>>>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>>>>> getWatchtimeTuple(fields))
>>>>>>     val programDuids = watchTimeTuples.map(fields => (fields._3,
>>>>>> fields._1)).groupByKey(partition)
>>>>>>     val programDuidNum = programDuids.map{case(key, value) => (key,
>>>>>> value.toSet.size)}
>>>>>>     programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>>>>>
>>>>>> I have been working on this for several days. No findings why there
>>>>>> are always 2 executors for the groupBy stage. Thanks a lot!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>>> Can you show us the program that you are running. If you are setting
>>>>>>> number of partitions in the XYZ-ByKey operation as 300, then there should
>>>>>>> be 300 tasks for that stage, distributed on the 50 executors are allocated
>>>>>>> to your context. However the data distribution may be skewed in which case,
>>>>>>> you can use a repartition operation to redistributed the data more evenly
>>>>>>> (both DStream and RDD have repartition).
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <
>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Tathagata,
>>>>>>>>
>>>>>>>> I also tried to use the number of partitions as parameters to the
>>>>>>>> functions such as groupByKey. It seems the numbers of executors is around
>>>>>>>> 50 instead of 300, which is the number of the executors I specified in
>>>>>>>> submission script. Moreover, the running time of different executors is
>>>>>>>> skewed. The ideal case is that Spark can distribute the data into 300
>>>>>>>> executors evenly so that the computation can be efficiently finished. I am
>>>>>>>> not sure how to achieve this.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Can you try setting the number-of-partitions in all the
>>>>>>>>> shuffle-based DStream operations, explicitly. It may be the case that the
>>>>>>>>> default parallelism (that is, spark.default.parallelism) is probably not
>>>>>>>>> being respected.
>>>>>>>>>
>>>>>>>>> Regarding the unusual delay, I would look at the task details of
>>>>>>>>> that stage in the Spark web ui. It will show break of time for each task,
>>>>>>>>> including GC times, etc. That might give some indication.
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <
>>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Tathagata,
>>>>>>>>>>
>>>>>>>>>> I set default parallelism as 300 in my configuration file.
>>>>>>>>>> Sometimes there are more executors in a job. However, it is still slow. And
>>>>>>>>>> I further observed that most executors take less than 20 seconds but two of
>>>>>>>>>> them take much longer such as 2 minutes. The data size is very small (less
>>>>>>>>>> than 480k lines with only 4 fields). I am not sure why the group by
>>>>>>>>>> operation takes more then 3 minutes.  Thanks!
>>>>>>>>>>
>>>>>>>>>> Bill
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>>>>>>
>>>>>>>>>>> TD
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a Spark streaming job running on yarn. It consume data
>>>>>>>>>>>> from Kafka and group the data by a certain field. The data size is 480k
>>>>>>>>>>>> lines per minute where the batch size is 1 minute.
>>>>>>>>>>>>
>>>>>>>>>>>> For some batches, the program sometimes take more than 3 minute
>>>>>>>>>>>> to finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>>>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>>>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",*
>>>>>>>>>>>> there are sometimes 2 executors allocated for this stage. However, during
>>>>>>>>>>>> other batches, the executors can be several hundred for the same stage,
>>>>>>>>>>>> which means the number of executors for the same operations change.
>>>>>>>>>>>>
>>>>>>>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>>>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> Bill
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

It seems repartition does not necessarily force Spark to distribute the data
across different executors. I have launched a new job which calls repartition
right after receiving the data from Kafka. For the first two batches, the
reduce stage used more than 80 executors; starting from the third batch,
there were always only 2 executors in the reduce task (combineByKey). Even
for the first batch, which used more than 80 executors, it took 2.4 minutes
to finish the reduce stage for a very small amount of data.

Bill


On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <tathagata.das1565@gmail.com
> wrote:

> After using repartition(300), how many executors did it run on? By the
> way, repartitions(300) means it will divide the shuffled data into 300
> partitions. Since there are many cores on each of the 300
> machines/executors, these partitions (each requiring a core) may not be
> spread all 300 executors. Hence, if you really want spread it all 300
> executors, you may have to bump up the partitions even more. However,
> increasing the partitions to too high may not be beneficial, and you will
> have play around with the number to figure out sweet spot that reduces the
> time to process the stage / time to process the whole batch.
>
> TD
>
>
> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> Do you mean that the data is not shuffled until the reduce stage? That
>> means groupBy still only uses 2 machines?
>>
>> I think I used repartition(300) after I read the data from Kafka into
>> DStream. It seems that it did not guarantee that the map or reduce stages
>> will be run on 300 machines. I am currently trying to initiate 100 DStream
>> from KafkaUtils.createDStream and union them. Now the reduce stages had
>> around 80 machines for all the batches. However, this method will introduce
>> many dstreams. It will be good if we can control the number of executors in
>> the groupBy operation because the calculation needs to be finished within 1
>> minute for different size of input data based on our production need.
>>
>> Thanks!
>>
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> Aah, I get it now. That is because the input data streams is replicated
>>> on two machines, so by locality the data is processed on those two
>>> machines. So the "map" stage on the data uses 2 executors, but the "reduce"
>>> stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
>>> default parallelism takes into affect only when the data is explicitly
>>> shuffled around.
>>>
>>> You can fix this by explicitly repartitioning the data.
>>>
>>> inputDStream.repartition(partitions)
>>>
>>> This is covered in the streaming tuning guide
>>> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
>>> .
>>>
>>> TD
>>>
>>>
>>>
>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I just ran another job that only received data from Kafka, did some
>>>> filtering, and then save as text files in HDFS. There was no reducing work
>>>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>>>> stage was also 2 although I specified 300 executors in the job submission.
>>>> As a result, the simple save file action took more than 2 minutes. Do you
>>>> have any idea how Spark determined the number of executors
>>>> for different stages?
>>>>
>>>> Thanks!
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Tathagata,
>>>>>
>>>>> Below is my main function. I omit some filtering and data conversion
>>>>> functions. These functions are just a one-to-one mapping, which may not
>>>>> possible increase running time. The only reduce function I have here is
>>>>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>>>>> have 240k lines each minute. And the other two topics have less than 30k
>>>>> lines per minute. The batch size is one minute and I specified 300
>>>>> executors in my spark-submit script. The default parallelism is 300.
>>>>>
>>>>>
>>>>>     val partition = 300
>>>>>     val zkQuorum = "zk1,zk2,zk3"
>>>>>     val group = "my-group-" + currentTime.toString
>>>>>     val topics = "topic1,topic2,topic3,topic4"
>>>>>     val numThreads = 4
>>>>>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>>     val lines = lines1
>>>>>     lines.cache()
>>>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>>>     val mapData = jsonData.filter(_.isDefined)
>>>>>
>>>>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>     val validMapData = mapData.filter(isValidData(_))
>>>>>     val fields = validMapData.map(data => (data("id").toString,
>>>>> timestampToUTCUnix(data("time").toString),
>>>>>
>>>>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>>>>                                            data("id3").toString,
>>>>> data("log_type").toString, data("sub_log_type").toString))
>>>>>     val timeDiff = 3600L
>>>>>     val filteredFields = fields.filter(field => abs(field._2 -
>>>>> field._3) <= timeDiff)
>>>>>
>>>>>     val watchTimeFields = filteredFields.map(fields => (fields._1,
>>>>> fields._2, fields._4, fields._5, fields._7))
>>>>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>>>> getWatchtimeTuple(fields))
>>>>>     val programDuids = watchTimeTuples.map(fields => (fields._3,
>>>>> fields._1)).groupByKey(partition)
>>>>>     val programDuidNum = programDuids.map{case(key, value) => (key,
>>>>> value.toSet.size)}
>>>>>     programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>>>>
>>>>> I have been working on this for several days. No findings why there
>>>>> are always 2 executors for the groupBy stage. Thanks a lot!
>>>>>
>>>>> Bill
>>>>>
>>>>>
>>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>
>>>>>> Can you show us the program that you are running. If you are setting
>>>>>> number of partitions in the XYZ-ByKey operation as 300, then there should
>>>>>> be 300 tasks for that stage, distributed on the 50 executors are allocated
>>>>>> to your context. However the data distribution may be skewed in which case,
>>>>>> you can use a repartition operation to redistributed the data more evenly
>>>>>> (both DStream and RDD have repartition).
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <
>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Tathagata,
>>>>>>>
>>>>>>> I also tried to use the number of partitions as parameters to the
>>>>>>> functions such as groupByKey. It seems the numbers of executors is around
>>>>>>> 50 instead of 300, which is the number of the executors I specified in
>>>>>>> submission script. Moreover, the running time of different executors is
>>>>>>> skewed. The ideal case is that Spark can distribute the data into 300
>>>>>>> executors evenly so that the computation can be efficiently finished. I am
>>>>>>> not sure how to achieve this.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>
>>>>>>>> Can you try setting the number-of-partitions in all the
>>>>>>>> shuffle-based DStream operations, explicitly. It may be the case that the
>>>>>>>> default parallelism (that is, spark.default.parallelism) is probably not
>>>>>>>> being respected.
>>>>>>>>
>>>>>>>> Regarding the unusual delay, I would look at the task details of
>>>>>>>> that stage in the Spark web ui. It will show break of time for each task,
>>>>>>>> including GC times, etc. That might give some indication.
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <
>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Tathagata,
>>>>>>>>>
>>>>>>>>> I set default parallelism as 300 in my configuration file.
>>>>>>>>> Sometimes there are more executors in a job. However, it is still slow. And
>>>>>>>>> I further observed that most executors take less than 20 seconds but two of
>>>>>>>>> them take much longer such as 2 minutes. The data size is very small (less
>>>>>>>>> than 480k lines with only 4 fields). I am not sure why the group by
>>>>>>>>> operation takes more then 3 minutes.  Thanks!
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>>>>>
>>>>>>>>>> TD
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have a Spark streaming job running on yarn. It consume data
>>>>>>>>>>> from Kafka and group the data by a certain field. The data size is 480k
>>>>>>>>>>> lines per minute where the batch size is 1 minute.
>>>>>>>>>>>
>>>>>>>>>>> For some batches, the program sometimes take more than 3 minute
>>>>>>>>>>> to finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",*
>>>>>>>>>>> there are sometimes 2 executors allocated for this stage. However, during
>>>>>>>>>>> other batches, the executors can be several hundred for the same stage,
>>>>>>>>>>> which means the number of executors for the same operations change.
>>>>>>>>>>>
>>>>>>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Bill
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Tathagata Das <ta...@gmail.com>.
After using repartition(300), how many executors did it run on? By the way,
repartition(300) means the shuffled data is divided into 300 partitions.
Since there are multiple cores on each of the 300 machines/executors, these
partitions (each requiring a core) may not be spread across all 300
executors. Hence, if you really want to spread the work over all 300
executors, you may have to bump up the number of partitions even further.
However, increasing the partition count too much may not be beneficial
either, and you will have to play around with the number to find the sweet
spot that minimizes the time to process the stage and the whole batch.
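
In code form the suggestion amounts to something like the sketch below; the
executor and core counts are illustrative, and keyOf stands in for the job's
key-extraction logic:

    import org.apache.spark.streaming.StreamingContext._   // pair-DStream ops on Spark < 1.3
    import org.apache.spark.streaming.dstream.DStream

    object ParallelismSketch {
      // Bump the shuffle parallelism above the executor count so that every
      // executor core has at least one partition to work on.
      def groupWithParallelism(input: DStream[String],
                               executors: Int,
                               coresPerExecutor: Int): DStream[(String, Iterable[String])] = {
        val partitions = executors * coresPerExecutor      // e.g. 300 * 2 = 600
        input
          .repartition(partitions)                         // spread the received blocks
          .map(line => (keyOf(line), line))
          .groupByKey(partitions)                          // 600 reduce tasks
      }

      // Illustrative stand-in for the real key-extraction logic.
      def keyOf(line: String): String = line.split(",")(0)
    }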

TD


On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bi...@gmail.com>
wrote:

> Hi Tathagata,
>
> Do you mean that the data is not shuffled until the reduce stage? That
> means groupBy still only uses 2 machines?
>
> I think I used repartition(300) after I read the data from Kafka into
> DStream. It seems that it did not guarantee that the map or reduce stages
> will be run on 300 machines. I am currently trying to initiate 100 DStream
> from KafkaUtils.createDStream and union them. Now the reduce stages had
> around 80 machines for all the batches. However, this method will introduce
> many dstreams. It will be good if we can control the number of executors in
> the groupBy operation because the calculation needs to be finished within 1
> minute for different size of input data based on our production need.
>
> Thanks!
>
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Aah, I get it now. That is because the input data streams is replicated
>> on two machines, so by locality the data is processed on those two
>> machines. So the "map" stage on the data uses 2 executors, but the "reduce"
>> stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
>> default parallelism takes into affect only when the data is explicitly
>> shuffled around.
>>
>> You can fix this by explicitly repartitioning the data.
>>
>> inputDStream.repartition(partitions)
>>
>> This is covered in the streaming tuning guide
>> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
>> .
>>
>> TD
>>
>>
>>
>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I just ran another job that only received data from Kafka, did some
>>> filtering, and then save as text files in HDFS. There was no reducing work
>>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>>> stage was also 2 although I specified 300 executors in the job submission.
>>> As a result, the simple save file action took more than 2 minutes. Do you
>>> have any idea how Spark determined the number of executors
>>> for different stages?
>>>
>>> Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> Below is my main function. I omit some filtering and data conversion
>>>> functions. These functions are just a one-to-one mapping, which may not
>>>> possible increase running time. The only reduce function I have here is
>>>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>>>> have 240k lines each minute. And the other two topics have less than 30k
>>>> lines per minute. The batch size is one minute and I specified 300
>>>> executors in my spark-submit script. The default parallelism is 300.
>>>>
>>>>
>>>>     val partition = 300
>>>>     val zkQuorum = "zk1,zk2,zk3"
>>>>     val group = "my-group-" + currentTime.toString
>>>>     val topics = "topic1,topic2,topic3,topic4"
>>>>     val numThreads = 4
>>>>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>     val lines = lines1
>>>>     lines.cache()
>>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>>     val mapData = jsonData.filter(_.isDefined)
>>>>
>>>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>     val validMapData = mapData.filter(isValidData(_))
>>>>     val fields = validMapData.map(data => (data("id").toString,
>>>> timestampToUTCUnix(data("time").toString),
>>>>
>>>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>>>                                            data("id3").toString,
>>>> data("log_type").toString, data("sub_log_type").toString))
>>>>     val timeDiff = 3600L
>>>>     val filteredFields = fields.filter(field => abs(field._2 -
>>>> field._3) <= timeDiff)
>>>>
>>>>     val watchTimeFields = filteredFields.map(fields => (fields._1,
>>>> fields._2, fields._4, fields._5, fields._7))
>>>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>>> getWatchtimeTuple(fields))
>>>>     val programDuids = watchTimeTuples.map(fields => (fields._3,
>>>> fields._1)).groupByKey(partition)
>>>>     val programDuidNum = programDuids.map{case(key, value) => (key,
>>>> value.toSet.size)}
>>>>     programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>>>
>>>> I have been working on this for several days. No findings why there are
>>>> always 2 executors for the groupBy stage. Thanks a lot!
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>>> tathagata.das1565@gmail.com> wrote:
>>>>
>>>>> Can you show us the program that you are running. If you are setting
>>>>> number of partitions in the XYZ-ByKey operation as 300, then there should
>>>>> be 300 tasks for that stage, distributed on the 50 executors are allocated
>>>>> to your context. However the data distribution may be skewed in which case,
>>>>> you can use a repartition operation to redistributed the data more evenly
>>>>> (both DStream and RDD have repartition).
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bill.jaypeterson@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> I also tried to use the number of partitions as parameters to the
>>>>>> functions such as groupByKey. It seems the numbers of executors is around
>>>>>> 50 instead of 300, which is the number of the executors I specified in
>>>>>> submission script. Moreover, the running time of different executors is
>>>>>> skewed. The ideal case is that Spark can distribute the data into 300
>>>>>> executors evenly so that the computation can be efficiently finished. I am
>>>>>> not sure how to achieve this.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>>> Can you try setting the number-of-partitions in all the
>>>>>>> shuffle-based DStream operations, explicitly. It may be the case that the
>>>>>>> default parallelism (that is, spark.default.parallelism) is probably not
>>>>>>> being respected.
>>>>>>>
>>>>>>> Regarding the unusual delay, I would look at the task details of
>>>>>>> that stage in the Spark web ui. It will show break of time for each task,
>>>>>>> including GC times, etc. That might give some indication.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <
>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Tathagata,
>>>>>>>>
>>>>>>>> I set default parallelism as 300 in my configuration file.
>>>>>>>> Sometimes there are more executors in a job. However, it is still slow. And
>>>>>>>> I further observed that most executors take less than 20 seconds but two of
>>>>>>>> them take much longer such as 2 minutes. The data size is very small (less
>>>>>>>> than 480k lines with only 4 fields). I am not sure why the group by
>>>>>>>> operation takes more then 3 minutes.  Thanks!
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I have a Spark streaming job running on yarn. It consume data
>>>>>>>>>> from Kafka and group the data by a certain field. The data size is 480k
>>>>>>>>>> lines per minute where the batch size is 1 minute.
>>>>>>>>>>
>>>>>>>>>> For some batches, the program sometimes take more than 3 minute
>>>>>>>>>> to finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",*
>>>>>>>>>> there are sometimes 2 executors allocated for this stage. However, during
>>>>>>>>>> other batches, the executors can be several hundred for the same stage,
>>>>>>>>>> which means the number of executors for the same operations change.
>>>>>>>>>>
>>>>>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>>>>>
>>>>>>>>>> Bill
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

Do you mean that the data is not shuffled until the reduce stage? Does that
mean groupBy still only uses 2 machines?

I think I used repartition(300) after I read the data from Kafka into the
DStream, but it did not guarantee that the map or reduce stages would run on
300 machines. I am currently trying to create 100 DStreams with
KafkaUtils.createStream and union them; with that approach the reduce stages
used around 80 machines for all the batches. However, this method introduces
many DStreams. It would be good if we could control the number of executors
for the groupBy operation, because the calculation needs to finish within 1
minute for different input sizes, based on our production needs.

Thanks!


Bill
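
A sketch of the multiple-receiver-plus-union approach mentioned above,
assuming the receiver-based KafkaUtils.createStream API; the receiver count,
ZooKeeper quorum, and topics are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object MultiReceiverSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("multi-receiver-union"), Seconds(60))

        // Each createStream call starts its own receiver, so the raw input is
        // spread over many executors instead of the 2 seen in this thread.
        val numReceivers = 100                              // illustrative
        val topicMap = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1, "topic4" -> 1)
        val streams = (1 to numReceivers).map { _ =>
          KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181,zk3:2181", "my-group", topicMap)
        }

        // Union the per-receiver streams back into one DStream for the groupBy.
        val unified = ssc.union(streams)
        unified.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }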


On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Aah, I get it now. That is because the input data streams is replicated on
> two machines, so by locality the data is processed on those two machines.
> So the "map" stage on the data uses 2 executors, but the "reduce" stage,
> (after groupByKey) the saveAsTextFiles would use 300 tasks. And the default
> parallelism takes into affect only when the data is explicitly shuffled
> around.
>
> You can fix this by explicitly repartitioning the data.
>
> inputDStream.repartition(partitions)
>
> This is covered in the streaming tuning guide
> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
> .
>
> TD
>
>
>
> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> I just ran another job that only received data from Kafka, did some
>> filtering, and then save as text files in HDFS. There was no reducing work
>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>> stage was also 2 although I specified 300 executors in the job submission.
>> As a result, the simple save file action took more than 2 minutes. Do you
>> have any idea how Spark determined the number of executors
>> for different stages?
>>
>> Thanks!
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Below is my main function. I omit some filtering and data conversion
>>> functions. These functions are just a one-to-one mapping, which may not
>>> possible increase running time. The only reduce function I have here is
>>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>>> have 240k lines each minute. And the other two topics have less than 30k
>>> lines per minute. The batch size is one minute and I specified 300
>>> executors in my spark-submit script. The default parallelism is 300.
>>>
>>>
>>>     val partition = 300
>>>     val zkQuorum = "zk1,zk2,zk3"
>>>     val group = "my-group-" + currentTime.toString
>>>     val topics = "topic1,topic2,topic3,topic4"
>>>     val numThreads = 4
>>>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>     val lines = lines1
>>>     lines.cache()
>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>     val mapData = jsonData.filter(_.isDefined)
>>>
>>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>     val validMapData = mapData.filter(isValidData(_))
>>>     val fields = validMapData.map(data => (data("id").toString,
>>> timestampToUTCUnix(data("time").toString),
>>>
>>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>>                                            data("id3").toString,
>>> data("log_type").toString, data("sub_log_type").toString))
>>>     val timeDiff = 3600L
>>>     val filteredFields = fields.filter(field => abs(field._2 - field._3)
>>> <= timeDiff)
>>>
>>>     val watchTimeFields = filteredFields.map(fields => (fields._1,
>>> fields._2, fields._4, fields._5, fields._7))
>>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>> getWatchtimeTuple(fields))
>>>     val programDuids = watchTimeTuples.map(fields => (fields._3,
>>> fields._1)).groupByKey(partition)
>>>     val programDuidNum = programDuids.map{case(key, value) => (key,
>>> value.toSet.size)}
>>>     programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>>
>>> I have been working on this for several days. No findings why there are
>>> always 2 executors for the groupBy stage. Thanks a lot!
>>>
>>> Bill
>>>
>>>
>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> Can you show us the program that you are running. If you are setting
>>>> number of partitions in the XYZ-ByKey operation as 300, then there should
>>>> be 300 tasks for that stage, distributed on the 50 executors are allocated
>>>> to your context. However the data distribution may be skewed in which case,
>>>> you can use a repartition operation to redistributed the data more evenly
>>>> (both DStream and RDD have repartition).
>>>>
>>>> TD
>>>>
>>>>
>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Tathagata,
>>>>>
>>>>> I also tried to use the number of partitions as parameters to the
>>>>> functions such as groupByKey. It seems the numbers of executors is around
>>>>> 50 instead of 300, which is the number of the executors I specified in
>>>>> submission script. Moreover, the running time of different executors is
>>>>> skewed. The ideal case is that Spark can distribute the data into 300
>>>>> executors evenly so that the computation can be efficiently finished. I am
>>>>> not sure how to achieve this.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Bill
>>>>>
>>>>>
>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>
>>>>>> Can you try setting the number-of-partitions in all the shuffle-based
>>>>>> DStream operations, explicitly. It may be the case that the default
>>>>>> parallelism (that is, spark.default.parallelism) is probably not being
>>>>>> respected.
>>>>>>
>>>>>> Regarding the unusual delay, I would look at the task details of that
>>>>>> stage in the Spark web ui. It will show break of time for each task,
>>>>>> including GC times, etc. That might give some indication.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bill.jaypeterson@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Tathagata,
>>>>>>>
>>>>>>> I set default parallelism as 300 in my configuration file. Sometimes
>>>>>>> there are more executors in a job. However, it is still slow. And I further
>>>>>>> observed that most executors take less than 20 seconds but two of them take
>>>>>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>>>>>> lines with only 4 fields). I am not sure why the group by operation takes
>>>>>>> more then 3 minutes.  Thanks!
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>
>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>> bill.jaypeterson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>>>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>>>>>>> per minute where the batch size is 1 minute.
>>>>>>>>>
>>>>>>>>> For some batches, the program sometimes take more than 3 minute to
>>>>>>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",*
>>>>>>>>> there are sometimes 2 executors allocated for this stage. However, during
>>>>>>>>> other batches, the executors can be several hundred for the same stage,
>>>>>>>>> which means the number of executors for the same operations change.
>>>>>>>>>
>>>>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Tathagata Das <ta...@gmail.com>.
Aah, I get it now. That is because the input data stream is replicated on
two machines, so by data locality the data is processed on those two
machines. So the "map" stage on the data uses 2 executors, but the "reduce"
stage (after groupByKey), including the saveAsTextFiles, would use 300
tasks. The default parallelism takes effect only when the data is
explicitly shuffled around.

You can fix this by explicitly repartitioning the data.

inputDStream.repartition(partitions)

This is covered in the streaming tuning guide
<http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
.
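
For example, a minimal sketch of where this would go in your pipeline
(assuming lines is the Kafka input DStream and partition is 300, as in the
code you posted):

    // Spread the received blocks across the cluster before any further
    // processing, so later stages are not pinned to the two receiver nodes.
    val repartitionedLines = lines.repartition(partition)
    val jsonData = repartitionedLines.map(JSON.parseFull(_))
    // ... rest of the pipeline unchanged ...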

TD



On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bi...@gmail.com>
wrote:

> Hi folks,
>
> I just ran another job that only received data from Kafka, did some
> filtering, and then save as text files in HDFS. There was no reducing work
> involved. Surprisingly, the number of executors for the saveAsTextFiles
> stage was also 2 although I specified 300 executors in the job submission.
> As a result, the simple save file action took more than 2 minutes. Do you
> have any idea how Spark determined the number of executors
> for different stages?
>
> Thanks!
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> Below is my main function. I omit some filtering and data conversion
>> functions. These functions are just a one-to-one mapping, which may not
>> possible increase running time. The only reduce function I have here is
>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>> have 240k lines each minute. And the other two topics have less than 30k
>> lines per minute. The batch size is one minute and I specified 300
>> executors in my spark-submit script. The default parallelism is 300.
>>
>>
>>     val parition = 300
>>     val zkQuorum = "zk1,zk2,zk3"
>>     val group = "my-group-" + currentTime.toString
>>     val topics = "topic1,topic2,topic3,topic4"
>>     val numThreads = 4
>>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>     ssc = new StreamingContext(conf, Seconds(batch))
>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>     val lines = lines1
>>     lines.cache()
>>     val jsonData = lines.map(JSON.parseFull(_))
>>     val mapData = jsonData.filter(_.isDefined)
>>
>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>     val validMapData = mapData.filter(isValidData(_))
>>     val fields = validMapData.map(data => (data("id").toString,
>> timestampToUTCUnix(data("time").toString),
>>
>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>                                            data("id3").toString,
>> data("log_type").toString, data("sub_log_type").toString))
>>     val timeDiff = 3600L
>>     val filteredFields = fields.filter(field => abs(field._2 - field._3)
>> <= timeDiff)
>>
>>     val watchTimeFields = filteredFields.map(fields => (fields._1,
>> fields._2, fields._4, fields._5, fields._7))
>>     val watchTimeTuples = watchTimeFields.map(fields =>
>> getWatchtimeTuple(fields))
>>     val programDuids = watchTimeTuples.map(fields => (fields._3,
>> fields._1)).groupByKey(partition)
>>     val programDuidNum = programDuids.map{case(key, value) => (key,
>> value.toSet.size)}
>>     programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>
>> I have been working on this for several days. No findings why there are
>> always 2 executors for the groupBy stage. Thanks a lot!
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> Can you show us the program that you are running. If you are setting
>>> number of partitions in the XYZ-ByKey operation as 300, then there should
>>> be 300 tasks for that stage, distributed on the 50 executors are allocated
>>> to your context. However the data distribution may be skewed in which case,
>>> you can use a repartition operation to redistributed the data more evenly
>>> (both DStream and RDD have repartition).
>>>
>>> TD
>>>
>>>
>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> I also tried to use the number of partitions as parameters to the
>>>> functions such as groupByKey. It seems the numbers of executors is around
>>>> 50 instead of 300, which is the number of the executors I specified in
>>>> submission script. Moreover, the running time of different executors is
>>>> skewed. The ideal case is that Spark can distribute the data into 300
>>>> executors evenly so that the computation can be efficiently finished. I am
>>>> not sure how to achieve this.
>>>>
>>>> Thanks!
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>> tathagata.das1565@gmail.com> wrote:
>>>>
>>>>> Can you try setting the number-of-partitions in all the shuffle-based
>>>>> DStream operations, explicitly. It may be the case that the default
>>>>> parallelism (that is, spark.default.parallelism) is probably not being
>>>>> respected.
>>>>>
>>>>> Regarding the unusual delay, I would look at the task details of that
>>>>> stage in the Spark web ui. It will show break of time for each task,
>>>>> including GC times, etc. That might give some indication.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> I set default parallelism as 300 in my configuration file. Sometimes
>>>>>> there are more executors in a job. However, it is still slow. And I further
>>>>>> observed that most executors take less than 20 seconds but two of them take
>>>>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>>>>> lines with only 4 fields). I am not sure why the group by operation takes
>>>>>> more then 3 minutes.  Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>>> Are you specifying the number of reducers in all the
>>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bill.jaypeterson@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>>>>>> per minute where the batch size is 1 minute.
>>>>>>>>
>>>>>>>> For some batches, the program sometimes take more than 3 minute to
>>>>>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there
>>>>>>>> are sometimes 2 executors allocated for this stage. However, during other
>>>>>>>> batches, the executors can be several hundred for the same stage, which
>>>>>>>> means the number of executors for the same operations change.
>>>>>>>>
>>>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi folks,

I just ran another job that only received data from Kafka, did some
filtering, and then saved the results as text files in HDFS. There was no
reducing work involved. Surprisingly, the number of executors for the
saveAsTextFiles stage was also 2, although I specified 300 executors in the
job submission. As a result, the simple save action took more than 2
minutes. Do you have any idea how Spark determines the number of executors
for different stages?

Thanks!

Bill


On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bi...@gmail.com>
wrote:

> Hi Tathagata,
>
> Below is my main function. I omit some filtering and data conversion
> functions. These functions are just a one-to-one mapping, which may not
> possible increase running time. The only reduce function I have here is
> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
> have 240k lines each minute. And the other two topics have less than 30k
> lines per minute. The batch size is one minute and I specified 300
> executors in my spark-submit script. The default parallelism is 300.
>
>
>     val parition = 300
>     val zkQuorum = "zk1,zk2,zk3"
>     val group = "my-group-" + currentTime.toString
>     val topics = "topic1,topic2,topic3,topic4"
>     val numThreads = 4
>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>     ssc = new StreamingContext(conf, Seconds(batch))
>     ssc.checkpoint(hadoopOutput + "checkpoint")
>     val lines = lines1
>     lines.cache()
>     val jsonData = lines.map(JSON.parseFull(_))
>     val mapData = jsonData.filter(_.isDefined)
>
> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>     val validMapData = mapData.filter(isValidData(_))
>     val fields = validMapData.map(data => (data("id").toString,
> timestampToUTCUnix(data("time").toString),
>
>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>                                            data("id3").toString,
> data("log_type").toString, data("sub_log_type").toString))
>     val timeDiff = 3600L
>     val filteredFields = fields.filter(field => abs(field._2 - field._3)
> <= timeDiff)
>
>     val watchTimeFields = filteredFields.map(fields => (fields._1,
> fields._2, fields._4, fields._5, fields._7))
>     val watchTimeTuples = watchTimeFields.map(fields =>
> getWatchtimeTuple(fields))
>     val programDuids = watchTimeTuples.map(fields => (fields._3,
> fields._1)).groupByKey(partition)
>     val programDuidNum = programDuids.map{case(key, value) => (key,
> value.toSet.size)}
>     programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>
> I have been working on this for several days. No findings why there are
> always 2 executors for the groupBy stage. Thanks a lot!
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Can you show us the program that you are running. If you are setting
>> number of partitions in the XYZ-ByKey operation as 300, then there should
>> be 300 tasks for that stage, distributed on the 50 executors are allocated
>> to your context. However the data distribution may be skewed in which case,
>> you can use a repartition operation to redistributed the data more evenly
>> (both DStream and RDD have repartition).
>>
>> TD
>>
>>
>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> I also tried to use the number of partitions as parameters to the
>>> functions such as groupByKey. It seems the numbers of executors is around
>>> 50 instead of 300, which is the number of the executors I specified in
>>> submission script. Moreover, the running time of different executors is
>>> skewed. The ideal case is that Spark can distribute the data into 300
>>> executors evenly so that the computation can be efficiently finished. I am
>>> not sure how to achieve this.
>>>
>>> Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> Can you try setting the number-of-partitions in all the shuffle-based
>>>> DStream operations, explicitly. It may be the case that the default
>>>> parallelism (that is, spark.default.parallelism) is probably not being
>>>> respected.
>>>>
>>>> Regarding the unusual delay, I would look at the task details of that
>>>> stage in the Spark web ui. It will show break of time for each task,
>>>> including GC times, etc. That might give some indication.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Tathagata,
>>>>>
>>>>> I set default parallelism as 300 in my configuration file. Sometimes
>>>>> there are more executors in a job. However, it is still slow. And I further
>>>>> observed that most executors take less than 20 seconds but two of them take
>>>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>>>> lines with only 4 fields). I am not sure why the group by operation takes
>>>>> more then 3 minutes.  Thanks!
>>>>>
>>>>> Bill
>>>>>
>>>>>
>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>
>>>>>> Are you specifying the number of reducers in all the
>>>>>> DStream.****ByKey operations? If the reduce by key is not set, then the
>>>>>> number of reducers used in the stages can keep changing across batches.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>>>>> per minute where the batch size is 1 minute.
>>>>>>>
>>>>>>> For some batches, the program sometimes take more than 3 minute to
>>>>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there
>>>>>>> are sometimes 2 executors allocated for this stage. However, during other
>>>>>>> batches, the executors can be several hundred for the same stage, which
>>>>>>> means the number of executors for the same operations change.
>>>>>>>
>>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

Below is my main function. I omit some filtering and data conversion
functions; they are just one-to-one mappings, which should not noticeably
increase the running time. The only reduce-style function I have here is
groupByKey. There are 4 topics in my Kafka brokers: two of the topics have
240k lines per minute, and the other two have less than 30k lines per
minute. The batch size is one minute and I specified 300 executors in my
spark-submit script. The default parallelism is 300.


    // Partition count used for the groupByKey shuffle.
    val partition = 300
    val zkQuorum = "zk1,zk2,zk3"
    val group = "my-group-" + currentTime.toString
    val topics = "topic1,topic2,topic3,topic4"
    val numThreads = 4
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    ssc = new StreamingContext(conf, Seconds(batch))
    ssc.checkpoint(hadoopOutput + "checkpoint")
    val lines = lines1  // lines1 is the Kafka input DStream (creation omitted)
    lines.cache()

    // Parse each line as JSON and keep only well-formed, valid records.
    val jsonData = lines.map(JSON.parseFull(_))
    val mapData = jsonData.filter(_.isDefined)
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val validMapData = mapData.filter(isValidData(_))

    // Project the fields of interest:
    // (id, time, local_time, id2, id3, log_type, sub_log_type)
    val fields = validMapData.map(data => (data("id").toString,
      timestampToUTCUnix(data("time").toString),
      timestampToUTCUnix(data("local_time").toString),
      data("id2").toString,
      data("id3").toString,
      data("log_type").toString,
      data("sub_log_type").toString))

    // Drop records whose two timestamps differ by more than one hour.
    val timeDiff = 3600L
    val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

    val watchTimeFields = filteredFields.map(fields =>
      (fields._1, fields._2, fields._4, fields._5, fields._7))
    val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))

    // The only shuffle: group ids by key with an explicit partition count,
    // then count distinct ids per key and save the result.
    val programDuids = watchTimeTuples
      .map(fields => (fields._3, fields._1))
      .groupByKey(partition)
    val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
    programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days and still have no idea why
there are always 2 executors for the groupBy stage. Thanks a lot!

Bill


On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Can you show us the program that you are running. If you are setting
> number of partitions in the XYZ-ByKey operation as 300, then there should
> be 300 tasks for that stage, distributed on the 50 executors are allocated
> to your context. However the data distribution may be skewed in which case,
> you can use a repartition operation to redistributed the data more evenly
> (both DStream and RDD have repartition).
>
> TD
>
>
> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> I also tried to use the number of partitions as parameters to the
>> functions such as groupByKey. It seems the numbers of executors is around
>> 50 instead of 300, which is the number of the executors I specified in
>> submission script. Moreover, the running time of different executors is
>> skewed. The ideal case is that Spark can distribute the data into 300
>> executors evenly so that the computation can be efficiently finished. I am
>> not sure how to achieve this.
>>
>> Thanks!
>>
>> Bill
>>
>>
>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> Can you try setting the number-of-partitions in all the shuffle-based
>>> DStream operations, explicitly. It may be the case that the default
>>> parallelism (that is, spark.default.parallelism) is probably not being
>>> respected.
>>>
>>> Regarding the unusual delay, I would look at the task details of that
>>> stage in the Spark web ui. It will show break of time for each task,
>>> including GC times, etc. That might give some indication.
>>>
>>> TD
>>>
>>>
>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> I set default parallelism as 300 in my configuration file. Sometimes
>>>> there are more executors in a job. However, it is still slow. And I further
>>>> observed that most executors take less than 20 seconds but two of them take
>>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>>> lines with only 4 fields). I am not sure why the group by operation takes
>>>> more then 3 minutes.  Thanks!
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>> tathagata.das1565@gmail.com> wrote:
>>>>
>>>>> Are you specifying the number of reducers in all the DStream.****ByKey
>>>>> operations? If the reduce by key is not set, then the number of reducers
>>>>> used in the stages can keep changing across batches.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>>>> per minute where the batch size is 1 minute.
>>>>>>
>>>>>> For some batches, the program sometimes take more than 3 minute to
>>>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there
>>>>>> are sometimes 2 executors allocated for this stage. However, during other
>>>>>> batches, the executors can be several hundred for the same stage, which
>>>>>> means the number of executors for the same operations change.
>>>>>>
>>>>>> Does anyone know how Spark allocate the number of executors for
>>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Tathagata Das <ta...@gmail.com>.
Can you show us the program that you are running? If you are setting the
number of partitions in the XYZ-ByKey operation to 300, then there should
be 300 tasks for that stage, distributed over the 50 executors allocated to
your context. However, the data distribution may be skewed, in which case
you can use a repartition operation to redistribute the data more evenly
(both DStream and RDD have repartition).
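
As a quick sketch (the names here are illustrative, not from your code),
repartition is available on both abstractions:

    // On the DStream itself:
    val evenDStream = skewedDStream.repartition(300)
    // Or on the underlying RDDs, via transform:
    val evenViaRdd = skewedDStream.transform(rdd => rdd.repartition(300))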

TD


On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bi...@gmail.com>
wrote:

> Hi Tathagata,
>
> I also tried to use the number of partitions as parameters to the
> functions such as groupByKey. It seems the numbers of executors is around
> 50 instead of 300, which is the number of the executors I specified in
> submission script. Moreover, the running time of different executors is
> skewed. The ideal case is that Spark can distribute the data into 300
> executors evenly so that the computation can be efficiently finished. I am
> not sure how to achieve this.
>
> Thanks!
>
> Bill
>
>
> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Can you try setting the number-of-partitions in all the shuffle-based
>> DStream operations, explicitly. It may be the case that the default
>> parallelism (that is, spark.default.parallelism) is probably not being
>> respected.
>>
>> Regarding the unusual delay, I would look at the task details of that
>> stage in the Spark web ui. It will show break of time for each task,
>> including GC times, etc. That might give some indication.
>>
>> TD
>>
>>
>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> I set default parallelism as 300 in my configuration file. Sometimes
>>> there are more executors in a job. However, it is still slow. And I further
>>> observed that most executors take less than 20 seconds but two of them take
>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>> lines with only 4 fields). I am not sure why the group by operation takes
>>> more then 3 minutes.  Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> Are you specifying the number of reducers in all the DStream.****ByKey
>>>> operations? If the reduce by key is not set, then the number of reducers
>>>> used in the stages can keep changing across batches.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>>> per minute where the batch size is 1 minute.
>>>>>
>>>>> For some batches, the program sometimes take more than 3 minute to
>>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there
>>>>> are sometimes 2 executors allocated for this stage. However, during other
>>>>> batches, the executors can be several hundred for the same stage, which
>>>>> means the number of executors for the same operations change.
>>>>>
>>>>> Does anyone know how Spark allocate the number of executors for
>>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>>
>>>>> Bill
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

I also tried passing the number of partitions as a parameter to functions
such as groupByKey. It seems the number of executors is around 50 instead
of 300, which is the number of executors I specified in the submission
script. Moreover, the running time of different executors is skewed. The
ideal case would be for Spark to distribute the data evenly across the 300
executors so that the computation finishes efficiently. I am not sure how
to achieve this.

Thanks!

Bill


On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Can you try setting the number-of-partitions in all the shuffle-based
> DStream operations, explicitly. It may be the case that the default
> parallelism (that is, spark.default.parallelism) is probably not being
> respected.
>
> Regarding the unusual delay, I would look at the task details of that
> stage in the Spark web ui. It will show break of time for each task,
> including GC times, etc. That might give some indication.
>
> TD
>
>
> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> I set default parallelism as 300 in my configuration file. Sometimes
>> there are more executors in a job. However, it is still slow. And I further
>> observed that most executors take less than 20 seconds but two of them take
>> much longer such as 2 minutes. The data size is very small (less than 480k
>> lines with only 4 fields). I am not sure why the group by operation takes
>> more then 3 minutes.  Thanks!
>>
>> Bill
>>
>>
>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> Are you specifying the number of reducers in all the DStream.****ByKey
>>> operations? If the reduce by key is not set, then the number of reducers
>>> used in the stages can keep changing across batches.
>>>
>>> TD
>>>
>>>
>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a Spark streaming job running on yarn. It consume data from
>>>> Kafka and group the data by a certain field. The data size is 480k lines
>>>> per minute where the batch size is 1 minute.
>>>>
>>>> For some batches, the program sometimes take more than 3 minute to
>>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>>> workers and specify 300 as the partition number for groupby. When I checked
>>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there are
>>>> sometimes 2 executors allocated for this stage. However, during other
>>>> batches, the executors can be several hundred for the same stage, which
>>>> means the number of executors for the same operations change.
>>>>
>>>> Does anyone know how Spark allocate the number of executors for
>>>> different stages and how to increase the efficiency for task? Thanks!
>>>>
>>>> Bill
>>>>
>>>
>>>
>>
>

Re: Number of executors change during job running

Posted by Tathagata Das <ta...@gmail.com>.
Can you try setting the number of partitions explicitly in all the
shuffle-based DStream operations? It may be that the default parallelism
(that is, spark.default.parallelism) is not being respected.

Regarding the unusual delay, I would look at the task details of that stage
in the Spark web UI. It will show the breakdown of time for each task,
including GC time, etc. That might give some indication.
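
For reference, a rough sketch of setting the default parallelism on the
SparkConf (the application name here is just a placeholder; 300 matches
your executor count and Seconds(60) matches your one-minute batch):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("kafka-streaming-groupby")        // placeholder name
      .set("spark.default.parallelism", "300")
    val ssc = new StreamingContext(conf, Seconds(60))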

TD


On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bi...@gmail.com>
wrote:

> Hi Tathagata,
>
> I set default parallelism as 300 in my configuration file. Sometimes there
> are more executors in a job. However, it is still slow. And I further
> observed that most executors take less than 20 seconds but two of them take
> much longer such as 2 minutes. The data size is very small (less than 480k
> lines with only 4 fields). I am not sure why the group by operation takes
> more then 3 minutes.  Thanks!
>
> Bill
>
>
> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> Are you specifying the number of reducers in all the DStream.****ByKey
>> operations? If the reduce by key is not set, then the number of reducers
>> used in the stages can keep changing across batches.
>>
>> TD
>>
>>
>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a Spark streaming job running on yarn. It consume data from Kafka
>>> and group the data by a certain field. The data size is 480k lines per
>>> minute where the batch size is 1 minute.
>>>
>>> For some batches, the program sometimes take more than 3 minute to
>>> finish the groupBy operation, which seems slow to me. I allocated 300
>>> workers and specify 300 as the partition number for groupby. When I checked
>>> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there are
>>> sometimes 2 executors allocated for this stage. However, during other
>>> batches, the executors can be several hundred for the same stage, which
>>> means the number of executors for the same operations change.
>>>
>>> Does anyone know how Spark allocate the number of executors for
>>> different stages and how to increase the efficiency for task? Thanks!
>>>
>>> Bill
>>>
>>
>>
>

Re: Number of executors change during job running

Posted by Bill Jay <bi...@gmail.com>.
Hi Tathagata,

I set the default parallelism to 300 in my configuration file. Sometimes
there are more executors in a job, but it is still slow. I further observed
that most executors take less than 20 seconds while two of them take much
longer, around 2 minutes. The data size is very small (less than 480k lines
with only 4 fields). I am not sure why the groupBy operation takes more
than 3 minutes.  Thanks!

Bill


On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <ta...@gmail.com>
wrote:

> Are you specifying the number of reducers in all the DStream.****ByKey
> operations? If the reduce by key is not set, then the number of reducers
> used in the stages can keep changing across batches.
>
> TD
>
>
> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I have a Spark streaming job running on yarn. It consume data from Kafka
>> and group the data by a certain field. The data size is 480k lines per
>> minute where the batch size is 1 minute.
>>
>> For some batches, the program sometimes take more than 3 minute to finish
>> the groupBy operation, which seems slow to me. I allocated 300 workers and
>> specify 300 as the partition number for groupby. When I checked the slow
>> stage *"combineByKey at ShuffledDStream.scala:42",* there are sometimes
>> 2 executors allocated for this stage. However, during other batches, the
>> executors can be several hundred for the same stage, which means the number
>> of executors for the same operations change.
>>
>> Does anyone know how Spark allocate the number of executors for different
>> stages and how to increase the efficiency for task? Thanks!
>>
>> Bill
>>
>
>

Re: Number of executors change during job running

Posted by Tathagata Das <ta...@gmail.com>.
Are you specifying the number of reducers in all the DStream.****ByKey
operations? If it is not set, then the number of reducers used in those
stages can keep changing across batches.
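
For example (a sketch; pairs stands for whatever keyed DStream you are
shuffling, with numeric values in the reduceByKey case):

    // An explicit reducer count keeps the shuffle width fixed across batches.
    val counts  = pairs.reduceByKey(_ + _, 300)
    val grouped = pairs.groupByKey(300)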

TD


On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bi...@gmail.com> wrote:

> Hi all,
>
> I have a Spark streaming job running on yarn. It consume data from Kafka
> and group the data by a certain field. The data size is 480k lines per
> minute where the batch size is 1 minute.
>
> For some batches, the program sometimes take more than 3 minute to finish
> the groupBy operation, which seems slow to me. I allocated 300 workers and
> specify 300 as the partition number for groupby. When I checked the slow
> stage *"combineByKey at ShuffledDStream.scala:42",* there are sometimes 2
> executors allocated for this stage. However, during other batches, the
> executors can be several hundred for the same stage, which means the number
> of executors for the same operations change.
>
> Does anyone know how Spark allocate the number of executors for different
> stages and how to increase the efficiency for task? Thanks!
>
> Bill
>