You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/18 20:47:02 UTC
spark git commit: [SPARK-17930][CORE] The SerializerInstance instance
used when deserializing a TaskResult is not reused
Repository: spark
Updated Branches:
refs/heads/master 20dd11096 -> 4518642ab
[SPARK-17930][CORE] The SerializerInstance instance used when deserializing a TaskResult is not reused
## What changes were proposed in this pull request?
The following code is called when the DirectTaskResult instance is deserialized
```scala
def value(): T = {
if (valueObjectDeserialized) {
valueObject
} else {
// Each deserialization creates a new instance of SerializerInstance, which is very time-consuming
val resultSer = SparkEnv.get.serializer.newInstance()
valueObject = resultSer.deserialize(valueBytes)
valueObjectDeserialized = true
valueObject
}
}
```
When a stage has a large number of tasks, reusing the SerializerInstance can improve scheduling performance by roughly a factor of three.
The test data is TPC-DS 2T (Parquet) and SQL statement as follows (query 2):
```sql
select i_item_id,
avg(ss_quantity) agg1,
avg(ss_list_price) agg2,
avg(ss_coupon_amt) agg3,
avg(ss_sales_price) agg4
from store_sales, customer_demographics, date_dim, item, promotion
where ss_sold_date_sk = d_date_sk and
ss_item_sk = i_item_sk and
ss_cdemo_sk = cd_demo_sk and
ss_promo_sk = p_promo_sk and
cd_gender = 'M' and
cd_marital_status = 'M' and
cd_education_status = '4 yr Degree' and
(p_channel_email = 'N' or p_channel_event = 'N') and
d_year = 2001
group by i_item_id
order by i_item_id
limit 100;
```
`spark-defaults.conf` file:
```
spark.master yarn-client
spark.executor.instances 20
spark.driver.memory 16g
spark.executor.memory 30g
spark.executor.cores 5
spark.default.parallelism 100
spark.sql.shuffle.partitions 100000
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize 0
spark.rpc.netty.dispatcher.numThreads 8
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking true
spark.cleaner.referenceTracking.blocking.shuffle true
```
Performance test results are as follows:
[SPARK-17930](https://github.com/witgo/spark/tree/SPARK-17930) | [ed14633](https://github.com/witgo/spark/commit/ed1463341455830b8867b721a1b34f291139baf3)
------------ | -------------
54.5 s|231.7 s
## How was this patch tested?
Existing tests.
Author: Guoqiang Li <wi...@qq.com>
Closes #15512 from witgo/SPARK-17930.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4518642a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4518642a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4518642a
Branch: refs/heads/master
Commit: 4518642abd71bb1213a9efd72732102abf0bf7e7
Parents: 20dd110
Author: Guoqiang Li <wi...@qq.com>
Authored: Tue Oct 18 13:46:57 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Oct 18 13:46:57 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/scheduler/TaskResult.scala | 9 +++++----
.../org/apache/spark/scheduler/TaskResultGetter.scala | 10 +++++++++-
2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4518642a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 77fda6f..366b92c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkEnv
+import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.BlockId
import org.apache.spark.util.{AccumulatorV2, Utils}
@@ -77,14 +78,14 @@ private[spark] class DirectTaskResult[T](
*
* After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
*/
- def value(): T = {
+ def value(resultSer: SerializerInstance = null): T = {
if (valueObjectDeserialized) {
valueObject
} else {
// This should not run when holding a lock because it may cost dozens of seconds for a large
- // value.
- val resultSer = SparkEnv.get.serializer.newInstance()
- valueObject = resultSer.deserialize(valueBytes)
+ // value
+ val ser = if (resultSer == null) SparkEnv.get.serializer.newInstance() else resultSer
+ valueObject = ser.deserialize(valueBytes)
valueObjectDeserialized = true
valueObject
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4518642a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 1c3fcbd..b1addc1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -48,6 +48,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
}
}
+ protected val taskResultSerializer = new ThreadLocal[SerializerInstance] {
+ override def initialValue(): SerializerInstance = {
+ sparkEnv.serializer.newInstance()
+ }
+ }
+
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
@@ -63,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
- directResult.value()
+ directResult.value(taskResultSerializer.get())
(directResult, serializedData.limit())
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
@@ -84,6 +90,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
}
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get.toByteBuffer)
+ // force deserialization of referenced value
+ deserializedResult.value(taskResultSerializer.get())
sparkEnv.blockManager.master.removeBlock(blockId)
(deserializedResult, size)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org