You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/01/19 19:44:57 UTC

spark git commit: [SPARK-12870][SQL] better format bucket id in file name

Repository: spark
Updated Branches:
  refs/heads/master 0ddba6d88 -> e14817b52


[SPARK-12870][SQL] better format bucket id in file name

for normal parquet file without bucket, it's file name ends with a jobUUID which maybe all numbers and mistakeny regarded as bucket id. This PR improves the format of bucket id in file name by using a different seperator, `_`, so that the regex is more robust.

Author: Wenchen Fan <we...@databricks.com>

Closes #10799 from cloud-fan/fix-bucket.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e14817b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e14817b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e14817b5

Branch: refs/heads/master
Commit: e14817b528ccab4b4685b45a95e2325630b5fc53
Parents: 0ddba6d
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Jan 19 10:44:51 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jan 19 10:44:51 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/bucket.scala      | 14 ++++++++++----
 .../sql/execution/datasources/json/JSONRelation.scala |  2 +-
 .../datasources/parquet/ParquetRelation.scala         |  2 +-
 .../org/apache/spark/sql/hive/orc/OrcRelation.scala   |  2 +-
 4 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e14817b5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
index c7ecd61..3e0d484 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -57,15 +57,21 @@ private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFact
 
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
-  //   1. some other information in the head of file name, ends with `-`
-  //   2. bucket id part, some numbers
+  //   1. some other information in the head of file name
+  //   2. bucket id part, some numbers, starts with "_"
+  //      * The other-information part may use `-` as separator and may have numbers at the end,
+  //        e.g. a normal parquet file without bucketing may have name:
+  //        part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly
+  //        treat `431234567891` as bucket id. So here we pick `_` as separator.
   //   3. optional file extension part, in the tail of file name, starts with `.`
   // An example of bucketed parquet file name with bucket id 3:
-  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-00003.gz.parquet
-  private val bucketedFileName = """.*-(\d+)(?:\..*)?$""".r
+  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r
 
   def getBucketId(fileName: String): Option[Int] = fileName match {
     case bucketedFileName(bucketId) => Some(bucketId.toInt)
     case other => None
   }
+
+  def bucketIdToString(id: Int): String = f"_$id%05d"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e14817b5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 20c60b9..31c5620 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -193,7 +193,7 @@ private[json] class JsonOutputWriter(
         val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
-        val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
       }
     }.getRecordWriter(context)

http://git-wip-us.apache.org/repos/asf/spark/blob/e14817b5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 30ddec6..b460ec1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -90,7 +90,7 @@ private[sql] class ParquetOutputWriter(
           val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
           val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
-          val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+          val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
           new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e14817b5/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4040916..800823f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -103,7 +103,7 @@ private[orc] class OrcOutputWriter(
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
-    val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+    val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
     val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc"
 
     new OrcOutputFormat().getRecordWriter(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org