Posted to commits@spark.apache.org by li...@apache.org on 2016/08/12 06:40:16 UTC

spark git commit: [SPARK-16975][SQL] Column-partition path starting '_' should be handled correctly

Repository: spark
Updated Branches:
  refs/heads/master ccc6dc0f4 -> abff92bfd


[SPARK-16975][SQL] Column-partition path starting '_' should be handled correctly

## What changes were proposed in this pull request?

Currently, Spark ignores path names starting with underscore `_` or dot `.`. This causes read failures for column-partitioned file data sources whose partition column names start with `_`, e.g. `_col`, because their partition directories (such as `_col=0`) are mistaken for hidden files.

**Before**
```scala
scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
scala> spark.read.parquet("/tmp/parquet")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ParquetFormat at /tmp/parquet20. It must be specified manually;
```

**After**
```scala
scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
scala> spark.read.parquet("/tmp/parquet")
res2: org.apache.spark.sql.DataFrame = [id: bigint, _locality_code: int]
```
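The fix narrows the hidden-path filter: names starting with `_` are still skipped unless they contain `=`, since `partitionBy` encodes partition directories as `column=value`. A minimal standalone sketch of the revised predicate (the `shouldSkip` helper name is illustrative, not part of Spark's API):

```scala
// Sketch of the revised filter, assuming partition directories are always
// encoded as "column=value" (as DataFrameWriter.partitionBy produces).
def shouldSkip(name: String): Boolean =
  (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")

assert(!shouldSkip("_locality_code=3"))  // partition directory: now kept
assert(shouldSkip("_SUCCESS"))           // job marker file: still ignored
assert(shouldSkip(".part-00000.crc"))    // hidden checksum file: still ignored
```

The same one-line predicate change appears at each of the four call sites in the diff below.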

## How was this patch tested?

Passes Jenkins with a new test case.

Author: Dongjoon Hyun <do...@apache.org>

Closes #14585 from dongjoon-hyun/SPARK-16975-PARQUET.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abff92bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abff92bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abff92bf

Branch: refs/heads/master
Commit: abff92bfdc7d4c9d2308794f0350561fe0ceb4dd
Parents: ccc6dc0
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Aug 12 14:40:12 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri Aug 12 14:40:12 2016 +0800

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileCatalog.scala          | 2 +-
 .../sql/execution/datasources/fileSourceInterfaces.scala    | 2 +-
 .../sql/execution/datasources/json/JsonFileFormat.scala     | 2 +-
 .../execution/datasources/parquet/ParquetFileFormat.scala   | 3 ++-
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 9 +++++++++
 5 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abff92bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 811e96c..cef9d4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -204,6 +204,6 @@ abstract class PartitioningAwareFileCatalog(
 
   private def isDataPath(path: Path): Boolean = {
     val name = path.getName
-    !(name.startsWith("_") || name.startsWith("."))
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/abff92bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index f068779..e03a232 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -364,7 +364,7 @@ object HadoopFsRelation extends Logging {
     // We filter everything that starts with _ and ., except _common_metadata and _metadata
     // because Parquet needs to find those metadata files from leaf files returned by this method.
     // We should refactor this logic to not mix metadata files with data files.
-    (pathName.startsWith("_") || pathName.startsWith(".")) &&
+    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
       !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abff92bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 19681be..27910e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -54,7 +54,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
           .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
       val jsonFiles = files.filterNot { status =>
         val name = status.getPath.getName
-        name.startsWith("_") || name.startsWith(".")
+        (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
       }.toArray
 
       val jsonSchema = InferSchema.infer(

http://git-wip-us.apache.org/repos/asf/spark/blob/abff92bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 7794f31..9c4778a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -236,7 +236,8 @@ class ParquetFileFormat
     // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
     val leaves = allFiles.filter { f =>
       isSummaryFile(f.getPath) ||
-          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        !((f.getPath.getName.startsWith("_") && !f.getPath.getName.contains("=")) ||
+          f.getPath.getName.startsWith("."))
     }.toArray.sortBy(_.getPath.toString)
 
     FileTypes(
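
`ParquetFileFormat` additionally keeps Parquet summary files when collecting leaf files. A hedged sketch of the combined condition, with a stand-in `isSummaryFile` (the real method compares the path name against the `ParquetFileWriter` metadata-file constants rather than string literals):

```scala
// Illustrative stand-in: assumes the two Parquet summary file names are
// "_metadata" and "_common_metadata".
def isSummaryFile(name: String): Boolean =
  name == "_metadata" || name == "_common_metadata"

def isLeafFile(name: String): Boolean =
  isSummaryFile(name) ||
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))

assert(isLeafFile("_metadata"))  // summary file: always kept
assert(isLeafFile("_col=1"))     // partition directory: kept after this patch
assert(!isLeafFile("_SUCCESS"))  // still filtered out
```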

http://git-wip-us.apache.org/repos/asf/spark/blob/abff92bf/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eac588f..4fcde58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import java.io.File
 import java.math.MathContext
 import java.sql.{Date, Timestamp}
 
@@ -2637,6 +2638,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
+    withTempDir { dir =>
+      val parquetDir = new File(dir, "parquet").getCanonicalPath
+      spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(parquetDir)
+      spark.read.parquet(parquetDir)
+    }
+  }
+
   test("SPARK-16644: Aggregate should not put aggregate expressions to constraints") {
     withTable("tbl") {
       sql("CREATE TABLE tbl(a INT, b INT) USING parquet")

