Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/18 20:47:02 UTC

spark git commit: [SPARK-17930][CORE] The SerializerInstance instance used when deserializing a TaskResult is not reused

Repository: spark
Updated Branches:
  refs/heads/master 20dd11096 -> 4518642ab


[SPARK-17930][CORE] The SerializerInstance instance used when deserializing a TaskResult is not reused

## What changes were proposed in this pull request?
The following method is called to deserialize the value of a `DirectTaskResult` instance:

```scala

  def value(): T = {
    if (valueObjectDeserialized) {
      valueObject
    } else {
      // Each deserialization creates a new instance of SerializerInstance, which is very time-consuming
      val resultSer = SparkEnv.get.serializer.newInstance()
      valueObject = resultSer.deserialize(valueBytes)
      valueObjectDeserialized = true
      valueObject
    }
  }

```

When a stage has a large number of tasks, reusing the `SerializerInstance` can improve scheduling performance by roughly a factor of three. The patch does this by caching one `SerializerInstance` per result-getter thread, as sketched below and shown in the diff at the end of this message.
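
A minimal self-contained sketch of that per-thread caching pattern, using `KryoSerializer` directly in place of `SparkEnv` (the object and method names here are illustrative, not from the patch):

```scala
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

object ThreadLocalSerializerSketch {
  private val serializer = new KryoSerializer(new SparkConf())

  // A SerializerInstance is intended for use by one thread at a time, so it
  // cannot simply be shared across the result-getter thread pool. A
  // ThreadLocal gives each thread a single cached instance instead of a
  // fresh one per deserialized task result.
  private val taskResultSerializer = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance = serializer.newInstance()
  }

  // Every call on the same thread reuses that thread's cached instance.
  def deserializeValue[T: ClassTag](valueBytes: ByteBuffer): T =
    taskResultSerializer.get().deserialize[T](valueBytes)
}
```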

The test data is TPC-DS at 2 TB scale (Parquet), and the SQL statement is as follows (query 2):

```sql

select  i_item_id,
        avg(ss_quantity) agg1,
        avg(ss_list_price) agg2,
        avg(ss_coupon_amt) agg3,
        avg(ss_sales_price) agg4
 from store_sales, customer_demographics, date_dim, item, promotion
 where ss_sold_date_sk = d_date_sk and
       ss_item_sk = i_item_sk and
       ss_cdemo_sk = cd_demo_sk and
       ss_promo_sk = p_promo_sk and
       cd_gender = 'M' and
       cd_marital_status = 'M' and
       cd_education_status = '4 yr Degree' and
       (p_channel_email = 'N' or p_channel_event = 'N') and
       d_year = 2001
 group by i_item_id
 order by i_item_id
 limit 100;

```

`spark-defaults.conf` file:

```
spark.master                           yarn-client
spark.executor.instances               20
spark.driver.memory                    16g
spark.executor.memory                  30g
spark.executor.cores                   5
spark.default.parallelism              100
spark.sql.shuffle.partitions           100000
spark.serializer                       org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize              0
spark.rpc.netty.dispatcher.numThreads   8
spark.executor.extraJavaOptions          -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking true
spark.cleaner.referenceTracking.blocking.shuffle true

```

The performance test results are as follows:

[SPARK-17930](https://github.com/witgo/spark/tree/SPARK-17930) (with patch) | [ed14633](https://github.com/witgo/spark/commit/ed1463341455830b8867b721a1b34f291139baf3) (without patch)
------------ | -------------
54.5 s|231.7 s
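
As a rough standalone illustration of where the time goes (not part of the patch; absolute numbers will vary with machine and payload), the per-call cost of `newInstance()` versus a single reused instance can be compared like this:

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

object SerializerReuseBench {
  def main(args: Array[String]): Unit = {
    val serializer = new KryoSerializer(new SparkConf())
    // Small payload standing in for a serialized task result.
    val bytes: ByteBuffer = serializer.newInstance().serialize(Array.fill(16)(42))
    val iterations = 100000

    // Pre-patch behavior: a fresh SerializerInstance per deserialization.
    var start = System.nanoTime()
    for (_ <- 1 to iterations) {
      serializer.newInstance().deserialize[Array[Int]](bytes.duplicate())
    }
    println(s"fresh instance per call: ${(System.nanoTime() - start) / 1e6} ms")

    // Post-patch behavior: one reused SerializerInstance.
    val reused: SerializerInstance = serializer.newInstance()
    start = System.nanoTime()
    for (_ <- 1 to iterations) {
      reused.deserialize[Array[Int]](bytes.duplicate())
    }
    println(s"one reused instance:     ${(System.nanoTime() - start) / 1e6} ms")
  }
}
```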

## How was this patch tested?

Existing tests.

Author: Guoqiang Li <wi...@qq.com>

Closes #15512 from witgo/SPARK-17930.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4518642a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4518642a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4518642a

Branch: refs/heads/master
Commit: 4518642abd71bb1213a9efd72732102abf0bf7e7
Parents: 20dd110
Author: Guoqiang Li <wi...@qq.com>
Authored: Tue Oct 18 13:46:57 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Oct 18 13:46:57 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/TaskResult.scala     |  9 +++++----
 .../org/apache/spark/scheduler/TaskResultGetter.scala     | 10 +++++++++-
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4518642a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 77fda6f..366b92c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkEnv
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.{AccumulatorV2, Utils}
 
@@ -77,14 +78,14 @@ private[spark] class DirectTaskResult[T](
    *
    * After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
    */
-  def value(): T = {
+  def value(resultSer: SerializerInstance = null): T = {
     if (valueObjectDeserialized) {
       valueObject
     } else {
       // This should not run when holding a lock because it may cost dozens of seconds for a large
-      // value.
-      val resultSer = SparkEnv.get.serializer.newInstance()
-      valueObject = resultSer.deserialize(valueBytes)
+      // value
+      val ser = if (resultSer == null) SparkEnv.get.serializer.newInstance() else resultSer
+      valueObject = ser.deserialize(valueBytes)
       valueObjectDeserialized = true
       valueObject
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4518642a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 1c3fcbd..b1addc1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -48,6 +48,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     }
   }
 
+  protected val taskResultSerializer = new ThreadLocal[SerializerInstance] {
+    override def initialValue(): SerializerInstance = {
+      sparkEnv.serializer.newInstance()
+    }
+  }
+
   def enqueueSuccessfulTask(
       taskSetManager: TaskSetManager,
       tid: Long,
@@ -63,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               // deserialize "value" without holding any lock so that it won't block other threads.
               // We should call it here, so that when it's called again in
               // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
-              directResult.value()
+              directResult.value(taskResultSerializer.get())
               (directResult, serializedData.limit())
             case IndirectTaskResult(blockId, size) =>
               if (!taskSetManager.canFetchMoreResults(size)) {
@@ -84,6 +90,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               }
               val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                 serializedTaskResult.get.toByteBuffer)
+              // force deserialization of referenced value
+              deserializedResult.value(taskResultSerializer.get())
               sparkEnv.blockManager.master.removeBlock(blockId)
               (deserializedResult, size)
           }

