Posted to commits@spark.apache.org by jo...@apache.org on 2015/07/21 20:53:10 UTC

spark git commit: [SPARK-8357] Fix unsafe memory leak on empty inputs in GeneratedAggregate

Repository: spark
Updated Branches:
  refs/heads/master 87d890cc1 -> 9ba7c64de


[SPARK-8357] Fix unsafe memory leak on empty inputs in GeneratedAggregate

This patch fixes a managed memory leak in GeneratedAggregate.  The leak occurs when the unsafe aggregation path is used to perform grouped aggregation on an empty input; in this case, GeneratedAggregate allocates an UnsafeFixedWidthAggregationMap that is never cleaned up because `next()` is never called on the aggregate result iterator.

This patch fixes the leak by short-circuiting on empty inputs, before the map is ever allocated.
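
For intuition, here is a minimal, self-contained sketch of the failure mode in plain Scala. All names are illustrative, not Spark internals: `TrackedBuffer` stands in for the UnsafeFixedWidthAggregationMap, and the release-on-last-`next()` wiring loosely mirrors how the aggregate result iterator freed the map. Because cleanup lives inside `next()`, a consumer that sees `hasNext == false` up front never triggers it:

object LeakSketch {
  // Stands in for the off-heap aggregation map; records whether it was freed.
  final class TrackedBuffer {
    var freed = false
    def free(): Unit = { freed = true }
  }

  // Builds a result iterator that frees its buffer only after the last next().
  def leakyResults(input: Iterator[Int]): (Iterator[Int], TrackedBuffer) = {
    val buffer = new TrackedBuffer        // allocated eagerly, before iteration
    val out = new Iterator[Int] {
      def hasNext: Boolean = input.hasNext
      def next(): Int = {
        val v = input.next()
        if (!input.hasNext) buffer.free() // cleanup happens only inside next()
        v
      }
    }
    (out, buffer)
  }

  def main(args: Array[String]): Unit = {
    val (out, buffer) = leakyResults(Iterator[Int]())
    while (out.hasNext) out.next()        // loop body never runs on empty input
    assert(!buffer.freed)                 // the buffer leaks, as in SPARK-8357
  }
}

Checking `iter.hasNext` before any allocation, as the patch does, sidesteps this failure mode entirely.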

This patch is an updated version of #6810.

Closes #6810.

Author: navis.ryu <na...@apache.org>
Author: Josh Rosen <jo...@databricks.com>

Closes #7560 from JoshRosen/SPARK-8357 and squashes the following commits:

3486ce4 [Josh Rosen] Some minor cleanup
c649310 [Josh Rosen] Revert SparkPlan change:
3c7db0f [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-8357
adc8239 [Josh Rosen] Back out Projection changes.
c5419b3 [navis.ryu] addressed comments
143e1ef [navis.ryu] fixed format & added test for CCE case
735972f [navis.ryu] used new conf apis
1a02a55 [navis.ryu] Rolled-back test-conf cleanup & fixed possible CCE & added more tests
51178e8 [navis.ryu] addressed comments
4d326b9 [navis.ryu] fixed test fails
15c5afc [navis.ryu] added a test as suggested by JoshRosen
d396589 [navis.ryu] added comments
1b07556 [navis.ryu] [SPARK-8357] [SQL] Memory leakage on unsafe aggregation path with empty input


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ba7c64d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ba7c64d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ba7c64d

Branch: refs/heads/master
Commit: 9ba7c64decfc92853bd281e9e7bfb95211080dd4
Parents: 87d890c
Author: navis.ryu <na...@apache.org>
Authored: Tue Jul 21 11:52:52 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jul 21 11:52:52 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/GeneratedAggregate.scala      | 14 +++++-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  9 ++++
 .../spark/sql/execution/AggregateSuite.scala    | 48 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ba7c64d/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index c069da0..ecde9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -266,7 +266,18 @@ case class GeneratedAggregate(
 
       val joinedRow = new JoinedRow3
 
-      if (groupingExpressions.isEmpty) {
+      if (!iter.hasNext) {
+        // This is an empty input, so return early so that we do not allocate data structures
+        // that won't be cleaned up (see SPARK-8357).
+        if (groupingExpressions.isEmpty) {
+          // This is a global aggregate, so return an empty aggregation buffer.
+          val resultProjection = resultProjectionBuilder()
+          Iterator(resultProjection(newAggregationBuffer(EmptyRow)))
+        } else {
+          // This is a grouped aggregate, so return an empty iterator.
+          Iterator[InternalRow]()
+        }
+      } else if (groupingExpressions.isEmpty) {
         // TODO: Codegening anything other than the updateProjection is probably over kill.
         val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
         var currentRow: InternalRow = null
@@ -280,6 +291,7 @@ case class GeneratedAggregate(
         val resultProjection = resultProjectionBuilder()
         Iterator(resultProjection(buffer))
       } else if (unsafeEnabled) {
+        assert(iter.hasNext, "There should be at least one row for this path")
         log.info("Using Unsafe-based aggregator")
         val aggregationMap = new UnsafeFixedWidthAggregationMap(
           newAggregationBuffer,

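The two branches of the short-circuit above follow SQL semantics for aggregates over zero input rows: a global aggregate still emits exactly one row, produced by projecting a fresh (empty) aggregation buffer, so COUNT yields 0 and SUM/AVG yield NULL, while a grouped aggregate emits no rows at all. A quick illustration (a hypothetical Spark 1.x spark-shell session; `empty` is an assumed empty temp table):

// Global aggregate: one row from the projected empty buffer.
sqlContext.sql("SELECT COUNT(a) FROM empty").collect()
// => Array([0])

// Grouped aggregate: the short-circuit returns an empty iterator.
sqlContext.sql("SELECT b, COUNT(a) FROM empty GROUP BY b").collect()
// => Array()
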
http://git-wip-us.apache.org/repos/asf/spark/blob/9ba7c64d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 61d5f20..beee101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -648,6 +648,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
       Row(2, 1, 2, 2, 1))
   }
 
+  test("count of empty table") {
+    withTempTable("t") {
+      Seq.empty[(Int, Int)].toDF("a", "b").registerTempTable("t")
+      checkAnswer(
+        sql("select count(a) from t"),
+        Row(0))
+    }
+  }
+
   test("inner join where, one match per row") {
     checkAnswer(
       sql("SELECT * FROM upperCaseData JOIN lowerCaseData WHERE n = N"),

http://git-wip-us.apache.org/repos/asf/spark/blob/9ba7c64d/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
new file mode 100644
index 0000000..20def6b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.test.TestSQLContext
+
+class AggregateSuite extends SparkPlanTest {
+
+  test("SPARK-8357 unsafe aggregation path should not leak memory with empty input") {
+    val codegenDefault = TestSQLContext.getConf(SQLConf.CODEGEN_ENABLED)
+    val unsafeDefault = TestSQLContext.getConf(SQLConf.UNSAFE_ENABLED)
+    try {
+      TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, true)
+      TestSQLContext.setConf(SQLConf.UNSAFE_ENABLED, true)
+      val df = Seq.empty[(Int, Int)].toDF("a", "b")
+      checkAnswer(
+        df,
+        GeneratedAggregate(
+          partial = true,
+          Seq(df.col("b").expr),
+          Seq(Alias(Count(df.col("a").expr), "cnt")()),
+          unsafeEnabled = true,
+          _: SparkPlan),
+        Seq.empty
+      )
+    } finally {
+      TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault)
+      TestSQLContext.setConf(SQLConf.UNSAFE_ENABLED, unsafeDefault)
+    }
+  }
+}

