Posted to commits@spark.apache.org by li...@apache.org on 2017/10/27 00:51:20 UTC

spark git commit: [SPARK-22355][SQL] Dataset.collect is not threadsafe

Repository: spark
Updated Branches:
  refs/heads/master 9b262f6a0 -> 5c3a1f3fa


[SPARK-22355][SQL] Dataset.collect is not threadsafe

## What changes were proposed in this pull request?

Users may create a `Dataset` and call `collect` on it from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type `T`, and this encoder is per-Dataset. That makes `Dataset#collect` thread-unsafe, because the encoder uses a projection that writes the converted object into a single reusable row shared by all callers.
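
For illustration, a minimal sketch of the access pattern that triggers the race (the job itself is hypothetical; it assumes a spark-shell style `spark` session): every `collect()` below funnels through the Dataset's single bound encoder, so all threads write into the same mutable output row.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import spark.implicits._

val ds = spark.range(100).map(i => (i.toInt, i.toString))

// Four concurrent collects: before this patch, each thread may observe
// objects corrupted by another thread overwriting the shared row mid-read.
val results = Seq.fill(4)(Future(ds.collect())).map(Await.result(_, 1.minute))
```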

This PR fixes the problem by creating a new projection inside each `Dataset#collect` call, so that the reusable row is scoped to a single method call instead of to the whole Dataset.
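
In miniature, the fix is the standard move from per-instance to per-call mutable state. A generic, non-Spark sketch of the same pattern (all names here are illustrative):

```scala
// Racy: one mutable buffer per instance, shared by every caller.
final class SharedScratch {
  private val buf = new StringBuilder
  def render(x: Int): String = {
    buf.setLength(0)         // two threads here can interleave...
    buf.append(x).toString   // ...and read each other's bytes
  }
}

// Thread-safe: one buffer per call, nothing shared between threads.
final class PerCallScratch {
  def render(x: Int): String =
    new StringBuilder().append(x).toString
}
```

The diff below applies exactly this move: the per-Dataset `boundEnc` becomes a bare `deserializer` expression, and each `collect`/`toLocalIterator` call generates its own projection from it.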

## How was this patch tested?

N/A

Author: Wenchen Fan <we...@databricks.com>

Closes #19577 from cloud-fan/encoder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c3a1f3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c3a1f3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c3a1f3f

Branch: refs/heads/master
Commit: 5c3a1f3fad695317c2fff1243cdb9b3ceb25c317
Parents: 9b262f6
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Oct 26 17:51:16 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Oct 26 17:51:16 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 33 +++++++++++++-------
 1 file changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c3a1f3f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b70dfc0..0e23983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
@@ -198,15 +199,10 @@ class Dataset[T] private[sql](
    */
   private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
 
-  /**
-   * Encoder is used mostly as a container of serde expressions in Dataset.  We build logical
-   * plans by these serde expressions and execute it within the query framework.  However, for
-   * performance reasons we may want to use encoder as a function to deserialize internal rows to
-   * custom objects, e.g. collect.  Here we resolve and bind the encoder so that we can call its
-   * `fromRow` method later.
-   */
-  private val boundEnc =
-    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
+  // The deserializer expression, which can be used to build a projection and turn rows into
+  // objects of type T, after collecting rows to the driver side.
+  private val deserializer =
+    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag
 
@@ -2661,7 +2657,15 @@ class Dataset[T] private[sql](
    */
   def toLocalIterator(): java.util.Iterator[T] = {
     withAction("toLocalIterator", queryExecution) { plan =>
-      plan.executeToIterator().map(boundEnc.fromRow).asJava
+      // This projection writes output to an `InternalRow`, which means applying this projection is
+      // not thread-safe. Here we create the projection inside this method to make `Dataset`
+      // thread-safe.
+      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+      plan.executeToIterator().map { row =>
+        // The row returned by SafeProjection is `SpecificInternalRow`, which ignores the data type
+        // parameter of its `get` method, so it's safe to use null here.
+        objProj(row).get(0, null).asInstanceOf[T]
+      }.asJava
     }
   }
 
@@ -3102,7 +3106,14 @@ class Dataset[T] private[sql](
    * Collect all elements from a spark plan.
    */
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-    plan.executeCollect().map(boundEnc.fromRow)
+    // This projection writes output to an `InternalRow`, which means applying this projection is not
+    // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
+    val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+    plan.executeCollect().map { row =>
+      // The row returned by SafeProjection is `SpecificInternalRow`, which ignores the data type
+      // parameter of its `get` method, so it's safe to use null here.
+      objProj(row).get(0, null).asInstanceOf[T]
+    }
   }
 
   private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org