Posted to commits@spark.apache.org by ma...@apache.org on 2015/01/30 00:41:20 UTC

spark git commit: [SPARK-5309][SQL] Add support for dictionaries in PrimitiveConverter for Strings.

Repository: spark
Updated Branches:
  refs/heads/master bce0ba1fb -> 940f37561


[SPARK-5309][SQL] Add support for dictionaries in PrimitiveConverter for Strings.

Parquet converters allow developers to take advantage of the dictionary encoding of column data to reduce the cost of decoding Binary values.

Spark's PrimitiveConverter was not using that API, so for String columns stored with dictionary compression it repeated the Binary-to-String conversion for every occurrence of the same String.

In measurements this could account for over 25% of the entire query time.
For example, a 500M-row table split across 16 blocks was aggregated and summed in a little under 30s before this change and a little under 20s after it.
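
The mechanism is the decode-once pattern visible in the new CatalystPrimitiveStringConverter in the diff below: declare dictionary support, materialize every dictionary entry as a String once when Parquet hands over the dictionary, then serve repeated values by index. The following self-contained Scala sketch illustrates the idea; the Binary and Dictionary types here are simplified stand-ins for the real parquet.io.api.Binary and parquet.column.Dictionary, not the actual Parquet API:

import java.nio.charset.StandardCharsets

// Simplified stand-ins for parquet.io.api.Binary and parquet.column.Dictionary.
final case class Binary(bytes: Array[Byte]) {
  def toStringUsingUTF8: String = new String(bytes, StandardCharsets.UTF_8)
}

trait Dictionary {
  def getMaxId: Int                     // highest dictionary id in this column chunk
  def decodeToBinary(id: Int): Binary   // raw bytes for a dictionary entry
}

// Decode-once cache: pay the UTF-8 decode cost once per dictionary entry,
// not once per cell, and look repeated values up by id afterwards.
final class StringDictionaryCache {
  private[this] var dict: Array[String] = _

  def setDictionary(dictionary: Dictionary): Unit =
    dict = Array.tabulate(dictionary.getMaxId + 1) { id =>
      dictionary.decodeToBinary(id).toStringUsingUTF8
    }

  def valueOf(dictionaryId: Int): String = dict(dictionaryId)
}

object DecodeOnceDemo extends App {
  val words = Array("same", "run_1", "run_2")
  val cache = new StringDictionaryCache
  cache.setDictionary(new Dictionary {
    def getMaxId: Int = words.length - 1
    def decodeToBinary(id: Int): Binary =
      Binary(words(id).getBytes(StandardCharsets.UTF_8))
  })
  // A column of millions of cells now decodes only three Strings in total.
  println(Seq(0, 1, 1, 2, 0).map(cache.valueOf)) // List(same, run_1, run_1, run_2, same)
}

In the real converter the cache is populated in setDictionary and consulted in addValueFromDictionary, with addBinary kept as the fallback for pages that are not dictionary-encoded.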

Author: Michael Davies <Mi...@gmail.com>

Closes #4187 from MickDavies/SPARK-5309-2 and squashes the following commits:

327287e [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings.
33c002c [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/940f3756
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/940f3756
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/940f3756

Branch: refs/heads/master
Commit: 940f3756116647a25fddb54111112b95ba9b8740
Parents: bce0ba1
Author: Michael Davies <Mi...@gmail.com>
Authored: Thu Jan 29 15:40:59 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Jan 29 15:40:59 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetConverter.scala    | 48 +++++++++++++++-----
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 11 +++++
 2 files changed, 47 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/940f3756/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 9d91502..10df8c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 
 import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
 
+import parquet.column.Dictionary
 import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
 import parquet.schema.MessageType
 
@@ -102,12 +103,8 @@ private[sql] object CatalystConverter {
       }
       // Strings, Shorts and Bytes do not have a corresponding type in Parquet
       // so we need to treat them separately
-      case StringType => {
-        new CatalystPrimitiveConverter(parent, fieldIndex) {
-          override def addBinary(value: Binary): Unit =
-            parent.updateString(fieldIndex, value)
-        }
-      }
+      case StringType =>
+        new CatalystPrimitiveStringConverter(parent, fieldIndex)
       case ShortType => {
         new CatalystPrimitiveConverter(parent, fieldIndex) {
           override def addInt(value: Int): Unit =
@@ -197,8 +194,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
     updateField(fieldIndex, value.getBytes)
 
-  protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
-    updateField(fieldIndex, value.toStringUsingUTF8)
+  protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
+    updateField(fieldIndex, value)
 
   protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
     updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
@@ -384,8 +381,8 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
     current.update(fieldIndex, value.getBytes)
 
-  override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
-    current.setString(fieldIndex, value.toStringUsingUTF8)
+  override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
+    current.setString(fieldIndex, value)
 
   override protected[parquet] def updateDecimal(
       fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
@@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter(
     parent.updateLong(fieldIndex, value)
 }
 
+/**
+ * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String.
+ * Supports dictionaries to reduce Binary to String conversion overhead.
+ *
+ * Follows pattern in Parquet of using dictionaries, where supported, for String conversion.
+ *
+ * @param parent The parent group converter.
+ * @param fieldIndex The index inside the record.
+ */
+private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int)
+  extends CatalystPrimitiveConverter(parent, fieldIndex) {
+
+  private[this] var dict: Array[String] = null
+
+  override def hasDictionarySupport: Boolean = true
+
+  override def setDictionary(dictionary: Dictionary): Unit =
+    dict = Array.tabulate(dictionary.getMaxId + 1) { dictionary.decodeToBinary(_).toStringUsingUTF8 }
+
+
+  override def addValueFromDictionary(dictionaryId: Int): Unit =
+    parent.updateString(fieldIndex, dict(dictionaryId))
+
+  override def addBinary(value: Binary): Unit =
+    parent.updateString(fieldIndex, value.toStringUsingUTF8)
+}
+
 private[parquet] object CatalystArrayConverter {
   val INITIAL_ARRAY_SIZE = 20
 }
@@ -583,9 +607,9 @@ private[parquet] class CatalystNativeArrayConverter(
     elements += 1
   }
 
-  override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
+  override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = {
     checkGrowBuffer()
-    buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType]
+    buffer(elements) = value.asInstanceOf[NativeType]
     elements += 1
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/940f3756/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 1263ff8..3d82f4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -85,4 +85,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
       checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
     }
   }
+
+  test("SPARK-5309 strings stored using dictionary compression in parquet") {
+    withParquetTable((0 until 1000).map(i => ("same", "run_" + i / 100, 1)), "t") {
+
+      checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
+        (0 until 10).map(i => Row("same", "run_" + i, 100)))
+
+      checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
+        List(Row("same", "run_5", 100)))
+    }
+  }
 }

