You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/06 02:15:58 UTC

spark git commit: [SPARK-18729][SS] Move DataFrame.collect out of synchronized block in MemorySink

Repository: spark
Updated Branches:
  refs/heads/master 3ba69b648 -> 1b2785c3d


[SPARK-18729][SS] Move DataFrame.collect out of synchronized block in MemorySink

## What changes were proposed in this pull request?

Move `DataFrame.collect` out of the synchronized block in `MemorySink.addBatch` so that the contents of the MemorySink can still be queried while `DataFrame.collect` is running.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16162 from zsxwing/SPARK-18729.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b2785c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b2785c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b2785c3

Branch: refs/heads/master
Commit: 1b2785c3d0a40da2fca923af78066060dbfbcf0a
Parents: 3ba69b6
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Dec 5 18:15:55 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 5 18:15:55 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/memory.scala   | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b2785c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index adf6963..b370845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -186,16 +186,23 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
     }.mkString("\n")
   }
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
-    if (latestBatchId.isEmpty || batchId > latestBatchId.get) {
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val notCommitted = synchronized {
+      latestBatchId.isEmpty || batchId > latestBatchId.get
+    }
+    if (notCommitted) {
       logDebug(s"Committing batch $batchId to $this")
       outputMode match {
         case InternalOutputModes.Append | InternalOutputModes.Update =>
-          batches.append(AddedData(batchId, data.collect()))
+          val rows = AddedData(batchId, data.collect())
+          synchronized { batches += rows }
 
         case InternalOutputModes.Complete =>
-          batches.clear()
-          batches += AddedData(batchId, data.collect())
+          val rows = AddedData(batchId, data.collect())
+          synchronized {
+            batches.clear()
+            batches += rows
+          }
 
         case _ =>
           throw new IllegalArgumentException(
@@ -206,7 +213,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
     }
   }
 
-  def clear(): Unit = {
+  def clear(): Unit = synchronized {
     batches.clear()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org