You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/13 09:48:11 UTC
spark git commit: [SPARK-20920][SQL] ForkJoinPool pools are leaked
when writing hive tables with many partitions
Repository: spark
Updated Branches:
refs/heads/master 278ba7a2c -> 7b7c85ede
[SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions
## What changes were proposed in this pull request?
Don't leave the thread pool running after the AlterTableRecoverPartitionsCommand DDL command completes
## How was this patch tested?
Existing tests.
Author: Sean Owen <so...@cloudera.com>
Closes #18216 from srowen/SPARK-20920.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b7c85ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b7c85ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b7c85ed
Branch: refs/heads/master
Commit: 7b7c85ede398996aafffb126440e5f0c67f67210
Parents: 278ba7a
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jun 13 10:48:07 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jun 13 10:48:07 2017 +0100
----------------------------------------------------------------------
.../spark/sql/execution/command/ddl.scala | 21 ++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7b7c85ed/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 793fb9b..5a7f8cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,7 +21,6 @@ import java.util.Locale
import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -588,8 +587,15 @@ case class AlterTableRecoverPartitionsCommand(
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val hadoopConf = spark.sparkContext.hadoopConfiguration
val pathFilter = getPathFilter(hadoopConf)
- val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
- table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+
+ val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+ val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
+ try {
+ scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+ spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+ } finally {
+ evalPool.shutdown()
+ }
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")
@@ -610,8 +616,6 @@ case class AlterTableRecoverPartitionsCommand(
Seq.empty[Row]
}
- @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-
private def scanPartitions(
spark: SparkSession,
fs: FileSystem,
@@ -620,7 +624,8 @@ case class AlterTableRecoverPartitionsCommand(
spec: TablePartitionSpec,
partitionNames: Seq[String],
threshold: Int,
- resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+ resolver: Resolver,
+ evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}
@@ -644,7 +649,7 @@ case class AlterTableRecoverPartitionsCommand(
val value = ExternalCatalogUtils.unescapePathName(ps(1))
if (resolver(columnName, partitionNames.head)) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
- partitionNames.drop(1), threshold, resolver)
+ partitionNames.drop(1), threshold, resolver, evalTaskSupport)
} else {
logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org