You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by kayousterhout <gi...@git.apache.org> on 2017/01/03 20:29:08 UTC

[GitHub] spark pull request #16053: [SPARK-17931] Eliminate unnecessary task (de) seri...

Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16053#discussion_r94463665
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -17,27 +17,123 @@
     
     package org.apache.spark.scheduler
     
    +import java.io.{DataInputStream, DataOutputStream}
     import java.nio.ByteBuffer
    +import java.util.Properties
     
    -import org.apache.spark.util.SerializableBuffer
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{HashMap, Map}
    +
    +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
     
     /**
      * Description of a task that gets passed onto executors to be executed, usually created by
    - * `TaskSetManager.resourceOffer`.
    +  * `TaskSetManager.resourceOffer`.
    + *
    + * TaskDescriptions and the associated Task need to be serialized carefully for two reasons:
    + *
    + *     (1) When a TaskDescription is received by an Executor, the Executor needs to first get the
    + *         list of JARs and files and add these to the classpath, and set the properties, before
    + *         deserializing the Task object (serializedTask). This is why the Properties are included
    + *         in the TaskDescription, even though they're also in the serialized task.
    + *     (2) Because a TaskDescription is serialized and sent to an executor for each task, efficient
    + *         serialization (both in terms of serialization time and serialized buffer size) is
    + *         important. For this reason, we serialize TaskDescriptions ourselves with the
    + *         TaskDescription.encode and TaskDescription.decode methods.  This results in a smaller
    + *         serialized size because it avoids serializing unnecessary fields in the Map objects
    + *         (which can introduce significant overhead when the maps are small).
      */
     private[spark] class TaskDescription(
         val taskId: Long,
         val attemptNumber: Int,
         val executorId: String,
         val name: String,
         val index: Int,    // Index within this task's TaskSet
    -    _serializedTask: ByteBuffer)
    -  extends Serializable {
    +    val addedFiles: Map[String, Long],
    +    val addedJars: Map[String, Long],
    +    val properties: Properties,
    +    val serializedTask: ByteBuffer) {
     
    -  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
    -  private val buffer = new SerializableBuffer(_serializedTask)
    +  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
    +}
     
    -  def serializedTask: ByteBuffer = buffer.value
    +private[spark] object TaskDescription {
    +  def encode(taskDescription: TaskDescription): ByteBuffer = {
    +    val bytesOut = new ByteBufferOutputStream(4096)
    +    val dataOut = new DataOutputStream(bytesOut)
     
    -  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
    +    dataOut.writeLong(taskDescription.taskId)
    +    dataOut.writeInt(taskDescription.attemptNumber)
    +    dataOut.writeUTF(taskDescription.executorId)
    +    dataOut.writeUTF(taskDescription.name)
    +    dataOut.writeInt(taskDescription.index)
    +
    +    // Write files.
    +    dataOut.writeInt(taskDescription.addedFiles.size)
    +    for ((name, timestamp) <- taskDescription.addedFiles) {
    +      dataOut.writeUTF(name)
    +      dataOut.writeLong(timestamp)
    +    }
    +
    +    // Write jars.
    +    dataOut.writeInt(taskDescription.addedJars.size)
    +    for ((name, timestamp) <- taskDescription.addedJars) {
    +      dataOut.writeUTF(name)
    +      dataOut.writeLong(timestamp)
    +    }
    +
    +    // Write properties.
    +    dataOut.writeInt(taskDescription.properties.size())
    +    taskDescription.properties.stringPropertyNames.asScala.foreach { name =>
    +      dataOut.writeUTF(name)
    +      dataOut.writeUTF(taskDescription.properties.getProperty(name))
    +    }
    +
    +    // Write the task. The task is already serialized, so write it directly to the byte buffer
    +    // (this requires first flushing the data output stream, so that all of the data has been
    +    // written from the data output stream to the underlying ByteBufferOutputStream before
    +    // we write the task).
    +    dataOut.flush()
    --- End diff --
    
    Cool, thanks for noticing this -- fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org