You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/09/18 21:33:44 UTC

spark git commit: [SPARK-25456][SQL][TEST] Fix PythonForeachWriterSuite

Repository: spark
Updated Branches:
  refs/heads/master 123f0041d -> a6f37b074


[SPARK-25456][SQL][TEST] Fix PythonForeachWriterSuite

PythonForeachWriterSuite was failing because RowQueue now needs to have a handle on a SparkEnv with a SerializerManager, so added a mock env with a serializer manager.

Also fixed a typo in the `finally` block (`if (tester == null) tester.close()` should have been `if (tester != null) tester.close()`) that was hiding the real exception behind a NullPointerException.

Tested PythonForeachWriterSuite locally, full tests via jenkins.

Closes #22452 from squito/SPARK-25456.

Authored-by: Imran Rashid <ir...@cloudera.com>
Signed-off-by: Imran Rashid <ir...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6f37b07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6f37b07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6f37b07

Branch: refs/heads/master
Commit: a6f37b0742d87d5c8ee3e134999d665e5719e822
Parents: 123f004
Author: Imran Rashid <ir...@cloudera.com>
Authored: Tue Sep 18 16:33:37 2018 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Tue Sep 18 16:33:37 2018 -0500

----------------------------------------------------------------------
 .../execution/python/PythonForeachWriterSuite.scala   | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6f37b07/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
index 07e6034..d02014c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.execution.python
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.mockito.Mockito.when
 import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.python.PythonForeachWriter.UnsafeRowBuffer
 import org.apache.spark.sql.types.{DataType, IntegerType}
 import org.apache.spark.util.Utils
 
-class PythonForeachWriterSuite extends SparkFunSuite with Eventually {
+class PythonForeachWriterSuite extends SparkFunSuite with Eventually with MockitoSugar {
 
   testWithBuffer("UnsafeRowBuffer: iterator blocks when no data is available") { b =>
     b.assertIteratorBlocked()
@@ -75,7 +78,7 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually {
         tester = new BufferTester(memBytes, sleepPerRowReadMs)
         f(tester)
       } finally {
-        if (tester == null) tester.close()
+        if (tester != null) tester.close()
       }
     }
   }
@@ -83,7 +86,12 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually {
 
   class BufferTester(memBytes: Long, sleepPerRowReadMs: Int) {
     private val buffer = {
-      val mem = new TestMemoryManager(new SparkConf())
+      val mockEnv = mock[SparkEnv]
+      val conf = new SparkConf()
+      val serializerManager = new SerializerManager(new JavaSerializer(conf), conf, None)
+      when(mockEnv.serializerManager).thenReturn(serializerManager)
+      SparkEnv.set(mockEnv)
+      val mem = new TestMemoryManager(conf)
       mem.limit(memBytes)
       val taskM = new TaskMemoryManager(mem, 0)
       new UnsafeRowBuffer(taskM, Utils.createTempDir(), 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org