You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/27 00:51:20 UTC
spark git commit: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Repository: spark
Updated Branches:
refs/heads/master 9b262f6a0 -> 5c3a1f3fa
[SPARK-22355][SQL] Dataset.collect is not threadsafe
## What changes were proposed in this pull request?
It's possible that users create a `Dataset`, and call `collect` on this `Dataset` from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type T, and this encoder is per-dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection that writes its output to a single re-usable row.
This PR fixes this problem by creating a new projection when calling `Dataset#collect`, so that each method call gets its own re-usable row, instead of one shared per Dataset.
## How was this patch tested?
N/A
Author: Wenchen Fan <we...@databricks.com>
Closes #19577 from cloud-fan/encoder.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c3a1f3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c3a1f3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c3a1f3f
Branch: refs/heads/master
Commit: 5c3a1f3fad695317c2fff1243cdb9b3ceb25c317
Parents: 9b262f6
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Oct 26 17:51:16 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Oct 26 17:51:16 2017 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/Dataset.scala | 33 +++++++++++++-------
1 file changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5c3a1f3f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b70dfc0..0e23983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
@@ -198,15 +199,10 @@ class Dataset[T] private[sql](
*/
private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
- /**
- * Encoder is used mostly as a container of serde expressions in Dataset. We build logical
- * plans by these serde expressions and execute it within the query framework. However, for
- * performance reasons we may want to use encoder as a function to deserialize internal rows to
- * custom objects, e.g. collect. Here we resolve and bind the encoder so that we can call its
- * `fromRow` method later.
- */
- private val boundEnc =
- exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
+ // The deserializer expression which can be used to build a projection and turn rows to objects
+ // of type T, after collecting rows to the driver side.
+ private val deserializer =
+ exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).deserializer
private implicit def classTag = exprEnc.clsTag
@@ -2661,7 +2657,15 @@ class Dataset[T] private[sql](
*/
def toLocalIterator(): java.util.Iterator[T] = {
withAction("toLocalIterator", queryExecution) { plan =>
- plan.executeToIterator().map(boundEnc.fromRow).asJava
+ // This projection writes output to a `InternalRow`, which means applying this projection is
+ // not thread-safe. Here we create the projection inside this method to make `Dataset`
+ // thread-safe.
+ val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+ plan.executeToIterator().map { row =>
+ // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
+ // parameter of its `get` method, so it's safe to use null here.
+ objProj(row).get(0, null).asInstanceOf[T]
+ }.asJava
}
}
@@ -3102,7 +3106,14 @@ class Dataset[T] private[sql](
* Collect all elements from a spark plan.
*/
private def collectFromPlan(plan: SparkPlan): Array[T] = {
- plan.executeCollect().map(boundEnc.fromRow)
+ // This projection writes output to a `InternalRow`, which means applying this projection is not
+ // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
+ val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+ plan.executeCollect().map { row =>
+ // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
+ // parameter of its `get` method, so it's safe to use null here.
+ objProj(row).get(0, null).asInstanceOf[T]
+ }
}
private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org