You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/27 19:09:15 UTC

spark git commit: [SPARK-7847] [SQL] Fixes dynamic partition directory escaping

Repository: spark
Updated Branches:
  refs/heads/master ff0ddff46 -> 15459db4f


[SPARK-7847] [SQL] Fixes dynamic partition directory escaping

Please refer to [SPARK-7847] [1] for details.

[1]: https://issues.apache.org/jira/browse/SPARK-7847

Author: Cheng Lian <li...@databricks.com>

Closes #6389 from liancheng/spark-7847 and squashes the following commits:

935c652 [Cheng Lian] Adds test case for writing various data types as dynamic partition value
f4fc398 [Cheng Lian] Converts partition columns to Scala type when writing dynamic partitions
d0aeca0 [Cheng Lian] Fixes dynamic partition directory escaping


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15459db4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15459db4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15459db4

Branch: refs/heads/master
Commit: 15459db4f6867e95076cf53fade2fca833c4cf4e
Parents: ff0ddff
Author: Cheng Lian <li...@databricks.com>
Authored: Wed May 27 10:09:12 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 27 10:09:12 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 22 ++++--
 .../spark/sql/sources/PartitioningUtils.scala   | 76 +++++++++++++++++++-
 .../org/apache/spark/sql/sources/commands.scala | 57 ++-------------
 .../ParquetPartitionDiscoverySuite.scala        | 57 ++++++++++++++-
 4 files changed, 152 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15459db4/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index cb1e608..8b3e1b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -282,21 +283,28 @@ private[sql] class ParquetRelation2(
         val cacheMetadata = useMetadataCache
 
         @transient val cachedStatuses = inputFiles.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
+          // In order to encode the authority of a Path containing special characters such as '/'
+          // (which does happen in some S3N credentials), we need to use the string returned by the
+          // URI of the path to create a new Path.
+          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
           new FileStatus(
             f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
         @transient val cachedFooters = footers.map { f =>
           // In order to encode the authority of a Path containing special characters such as /,
           // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
         }.toSeq
 
+        private def escapePathUserInfo(path: Path): Path = {
+          val uri = path.toUri
+          new Path(new URI(
+            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
+            uri.getQuery, uri.getFragment))
+        }
+
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
           val inputFormat = if (cacheMetadata) {
@@ -377,7 +385,7 @@ private[sql] class ParquetRelation2(
               .orElse(readSchema())
               .orElse(maybeMetastoreSchema)
               .getOrElse(sys.error("Failed to get the schema."))
-        
+
           // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
           // case insensitivity issue and possible schema mismatch (probably caused by schema
           // evolution).

http://git-wip-us.apache.org/repos/asf/spark/blob/15459db4/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index e0ead23..dafdf0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -221,7 +222,7 @@ private[sql] object PartitioningUtils {
       // Then falls back to string
       .getOrElse {
         if (raw == defaultPartitionName) Literal.create(null, NullType)
-        else Literal.create(raw, StringType)
+        else Literal.create(unescapePathName(raw), StringType)
       }
   }
 
@@ -243,4 +244,77 @@ private[sql] object PartitioningUtils {
       Literal.create(Cast(l, desiredType).eval(), desiredType)
     }
   }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  val charToEscape = {
+    val bitSet = new java.util.BitSet(128)
+
+    /**
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    val clist = Array(
+      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+      '{', '[', ']', '^')
+
+    clist.foreach(bitSet.set(_))
+
+    if (Shell.WINDOWS) {
+      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+    }
+
+    bitSet
+  }
+
+  def needsEscaping(c: Char): Boolean = {
+    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+  }
+
+  def escapePathName(path: String): String = {
+    val builder = new StringBuilder()
+    path.foreach { c =>
+      if (needsEscaping(c)) {
+        builder.append('%')
+        builder.append(f"${c.asInstanceOf[Int]}%02x")
+      } else {
+        builder.append(c)
+      }
+    }
+
+    builder.toString()
+  }
+
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+
+    while (i < path.length) {
+      val c = path.charAt(i)
+      if (c == '%' && i + 2 < path.length) {
+        val code: Int = try {
+          Integer.valueOf(path.substring(i + 1, i + 3), 16)
+        } catch { case e: Exception =>
+          -1: Integer
+        }
+        if (code >= 0) {
+          sb.append(code.asInstanceOf[Char])
+          i += 3
+        } else {
+          sb.append(c)
+          i += 1
+        }
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+
+    sb.toString()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15459db4/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index fbd98ef..3132067 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark._
@@ -35,7 +34,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -208,9 +208,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
           writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart)
         }
       } else {
+        val partitionSchema = StructType.fromAttributes(partitionOutput)
+        val converter = CatalystTypeConverters.createToScalaConverter(partitionSchema)
         while (iterator.hasNext) {
           val row = iterator.next()
-          val partitionPart = partitionProj(row)
+          val partitionPart = converter(partitionProj(row)).asInstanceOf[Row]
           val dataPart = dataProj(row)
           writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
@@ -416,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
       val valueString = if (string == null || string.isEmpty) {
         defaultPartitionName
       } else {
-        DynamicPartitionWriterContainer.escapePathName(string)
+        PartitioningUtils.escapePathName(string)
       }
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
@@ -448,50 +450,3 @@ private[sql] class DynamicPartitionWriterContainer(
     }
   }
 }
-
-private[sql] object DynamicPartitionWriterContainer {
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  val charToEscape = {
-    val bitSet = new java.util.BitSet(128)
-
-    /**
-     * ASCII 01-1F are HTTP control characters that need to be escaped.
-     * \u000A and \u000D are \n and \r, respectively.
-     */
-    val clist = Array(
-      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
-      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
-      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
-      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
-      '{', '[', ']', '^')
-
-    clist.foreach(bitSet.set(_))
-
-    if (Shell.WINDOWS) {
-      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
-    }
-
-    bitSet
-  }
-
-  def needsEscaping(c: Char): Boolean = {
-    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
-  }
-
-  def escapePathName(path: String): String = {
-    val builder = new StringBuilder()
-    path.foreach { c =>
-      if (DynamicPartitionWriterContainer.needsEscaping(c)) {
-        builder.append('%')
-        builder.append(f"${c.asInstanceOf[Int]}%02x")
-      } else {
-        builder.append(c)
-      }
-    }
-
-    builder.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15459db4/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 90d4528..f231589 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.parquet
 
 import java.io.File
+import java.math.BigInteger
+import java.sql.{Timestamp, Date}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -27,7 +29,7 @@ import org.apache.spark.sql.sources.PartitioningUtils._
 import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{Column, QueryTest, Row, SQLContext}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -377,4 +379,57 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-7847: Dynamic partition directory path escaping and unescaping") {
+    withTempPath { dir =>
+      val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s")
+      df.write.format("parquet").partitionBy("s").save(dir.getCanonicalPath)
+      checkAnswer(read.parquet(dir.getCanonicalPath), df.collect())
+    }
+  }
+
+  test("Various partition value types") {
+    val row =
+      Row(
+        100.toByte,
+        40000.toShort,
+        Int.MaxValue,
+        Long.MaxValue,
+        1.5.toFloat,
+        4.5,
+        new java.math.BigDecimal(new BigInteger("212500"), 5),
+        new java.math.BigDecimal(2.125),
+        java.sql.Date.valueOf("2015-05-23"),
+        new Timestamp(0),
+        "This is a string, /[]?=:",
+        "This is not a partition column")
+
+    // BooleanType is not supported yet
+    val partitionColumnTypes =
+      Seq(
+        ByteType,
+        ShortType,
+        IntegerType,
+        LongType,
+        FloatType,
+        DoubleType,
+        DecimalType(10, 5),
+        DecimalType.Unlimited,
+        DateType,
+        TimestampType,
+        StringType)
+
+    val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+      case (t, index) => StructField(s"p_$index", t)
+    }
+
+    val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+    val df = createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    withTempPath { dir =>
+      df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+      val fields = schema.map(f => Column(f.name).cast(f.dataType))
+      checkAnswer(read.load(dir.toString).select(fields: _*), row)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org