Posted to commits@spark.apache.org by da...@apache.org on 2015/10/12 19:17:25 UTC

spark git commit: [SPARK-11007] [SQL] Adds dictionary aware Parquet decimal converters

Repository: spark
Updated Branches:
  refs/heads/master fcb37a041 -> 64b1d00e1


[SPARK-11007] [SQL] Adds dictionary aware Parquet decimal converters

For Parquet decimal columns encoded with plain-dictionary encoding, we can make the upper-level converter aware of the dictionary, so that all decimal values in the dictionary are instantiated once up front instead of being constructed again for every row.
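
The core idea is sketched below as a standalone simplification. The method names mirror Parquet's dictionary hooks, but this is not the actual Spark converter (the real one, shown in the diff, extends Parquet's `PrimitiveConverter` and produces Spark's `Decimal`):

```scala
import java.math.BigDecimal

// Simplified sketch: decode every dictionary entry into a decimal exactly once,
// then serve per-row values by array lookup instead of constructing a new
// decimal object for each row.
class DictionaryAwareDecimalConverter(scale: Int) {
  // Filled once per column chunk when the data turns out to be dictionary encoded.
  private var expandedDictionary: Array[BigDecimal] = _

  def setDictionary(unscaledValues: Array[Long]): Unit = {
    expandedDictionary = unscaledValues.map(u => BigDecimal.valueOf(u, scale))
  }

  // Called once per value: a plain array lookup, no per-row allocation.
  def addValueFromDictionary(dictionaryId: Int): BigDecimal =
    expandedDictionary(dictionaryId)
}
```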

Note that plain-dictionary encoding isn't available for `FIXED_LEN_BYTE_ARRAY` with Parquet writer version `PARQUET_1_0`, so currently only decimals written as `INT32` or `INT64` benefit from this optimization.
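
For reference, a hedged sketch of opting into the v2 writer via the standard parquet-hadoop property. `df` is a placeholder DataFrame, and whether this Spark version forwards the setting all the way to the Parquet writer is an assumption:

```scala
// Illustrative only: "parquet.writer.version" is parquet-hadoop's knob for choosing
// between the PARQUET_1_0 and PARQUET_2_0 writers; with the v2 writer,
// FIXED_LEN_BYTE_ARRAY columns can also be dictionary encoded.
sqlContext.sparkContext.hadoopConfiguration
  .set("parquet.writer.version", "PARQUET_2_0")
df.write.parquet("/path/to/decimals.parquet")
```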

Author: Cheng Lian <li...@databricks.com>

Closes #9040 from liancheng/spark-11007.decimal-converter-dict-support.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64b1d00e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64b1d00e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64b1d00e

Branch: refs/heads/master
Commit: 64b1d00e1a7c1dc52c08a5e97baf6e7117f1a94f
Parents: fcb37a0
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Oct 12 10:17:19 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Oct 12 10:17:19 2015 -0700

----------------------------------------------------------------------
 .../parquet/CatalystRowConverter.scala          |  83 ++++++++++++++++---
 sql/core/src/test/resources/dec-in-i32.parquet  | Bin 0 -> 420 bytes
 sql/core/src/test/resources/dec-in-i64.parquet  | Bin 0 -> 437 bytes
 .../datasources/parquet/ParquetIOSuite.scala    |  19 +++++
 .../ParquetProtobufCompatibilitySuite.scala     |  22 ++---
 .../datasources/parquet/ParquetTest.scala       |   5 ++
 6 files changed, 103 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64b1d00e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 247d353..49007e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{DOUBLE, INT32, INT64, BINARY, FIXED_LEN_BYTE_ARRAY}
 import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.Logging
@@ -222,8 +222,25 @@ private[parquet] class CatalystRowConverter(
             updater.setShort(value.asInstanceOf[ShortType#InternalType])
         }
 
+      // For INT32 backed decimals
+      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
+        new CatalystIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      // For INT64 backed decimals
+      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
+        new CatalystLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
+      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
+      case t: DecimalType
+        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
+           parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
+        new CatalystBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+
       case t: DecimalType =>
-        new CatalystDecimalConverter(t, updater)
+        throw new RuntimeException(
+          s"Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is " +
+            s"$parquetType.  Parquet DECIMAL type can only be backed by INT32, INT64, " +
+            "FIXED_LEN_BYTE_ARRAY, or BINARY.")
 
       case StringType =>
         new CatalystStringConverter(updater)
@@ -274,9 +291,10 @@ private[parquet] class CatalystRowConverter(
           override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
         })
 
-      case _ =>
+      case t =>
         throw new RuntimeException(
-          s"Unable to create Parquet converter for data type ${catalystType.json}")
+          s"Unable to create Parquet converter for data type ${t.json} " +
+            s"whose Parquet type is $parquetType")
     }
   }
 
@@ -314,11 +332,18 @@ private[parquet] class CatalystRowConverter(
   /**
    * Parquet converter for fixed-precision decimals.
    */
-  private final class CatalystDecimalConverter(
-      decimalType: DecimalType,
-      updater: ParentContainerUpdater)
+  private abstract class CatalystDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
     extends CatalystPrimitiveConverter(updater) {
 
+    protected var expandedDictionary: Array[Decimal] = _
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
     // Converts decimals stored as INT32
     override def addInt(value: Int): Unit = {
       addLong(value: Long)
@@ -326,18 +351,19 @@ private[parquet] class CatalystRowConverter(
 
     // Converts decimals stored as INT64
     override def addLong(value: Long): Unit = {
-      updater.set(Decimal(value, decimalType.precision, decimalType.scale))
+      updater.set(decimalFromLong(value))
     }
 
     // Converts decimals stored as either FIXED_LENGTH_BYTE_ARRAY or BINARY
     override def addBinary(value: Binary): Unit = {
-      updater.set(toDecimal(value))
+      updater.set(decimalFromBinary(value))
     }
 
-    private def toDecimal(value: Binary): Decimal = {
-      val precision = decimalType.precision
-      val scale = decimalType.scale
+    protected def decimalFromLong(value: Long): Decimal = {
+      Decimal(value, precision, scale)
+    }
 
+    protected def decimalFromBinary(value: Binary): Decimal = {
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
         // Constructs a `Decimal` with an unscaled `Long` value if possible.
         val unscaled = binaryToUnscaledLong(value)
@@ -371,6 +397,39 @@ private[parquet] class CatalystRowConverter(
     }
   }
 
+  private class CatalystIntDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends CatalystDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromLong(dictionary.decodeToInt(id).toLong)
+      }
+    }
+  }
+
+  private class CatalystLongDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends CatalystDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromLong(dictionary.decodeToLong(id))
+      }
+    }
+  }
+
+  private class CatalystBinaryDictionaryAwareDecimalConverter(
+      precision: Int, scale: Int, updater: ParentContainerUpdater)
+    extends CatalystDecimalConverter(precision, scale, updater) {
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
+        decimalFromBinary(dictionary.decodeToBinary(id))
+      }
+    }
+  }
+
   /**
    * Parquet converter for arrays.  Spark SQL arrays are represented as Parquet lists.  Standard
    * Parquet lists are represented as a 3-level group annotated by `LIST`:

http://git-wip-us.apache.org/repos/asf/spark/blob/64b1d00e/sql/core/src/test/resources/dec-in-i32.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/dec-in-i32.parquet b/sql/core/src/test/resources/dec-in-i32.parquet
new file mode 100755
index 0000000..bb5d4af
Binary files /dev/null and b/sql/core/src/test/resources/dec-in-i32.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/64b1d00e/sql/core/src/test/resources/dec-in-i64.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/dec-in-i64.parquet b/sql/core/src/test/resources/dec-in-i64.parquet
new file mode 100755
index 0000000..e07c4a0
Binary files /dev/null and b/sql/core/src/test/resources/dec-in-i64.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/64b1d00e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 599cf94..7274479 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -488,6 +488,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
     }
   }
+
+  test("read dictionary encoded decimals written as INT32") {
+    checkAnswer(
+      // Decimal column in this file is encoded using plain dictionary
+      readResourceParquetFile("dec-in-i32.parquet"),
+      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
+  }
+
+  test("read dictionary encoded decimals written as INT64") {
+    checkAnswer(
+      // Decimal column in this file is encoded using plain dictionary
+      readResourceParquetFile("dec-in-i64.parquet"),
+      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
+  }
+
+  // TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY`
+  // The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't
+  // provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`.  Should add a test here once
+  // we upgrade to `PARQUET_2_0`.
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/64b1d00e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
index b290429..98333e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
@@ -17,23 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.SharedSQLContext
 
 class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
-
-  private def readParquetProtobufFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    sqlContext.read.parquet(url.toString)
-  }
-
   test("unannotated array of primitive type") {
-    checkAnswer(readParquetProtobufFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3)))
+    checkAnswer(readResourceParquetFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3)))
   }
 
   test("unannotated array of struct") {
     checkAnswer(
-      readParquetProtobufFile("old-repeated-message.parquet"),
+      readResourceParquetFile("old-repeated-message.parquet"),
       Row(
         Seq(
           Row("First inner", null, null),
@@ -41,14 +35,14 @@ class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest with Sh
           Row(null, null, "Third inner"))))
 
     checkAnswer(
-      readParquetProtobufFile("proto-repeated-struct.parquet"),
+      readResourceParquetFile("proto-repeated-struct.parquet"),
       Row(
         Seq(
           Row("0 - 1", "0 - 2", "0 - 3"),
           Row("1 - 1", "1 - 2", "1 - 3"))))
 
     checkAnswer(
-      readParquetProtobufFile("proto-struct-with-array-many.parquet"),
+      readResourceParquetFile("proto-struct-with-array-many.parquet"),
       Seq(
         Row(
           Seq(
@@ -66,13 +60,13 @@ class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest with Sh
 
   test("struct with unannotated array") {
     checkAnswer(
-      readParquetProtobufFile("proto-struct-with-array.parquet"),
+      readResourceParquetFile("proto-struct-with-array.parquet"),
       Row(10, 9, Seq.empty, null, Row(9), Seq(Row(9), Row(10))))
   }
 
   test("unannotated array of struct with unannotated array") {
     checkAnswer(
-      readParquetProtobufFile("nested-array-struct.parquet"),
+      readResourceParquetFile("nested-array-struct.parquet"),
       Seq(
         Row(2, Seq(Row(1, Seq(Row(3))))),
         Row(5, Seq(Row(4, Seq(Row(6))))),
@@ -81,7 +75,7 @@ class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest with Sh
 
   test("unannotated array of string") {
     checkAnswer(
-      readParquetProtobufFile("proto-repeated-string.parquet"),
+      readResourceParquetFile("proto-repeated-string.parquet"),
       Seq(
         Row(Seq("hello", "world")),
         Row(Seq("good", "bye")),

http://git-wip-us.apache.org/repos/asf/spark/blob/64b1d00e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 9840ad9..8ffb01f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -139,4 +139,9 @@ private[sql] trait ParquetTest extends SQLTestUtils {
       withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { f }
     }
   }
+
+  protected def readResourceParquetFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    sqlContext.read.parquet(url.toString)
+  }
 }

