Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/30 19:45:35 UTC

spark git commit: [SPARK-8850] [SQL] Enable Unsafe mode by default

Repository: spark
Updated Branches:
  refs/heads/master ab78b1d2a -> 520ec0ff9


[SPARK-8850] [SQL] Enable Unsafe mode by default

This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues (a short sketch of how to flip the new default back off follows the blocker list):

**List of fixed blockers**:

- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560).
- [x] Update planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate failing HiveThriftBinaryServerSuite test.  This turned out to be caused by a ClassCastException that occurs when Exchange applies an interpreted RowOrdering to an UnsafeRow while range partitioning an RDD.  This could be fixed by #7408, but the shorter-term fix is to simply skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x]  ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF.  This is necessary for `UDFSuite` to pass.  For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`.
- [x] After lowering the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
  - [x] Wrong answer in `join_1to1` (fixed by #7680)
  - [x] Wrong answer in `join_nulls` (fixed by #7680)
  - [x] Managed memory OOM / leak in `lateral_view`
  - [x] Seemed to hang indefinitely in `partcols1`, possibly due to a deadlock in script transformation or a bug in error-handling code; the hang was fixed by #7710.
  - [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
- [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759).
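
As a quick reference, here is a minimal, hypothetical sketch (Spark 1.5-era APIs; the object name, master, and app name are illustrative, not part of this patch) of flipping the new default back off for a single SQLContext, e.g. to compare plans while chasing a regression. `spark.sql.unsafe.enabled` is the key whose default changes in SQLConf.scala below:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object UnsafeToggle {
  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholders for illustration only.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("unsafe-toggle"))
    val sqlContext = new SQLContext(sc)
    // Setting the key to false restores the pre-patch (non-Tungsten) code path.
    sqlContext.setConf("spark.sql.unsafe.enabled", "false")
    sc.stop()
  }
}
```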

Author: Josh Rosen <jo...@databricks.com>

Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits:

83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/520ec0ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/520ec0ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/520ec0ff

Branch: refs/heads/master
Commit: 520ec0ff9db75267f627dc4615b2316a1a3d44d7
Parents: ab78b1d
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Jul 30 10:45:32 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jul 30 10:45:32 2015 -0700

----------------------------------------------------------------------
 R/run-tests.sh                                  |  2 +-
 .../unsafe/sort/UnsafeExternalSorter.java       | 14 ++++++++++++++
 python/pyspark/java_gateway.py                  |  6 +++++-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  2 +-
 .../apache/spark/sql/execution/Exchange.scala   |  7 ++++++-
 .../spark/sql/ColumnExpressionSuite.scala       |  3 ++-
 .../sql/execution/UnsafeExternalSortSuite.scala | 20 +-------------------
 7 files changed, 30 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/R/run-tests.sh
----------------------------------------------------------------------
diff --git a/R/run-tests.sh b/R/run-tests.sh
index e82ad0b..18a1e13 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
 if [[ $FAILED != 0 ]]; then

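This change, together with the matching PySpark change below, lowers `spark.buffer.pageSize` so that Tungsten's fixed-size memory pages fit in the smaller test JVMs. A minimal sketch, assuming only Spark's `SparkConf` on the classpath (the object name is illustrative), of how such a size string is read back via the same `getSizeAsBytes` call the sorter uses:

```scala
import org.apache.spark.SparkConf

object PageSizeDemo {
  def main(args: Array[String]): Unit = {
    // Same value the R harness passes via --conf above; the PySpark change
    // below uses the equivalent "4mb" spelling.
    val conf = new SparkConf().set("spark.buffer.pageSize", "4m")
    // Mirrors the read in UnsafeExternalSorter; "64m" is the default applied
    // when the key is unset.
    val pageSizeBytes: Long = conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
    assert(pageSizeBytes == 4L * 1024 * 1024)
  }
}
```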
http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c21990f..866e0b4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -20,6 +20,9 @@ package org.apache.spark.util.collection.unsafe.sort;
 import java.io.IOException;
 import java.util.LinkedList;
 
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public final class UnsafeExternalSorter {
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
     initializeForWriting();
+
+    // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+    // the end of the task. This is necessary to avoid memory leaks when the downstream operator
+    // does not fully consume the sorter's output (e.g. sort followed by limit).
+    taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+      @Override
+      public BoxedUnit apply() {
+        freeMemory();
+        return null;
+      }
+    });
   }
 
   // TODO: metrics tracking + integration with shuffle write metrics

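The hunk above registers an end-of-task callback so the sorter's memory is freed even when a downstream operator (e.g. a Limit) stops consuming its output early. A self-contained sketch of the same pattern, where `TaskScope` and `PageSorter` are hypothetical stand-ins for Spark's TaskContext and sorter internals:

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for TaskContext: collects callbacks and fires them at end of task.
final class TaskScope {
  private val callbacks = ArrayBuffer.empty[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f
  def markTaskCompleted(): Unit = callbacks.foreach(f => f())
}

// Stand-in for the sorter: acquires pages up front and registers cleanup in
// its constructor, mirroring the hook added in the Java code above.
final class PageSorter(scope: TaskScope) {
  private var pages: List[Array[Byte]] = List.fill(2)(new Array[Byte](1024))
  scope.addOnCompleteCallback(() => freeMemory())
  def freeMemory(): Unit = pages = Nil
}

object CleanupDemo {
  def main(args: Array[String]): Unit = {
    val scope = new TaskScope
    new PageSorter(scope)
    // Even if nothing ever drains the sorter (the limit-after-sort case),
    // completing the task releases its pages.
    scope.markTaskCompleted()
  }
}
```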
http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 90cd342..60be85e 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -52,7 +52,11 @@ def launch_gateway():
         script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
         submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
         if os.environ.get("SPARK_TESTING"):
-            submit_args = "--conf spark.ui.enabled=false " + submit_args
+            submit_args = ' '.join([
+                "--conf spark.ui.enabled=false",
+                "--conf spark.buffer.pageSize=4mb",
+                submit_args
+            ])
         command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
 
         # Start a socket that will be used by PythonGatewayServer to communicate its port to us

http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2564bbd..6644e85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -229,7 +229,7 @@ private[spark] object SQLConf {
       " a specific query.")
 
   val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
-    defaultValue = Some(false),
+    defaultValue = Some(true),
     doc = "When true, use the new optimized Tungsten physical execution backend.")
 
   val DIALECT = stringConf(

http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 41a0c51..70e5031 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,7 +47,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   override def canProcessSafeRows: Boolean = true
 
-  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = {
+    // Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
+    // an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
+    // ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
+    !newPartitioning.isInstanceOf[RangePartitioning]
+  }
 
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.

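For context on the guard above, here is a hypothetical sketch of the failure mode it avoids: an ordering written against one row representation throws a ClassCastException as soon as the other reaches it. `SafeRow` and `UnsafeRowStub` are illustrative stand-ins, not Spark's actual classes:

```scala
// Illustrative stand-ins for Spark's internal row representations.
trait Row { def value: Int }
final class SafeRow(val value: Int) extends Row
final class UnsafeRowStub(val value: Int) extends Row

object OrderingDemo {
  // An "interpreted" ordering that assumes SafeRow inputs, analogous to the
  // interpreted RowOrdering that Exchange builds for range partitioning.
  val interpreted: Ordering[Row] = new Ordering[Row] {
    def compare(a: Row, b: Row): Int =
      a.asInstanceOf[SafeRow].value - b.asInstanceOf[SafeRow].value
  }

  def main(args: Array[String]): Unit = {
    interpreted.compare(new SafeRow(1), new SafeRow(2)) // fine
    // Throws ClassCastException, which is why the Unsafe exchange path is
    // skipped for RangePartitioning until SPARK-9054 is fixed.
    interpreted.compare(new UnsafeRowStub(1), new UnsafeRowStub(2))
  }
}
```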
http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 5c11024..eb64684 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.scalatest.Matchers._
 
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.{Project, TungstenProject}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.SQLTestUtils
@@ -538,6 +538,7 @@ class ColumnExpressionSuite extends QueryTest with SQLTestUtils {
     def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
       val projects = df.queryExecution.executedPlan.collect {
         case project: Project => project
+        case tungstenProject: TungstenProject => tungstenProject
       }
       assert(projects.size === expectedNumProjects)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/520ec0ff/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 7a4baa9..138636b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -36,10 +36,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
   }
 
-  ignore("sort followed by limit should not leak memory") {
-    // TODO: this test is going to fail until we implement a proper iterator interface
-    // with a close() method.
-    TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
+  test("sort followed by limit") {
     checkThatPlansAgree(
       (1 to 100).map(v => Tuple1(v)).toDF("a"),
       (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
@@ -48,21 +45,6 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     )
   }
 
-  test("sort followed by limit") {
-    TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
-    try {
-      checkThatPlansAgree(
-        (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
-        (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
-        sortAnswers = false
-      )
-    } finally {
-      TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
-
-    }
-  }
-
   test("sorting does not crash for large inputs") {
     val sortOrder = 'a.asc :: Nil
     val stringLength = 1024 * 1024 * 2


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org