You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@spark.apache.org by jo...@apache.org on 2015/07/21 20:53:10 UTC
spark git commit: [SPARK-8357] Fix unsafe memory leak on empty inputs in GeneratedAggregate
Repository: spark
Updated Branches:
refs/heads/master 87d890cc1 -> 9ba7c64de
[SPARK-8357] Fix unsafe memory leak on empty inputs in GeneratedAggregate
This patch fixes a managed memory leak in GeneratedAggregate. The leak occurs when the unsafe aggregation path is used to perform grouped aggregation on an empty input; in this case, GeneratedAggregate allocates an UnsafeFixedWidthAggregationMap that is never cleaned up because `next()` is never called on the aggregate result iterator.
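This patch fixes the leak by short-circuiting on empty inputs: when the input iterator has no rows, GeneratedAggregate returns early, before allocating any aggregation data structures.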
This patch is an updated version of #6810.
Closes #6810.
Author: navis.ryu <na...@apache.org>
Author: Josh Rosen <jo...@databricks.com>
Closes #7560 from JoshRosen/SPARK-8357 and squashes the following commits:
3486ce4 [Josh Rosen] Some minor cleanup
c649310 [Josh Rosen] Revert SparkPlan change:
3c7db0f [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-8357
adc8239 [Josh Rosen] Back out Projection changes.
c5419b3 [navis.ryu] addressed comments
143e1ef [navis.ryu] fixed format & added test for CCE case
735972f [navis.ryu] used new conf apis
1a02a55 [navis.ryu] Rolled-back test-conf cleanup & fixed possible CCE & added more tests
51178e8 [navis.ryu] addressed comments
4d326b9 [navis.ryu] fixed test fails
15c5afc [navis.ryu] added a test as suggested by JoshRosen
d396589 [navis.ryu] added comments
1b07556 [navis.ryu] [SPARK-8357] [SQL] Memory leakage on unsafe aggregation path with empty input
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ba7c64d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ba7c64d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ba7c64d
Branch: refs/heads/master
Commit: 9ba7c64decfc92853bd281e9e7bfb95211080dd4
Parents: 87d890c
Author: navis.ryu <na...@apache.org>
Authored: Tue Jul 21 11:52:52 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jul 21 11:52:52 2015 -0700
----------------------------------------------------------------------
.../sql/execution/GeneratedAggregate.scala | 14 +++++-
.../org/apache/spark/sql/SQLQuerySuite.scala | 9 ++++
.../spark/sql/execution/AggregateSuite.scala | 48 ++++++++++++++++++++
3 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9ba7c64d/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index c069da0..ecde9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -266,7 +266,18 @@ case class GeneratedAggregate(
val joinedRow = new JoinedRow3
- if (groupingExpressions.isEmpty) {
+ if (!iter.hasNext) {
+ // This is an empty input, so return early so that we do not allocate data structures
+ // that won't be cleaned up (see SPARK-8357).
+ if (groupingExpressions.isEmpty) {
+ // This is a global aggregate, so return an empty aggregation buffer.
+ val resultProjection = resultProjectionBuilder()
+ Iterator(resultProjection(newAggregationBuffer(EmptyRow)))
+ } else {
+ // This is a grouped aggregate, so return an empty iterator.
+ Iterator[InternalRow]()
+ }
+ } else if (groupingExpressions.isEmpty) {
// TODO: Codegening anything other than the updateProjection is probably over kill.
val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
var currentRow: InternalRow = null
@@ -280,6 +291,7 @@ case class GeneratedAggregate(
val resultProjection = resultProjectionBuilder()
Iterator(resultProjection(buffer))
} else if (unsafeEnabled) {
+ assert(iter.hasNext, "There should be at least one row for this path")
log.info("Using Unsafe-based aggregator")
val aggregationMap = new UnsafeFixedWidthAggregationMap(
newAggregationBuffer,
http://git-wip-us.apache.org/repos/asf/spark/blob/9ba7c64d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 61d5f20..beee101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -648,6 +648,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
Row(2, 1, 2, 2, 1))
}
+ test("count of empty table") { // Added with SPARK-8357: a global count over an empty table must still return one row, Row(0)
+ withTempTable("t") {
+ Seq.empty[(Int, Int)].toDF("a", "b").registerTempTable("t") // register an empty two-column table under the name "t"
+ checkAnswer(
+ sql("select count(a) from t"),
+ Row(0))
+ }
+ }
+
test("inner join where, one match per row") {
checkAnswer(
sql("SELECT * FROM upperCaseData JOIN lowerCaseData WHERE n = N"),
http://git-wip-us.apache.org/repos/asf/spark/blob/9ba7c64d/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
new file mode 100644
index 0000000..20def6b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.test.TestSQLContext
+/** Tests for [[GeneratedAggregate]]. */
+class AggregateSuite extends SparkPlanTest {
+ // SPARK-8357: a grouped aggregate over an empty input, with the unsafe path enabled, must return no rows.
+ test("SPARK-8357 unsafe aggregation path should not leak memory with empty input") {
+ val codegenDefault = TestSQLContext.getConf(SQLConf.CODEGEN_ENABLED) // saved so the finally block can restore it
+ val unsafeDefault = TestSQLContext.getConf(SQLConf.UNSAFE_ENABLED) // saved so the finally block can restore it
+ try {
+ TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, true) // force codegen on for the duration of this test
+ TestSQLContext.setConf(SQLConf.UNSAFE_ENABLED, true) // force the unsafe setting on as well
+ val df = Seq.empty[(Int, Int)].toDF("a", "b") // empty input with two Int columns
+ checkAnswer( // NOTE(review): asserts only the (empty) result; catching the leak itself presumably relies on harness memory checks - confirm
+ df,
+ GeneratedAggregate( // partial COUNT(a) grouped by b, with unsafeEnabled = true
+ partial = true,
+ Seq(df.col("b").expr),
+ Seq(Alias(Count(df.col("a").expr), "cnt")()),
+ unsafeEnabled = true,
+ _: SparkPlan), // placeholder child makes this a SparkPlan => SparkPlan function; presumably checkAnswer applies it to df's plan - confirm
+ Seq.empty // a grouped aggregate over empty input is expected to produce no rows
+ )
+ } finally {
+ TestSQLContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault) // restore the settings saved above
+ TestSQLContext.setConf(SQLConf.UNSAFE_ENABLED, unsafeDefault)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org