Posted to issues@spark.apache.org by "Kay Ousterhout (JIRA)" <ji...@apache.org> on 2017/01/06 19:35:58 UTC

[jira] [Created] (SPARK-19108) Broadcast all shared parts of tasks (to reduce task serialization time)

Kay Ousterhout created SPARK-19108:
--------------------------------------

             Summary: Broadcast all shared parts of tasks (to reduce task serialization time)
                 Key: SPARK-19108
                 URL: https://issues.apache.org/jira/browse/SPARK-19108
             Project: Spark
          Issue Type: Improvement
          Components: Scheduler
            Reporter: Kay Ousterhout


Expand the amount of information that's broadcast for tasks, to avoid serializing data per task that should only be sent to each executor once for the entire stage.

Conceptually, this means we'd have new classes specifically for sending the minimal necessary data to the executor, like:

{code}
import java.util.Properties

import org.apache.spark.broadcast.Broadcast

/**
  * Metadata about the task set needed by the executor for all tasks in this task set.  A subset
  * of the full data kept on the driver, to make it faster to serialize and send to executors.
  */
class ExecutorTaskSetMeta(
  val stageId: Int,
  val stageAttemptId: Int,
  val properties: Properties,
  val addedFiles: Map[String, String],
  val addedJars: Map[String, String]
  // maybe task metrics here?
)

/**
  * Per-task data sent to the executor.  Everything shared across the stage is referenced
  * through the two broadcasts rather than serialized with each task.
  */
class ExecutorTaskData(
  val partitionId: Int,
  val attemptNumber: Int,
  val taskId: Long,
  val taskBinary: Broadcast[Array[Byte]],
  val taskSetMeta: Broadcast[ExecutorTaskSetMeta]
)
{code}

Then all the info you'd need to send to the executors would be a serialized version of ExecutorTaskData.  Furthermore, given the simplicity of that class, you could serialize it manually, and then for each task you could just modify the first two ints and one long (partitionId, attemptNumber, and taskId) directly in the byte buffer.  (You could use the same serialization trick even if ExecutorTaskSetMeta were not a broadcast, but broadcasting it keeps the messages small as well.)
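
As a rough illustration of the "serialize once, patch the header per task" idea, here is a minimal standalone sketch. It is not Spark code: the byte layout, the TaskDataCodec object, and the use of plain Long ids in place of the two Broadcast handles are all assumptions made for the example. The shared suffix is written once per task set, and each task only overwrites the two ints and one long at the front of the buffer.

```scala
import java.nio.ByteBuffer

// Hypothetical wire layout, assumed for illustration only (not Spark's format):
//   bytes [0, 4)    partitionId              Int
//   bytes [4, 8)    attemptNumber            Int
//   bytes [8, 16)   taskId                   Long
//   bytes [16, 24)  taskBinary broadcast id  Long  (shared by all tasks in the set)
//   bytes [24, 32)  taskSetMeta broadcast id Long  (shared by all tasks in the set)
object TaskDataCodec {
  val HeaderBytes = 16 // the per-task fields: two ints and one long
  val TotalBytes = 32

  /** Serialize the shared fields once per task set; per-task fields stay zeroed. */
  def template(taskBinaryId: Long, taskSetMetaId: Long): Array[Byte] = {
    val buf = ByteBuffer.allocate(TotalBytes)
    buf.putLong(HeaderBytes, taskBinaryId)
    buf.putLong(HeaderBytes + 8, taskSetMetaId)
    buf.array()
  }

  /**
    * Copy the template and overwrite only the per-task header fields.
    * The copy keeps the template reusable; patching a single buffer in place
    * would also work if each message is sent before the next patch.
    */
  def forTask(
      template: Array[Byte],
      partitionId: Int,
      attemptNumber: Int,
      taskId: Long): Array[Byte] = {
    val bytes = template.clone()
    val buf = ByteBuffer.wrap(bytes)
    buf.putInt(0, partitionId)
    buf.putInt(4, attemptNumber)
    buf.putLong(8, taskId)
    bytes
  }

  /** Read all five fields back: (partitionId, attemptNumber, taskId, binaryId, metaId). */
  def decode(bytes: Array[Byte]): (Int, Int, Long, Long, Long) = {
    val buf = ByteBuffer.wrap(bytes)
    (buf.getInt(0), buf.getInt(4), buf.getLong(8), buf.getLong(16), buf.getLong(24))
  }
}
```

With this layout, the per-task cost is an array copy plus three fixed-offset writes, regardless of how large the broadcast task binary or task set metadata is.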

There are a bunch of details I'm skipping here: you'd also need to do some special handling for the TaskMetrics; the way tasks get started in the executor would change; you'd also need to refactor {{Task}} to let it get reconstructed from this information (or add more to ExecutorTaskSetMeta); and probably other details I'm overlooking now.

(this is copied from SPARK-18890 and [~imranr]'s comment there; cc [~shivaram])


