Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/18 20:59:49 UTC

spark git commit: [SPARK-7570] [SQL] Ignores _temporary during partition discovery

Repository: spark
Updated Branches:
  refs/heads/master e1ac2a955 -> 010a1c278


[SPARK-7570] [SQL] Ignores _temporary during partition discovery

Author: Cheng Lian <li...@databricks.com>

Closes #6091 from liancheng/spark-7570 and squashes the following commits:

8ff07e8 [Cheng Lian] Ignores _temporary during partition discovery
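
For context, here is a minimal standalone sketch of the logic this commit
introduces (simplified from PartitioningUtils.parsePartition in the diff
below; parsePartitionSketch is a hypothetical name, and the inline
"name=value" matcher merely stands in for Spark's parsePartitionColumn):

    import org.apache.hadoop.fs.Path
    import scala.collection.mutable.ArrayBuffer

    // Walk from the leaf directory up to the root.  Bail out with None as
    // soon as a `_temporary` component is seen, so directories left behind
    // by speculative tasks no longer break partition discovery.
    def parsePartitionSketch(path: Path): Option[Seq[(String, String)]] = {
      val columns = ArrayBuffer.empty[(String, String)]
      // Old Hadoop versions don't have `Path.isRoot`
      var finished = path.getParent == null
      var chopped = path

      while (!finished) {
        if (chopped.getName == "_temporary") {
          return None
        }

        // Stand-in for parsePartitionColumn: accept "name=value" segments.
        chopped.getName.split("=", 2) match {
          case Array(name, value) if name.nonEmpty => columns += (name -> value)
          case _ => // not a partition directory; ignore this segment
        }

        chopped = chopped.getParent
        finished = chopped.getParent == null
      }

      Some(columns.reverse.toSeq)
    }

    // parsePartitionSketch(new Path("/t/a=10/b=hello"))
    //   == Some(Seq("a" -> "10", "b" -> "hello"))
    // parsePartitionSketch(new Path("/t/a=10/_temporary/part-0")) == None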


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/010a1c27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/010a1c27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/010a1c27

Branch: refs/heads/master
Commit: 010a1c278037130a69dcc79427d2b0380a2c82d8
Parents: e1ac2a9
Author: Cheng Lian <li...@databricks.com>
Authored: Mon May 18 11:59:44 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon May 18 11:59:44 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/sources/PartitioningUtils.scala   | 15 ++++++----
 .../ParquetPartitionDiscoverySuite.scala        | 31 +++++++++++---------
 2 files changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/010a1c27/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d1f0cda..8f8138d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -23,8 +23,7 @@ import java.math.{BigDecimal => JBigDecimal}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
-import com.google.common.cache.{CacheBuilder, Cache}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -69,7 +68,7 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
+    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
     val fields = {
       val (PartitionValues(columnNames, literals)) = partitionValues.head
       columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
@@ -103,13 +102,19 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartition(
       path: Path,
-      defaultPartitionName: String): PartitionValues = {
+      defaultPartitionName: String): Option[PartitionValues] = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
 
     while (!finished) {
+      // Sometimes (e.g., when speculative execution is enabled), temporary directories may be
+      // left uncleaned.  Here we simply ignore them.
+      if (chopped.getName == "_temporary") {
+        return None
+      }
+
       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
       maybeColumn.foreach(columns += _)
       chopped = chopped.getParent
@@ -117,7 +122,7 @@ private[sql] object PartitioningUtils {
     }
 
     val (columnNames, values) = columns.reverse.unzip
-    PartitionValues(columnNames, values)
+    Some(PartitionValues(columnNames, values))
   }
 
   private def parsePartitionColumn(
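
Note the matching change in parsePartitions above: with parsePartition now
returning an Option, switching from map to flatMap makes the None results
(paths containing a `_temporary` component) simply vanish from the resolved
partition list.  A hypothetical driver, reusing the parsePartitionSketch
helper sketched earlier (paths are illustrative only):

    val paths = Seq(
      "/table/a=1/part-00000",
      "/table/a=1/_temporary/part-00001",  // dropped by flatMap
      "/table/a=2/part-00000")

    val parsed = paths.flatMap(p => parsePartitionSketch(new Path(p)))
    // parsed == Seq(Seq("a" -> "1"), Seq("a" -> "2"))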

http://git-wip-us.apache.org/repos/asf/spark/blob/010a1c27/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 8079c46..1927114 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -54,44 +54,47 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
   }
 
   test("parse partition") {
-    def check(path: String, expected: PartitionValues): Unit = {
+    def check(path: String, expected: Option[PartitionValues]): Unit = {
       assert(expected === parsePartition(new Path(path), defaultPartitionName))
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName)
+        parsePartition(new Path(path), defaultPartitionName).get
       }.getMessage
 
       assert(message.contains(expected))
     }
 
-    check(
-      "file:///",
+    check("file:///", Some {
       PartitionValues(
         ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal]))
+        ArrayBuffer.empty[Literal])
+    })
 
-    check(
-      "file://path/a=10",
+    check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
-        ArrayBuffer(Literal.create(10, IntegerType))))
+        ArrayBuffer(Literal.create(10, IntegerType)))
+    })
 
-    check(
-      "file://path/a=10/b=hello/c=1.5",
+    check("file://path/a=10/b=hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("a", "b", "c"),
         ArrayBuffer(
           Literal.create(10, IntegerType),
           Literal.create("hello", StringType),
-          Literal.create(1.5, FloatType))))
+          Literal.create(1.5, FloatType)))
+    })
 
-    check(
-      "file://path/a=10/b_hello/c=1.5",
+    check("file://path/a=10/b_hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("c"),
-        ArrayBuffer(Literal.create(1.5, FloatType))))
+        ArrayBuffer(Literal.create(1.5, FloatType)))
+    })
+
+    check("file://path/a=10/_temporary/c=1.5", None)
+    check("file://path/a=10/c=1.5/_temporary", None)
 
     checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")

