Posted to commits@spark.apache.org by da...@apache.org on 2016/02/26 21:43:56 UTC

spark git commit: [SPARK-13499] [SQL] Performance improvements for parquet reader.

Repository: spark
Updated Branches:
  refs/heads/master 6df1e55a6 -> 0598a2b81


[SPARK-13499] [SQL] Performance improvements for parquet reader.

## What changes were proposed in this pull request?

This patch includes these performance fixes:
  - Remove unnecessary putNotNull() calls; the NULL bits are already cleared.
  - Speed up RLE group decoding.
  - Speed up dictionary decoding by decoding NULLs directly into the result
    (see the sketch below).
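
For illustration, a minimal, self-contained sketch of the third fix. The
`IntColumn` class and both decode loops are simplified stand-ins for Spark's
ColumnVector API, not the actual implementation:

```
import java.util.Arrays;

public class DictNullDecodeSketch {
  // Simplified column: a value array plus a null bitmap, like a ColumnVector.
  static final class IntColumn {
    final int[] values;
    final boolean[] isNull;
    IntColumn(int capacity) {
      values = new int[capacity];
      isNull = new boolean[capacity];
    }
  }

  // Before: decode ids and nulls into `dictionaryIds`, then copy the nulls
  // over to the result `column` in a second pass.
  static void decodeThenCopyNulls(int[] defLevels, int maxDefLevel, int[] ids,
                                  IntColumn dictionaryIds, IntColumn column) {
    int next = 0;
    for (int i = 0; i < defLevels.length; i++) {
      if (defLevels[i] == maxDefLevel) {
        dictionaryIds.values[i] = ids[next++];
      } else {
        dictionaryIds.isNull[i] = true;
      }
    }
    // The extra pass this patch removes:
    for (int i = 0; i < defLevels.length; i++) {
      if (dictionaryIds.isNull[i]) column.isNull[i] = true;
    }
  }

  // After: one pass that writes ids into `dictionaryIds` and NULLs straight
  // into the result `column`, so no copy-over pass is needed.
  static void decodeNullsDirectly(int[] defLevels, int maxDefLevel, int[] ids,
                                  IntColumn dictionaryIds, IntColumn column) {
    int next = 0;
    for (int i = 0; i < defLevels.length; i++) {
      if (defLevels[i] == maxDefLevel) {
        dictionaryIds.values[i] = ids[next++];
      } else {
        column.isNull[i] = true;
      }
    }
  }

  public static void main(String[] args) {
    int[] defLevels = {1, 0, 1, 1, 0};  // below max level (1) means NULL
    int[] ids = {7, 3, 9};              // dictionary ids for the non-null rows
    IntColumn dict = new IntColumn(5);
    IntColumn column = new IntColumn(5);
    decodeNullsDirectly(defLevels, 1, ids, dict, column);
    System.out.println(Arrays.toString(dict.values)
        + " nulls=" + Arrays.toString(column.isNull));
  }
}
```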

## How was this patch tested?

In addition to the updated benchmarks, running TPCDS Q55 (sf40) with these
changes gives:

```
TPCDS:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
---------------------------------------------------------------------------------
q55 (Before)                             6398 / 6616         18.0          55.5
q55 (After)                              4983 / 5189         23.1          43.3
```

Author: Nong Li <no...@databricks.com>

Closes #11375 from nongli/spark-13499.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0598a2b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0598a2b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0598a2b8

Branch: refs/heads/master
Commit: 0598a2b81d1426dd2cf9e6fc32cef345364d18c6
Parents: 6df1e55
Author: Nong Li <no...@databricks.com>
Authored: Fri Feb 26 12:43:50 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Feb 26 12:43:50 2016 -0800

----------------------------------------------------------------------
 .../parquet/UnsafeRowParquetRecordReader.java   | 17 +----
 .../parquet/VectorizedRleValuesReader.java      | 66 ++++++++++++--------
 .../parquet/ParquetReadBenchmark.scala          | 30 ++++-----
 3 files changed, 59 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0598a2b8/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 4576ac2..9d50cfa 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -628,7 +628,8 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
             dictionaryIds.reserve(total);
           }
           // Read and decode dictionary ids.
-          readIntBatch(rowId, num, dictionaryIds);
+          defColumn.readIntegers(
+              num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
           decodeDictionaryIds(rowId, num, column);
         } else {
           switch (descriptor.getType()) {
@@ -739,18 +740,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
         default:
           throw new NotImplementedException("Unsupported type: " + descriptor.getType());
       }
-
-      if (dictionaryIds.numNulls() > 0) {
-        // Copy the NULLs over.
-        // TODO: we can improve this by decoding the NULLs directly into column. This would
-        // mean we decode the int ids into `dictionaryIds` and the NULLs into `column` and then
-        // just do the ID remapping as above.
-        for (int i = 0; i < num; ++i) {
-          if (dictionaryIds.getIsNull(rowId + i)) {
-            column.putNull(rowId + i);
-          }
-        }
-      }
     }
 
     /**
@@ -769,7 +758,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       // TODO: implement remaining type conversions
       if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType) {
         defColumn.readIntegers(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, 0);
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
       } else if (column.dataType() == DataTypes.ByteType) {
         defColumn.readBytes(
             num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
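
The hunk above replaces the readIntBatch() call with a single pass that
decodes ids into `dictionaryIds` and NULLs into `column`, leaving only the
id-to-value remapping to decodeDictionaryIds(). A hypothetical,
self-contained sketch of that remap step, with plain arrays standing in for
ColumnVector and made-up data:

```
import java.util.Arrays;

public class RemapSketch {
  public static void main(String[] args) {
    int[] dictionary = {100, 200, 300};             // per-chunk dictionary values
    int[] dictionaryIds = {2, 0, 0, 1};             // decoded ids (slot 2 is null, id unused)
    boolean[] isNull = {false, false, true, false}; // NULLs were decoded into the result
    int[] column = new int[dictionaryIds.length];
    for (int i = 0; i < column.length; i++) {
      if (!isNull[i]) {
        column[i] = dictionary[dictionaryIds[i]];   // remap id -> dictionary value
      }
    }
    System.out.println(Arrays.toString(column));    // [300, 100, 0, 200]
  }
}
```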

http://git-wip-us.apache.org/repos/asf/spark/blob/0598a2b8/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 629959a..b2048c0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
@@ -176,11 +175,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
    *  if (this.readInt() == level) {
    *    c[rowId] = data.readInteger();
    *  } else {
-   *    c[rowId] = nullValue;
+   *    c[rowId] = null;
    *  }
    */
   public void readIntegers(int total, ColumnVector c, int rowId, int level,
-      VectorizedValuesReader data, int nullValue) {
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -189,7 +188,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readIntegers(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -198,9 +196,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putInt(rowId + i, data.readInteger());
-              c.putNotNull(rowId + i);
             } else {
-              c.putInt(rowId + i, nullValue);
               c.putNull(rowId + i);
             }
           }
@@ -223,7 +219,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readBooleans(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -232,7 +227,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putBoolean(rowId + i, data.readBoolean());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -257,7 +251,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
             for (int i = 0; i < n; i++) {
               c.putLong(rowId + i, data.readInteger());
             }
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -266,7 +259,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putLong(rowId + i, data.readInteger());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -289,7 +281,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readBytes(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -298,7 +289,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putByte(rowId + i, data.readByte());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -321,7 +311,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readLongs(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -330,7 +319,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putLong(rowId + i, data.readLong());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -353,7 +341,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readFloats(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -362,7 +349,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putFloat(rowId + i, data.readFloat());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -385,7 +371,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readDoubles(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -394,7 +379,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putDouble(rowId + i, data.readDouble());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -416,7 +400,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
       switch (mode) {
         case RLE:
           if (currentValue == level) {
-            c.putNotNulls(rowId, n);
             data.readBinary(n, c, rowId);
           } else {
             c.putNulls(rowId, n);
@@ -425,7 +408,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
-              c.putNotNull(rowId + i);
               data.readBinary(1, c, rowId + i);
             } else {
               c.putNull(rowId + i);
@@ -439,6 +421,40 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
+  /**
+   * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+   * populated into `nulls`.
+   */
+  public void readIntegers(int total, ColumnVector values, ColumnVector nulls, int rowId, int level,
+                           VectorizedValuesReader data) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == level) {
+            data.readIntegers(n, values, rowId);
+          } else {
+            nulls.putNulls(rowId, n);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == level) {
+              values.putInt(rowId + i, data.readInteger());
+            } else {
+              nulls.putNull(rowId + i);
+            }
+          }
+          break;
+      }
+      rowId += n;
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
 
   // The RLE reader implements the vectorized decoding interface when used to decode dictionary
   // IDs. This is different than the above APIs that decodes definitions levels along with values.
@@ -560,12 +576,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
     throw new RuntimeException("Unreachable");
   }
 
+  private int ceil8(int value) {
+    return (value + 7) / 8;
+  }
+
   /**
    * Reads the next group.
    */
   private void readNextGroup()  {
-    Preconditions.checkArgument(this.offset < this.end,
-        "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
     int header = readUnsignedVarInt();
     this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
     switch (mode) {
@@ -576,14 +594,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
       case PACKED:
         int numGroups = header >>> 1;
         this.currentCount = numGroups * 8;
+        int bytesToRead = ceil8(this.currentCount * this.bitWidth);
 
         if (this.currentBuffer.length < this.currentCount) {
           this.currentBuffer = new int[this.currentCount];
         }
         currentBufferIdx = 0;
-        int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
-
-        bytesToRead = Math.min(bytesToRead, this.end - this.offset);
         int valueIndex = 0;
         for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
           this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
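
The readNextGroup() hunk above also replaces the floating-point ceiling with
integer arithmetic: ceil8() computes the same byte count for the non-negative
sizes involved. A standalone sanity check, not Spark code:

```
public class Ceil8Check {
  static int ceil8(int value) { return (value + 7) / 8; }

  public static void main(String[] args) {
    // (value + 7) / 8 matches the old Math.ceil-based byte count
    // for every non-negative bit count we might see here.
    for (int bits = 0; bits <= 1024; bits++) {
      int viaDouble = (int) Math.ceil((double) bits / 8.0);
      if (ceil8(bits) != viaDouble) throw new AssertionError("mismatch at " + bits);
    }
    System.out.println("ceil8 matches Math.ceil for 0..1024");
  }
}
```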

http://git-wip-us.apache.org/repos/asf/spark/blob/0598a2b8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index dafc589..660f0f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -150,21 +150,21 @@ object ParquetReadBenchmark {
 
         /*
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-        SQL Single Int Column Scan:        Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-        -------------------------------------------------------------------------------
-        SQL Parquet Reader                      1350.56            11.65         1.00 X
-        SQL Parquet MR                          1844.09             8.53         0.73 X
-        SQL Parquet Vectorized                  1062.04            14.81         1.27 X
+        SQL Single Int Column Scan:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        SQL Parquet Reader                       1042 / 1208         15.1          66.2       1.0X
+        SQL Parquet MR                           1544 / 1607         10.2          98.2       0.7X
+        SQL Parquet Vectorized                    674 /  739         23.3          42.9       1.5X
         */
         sqlBenchmark.run()
 
         /*
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-        Parquet Reader Single Int Column Scan:     Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-        -------------------------------------------------------------------------------
-        ParquetReader                            610.40            25.77         1.00 X
-        ParquetReader(Batched)                   172.66            91.10         3.54 X
-        ParquetReader(Batch -> Row)              192.28            81.80         3.17 X
+        Parquet Reader Single Int Column    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        -------------------------------------------------------------------------------------------
+        ParquetReader                             565 /  609         27.8          35.9       1.0X
+        ParquetReader(Batched)                    165 /  174         95.3          10.5       3.4X
+        ParquetReader(Batch -> Row)               158 /  188         99.3          10.1       3.6X
         */
         parquetReaderBenchmark.run()
       }
@@ -218,12 +218,12 @@ object ParquetReadBenchmark {
 
         /*
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-        Int and String Scan:               Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+        Int and String Scan:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------
-        SQL Parquet Reader                      1737.94             6.03         1.00 X
-        SQL Parquet MR                          2393.08             4.38         0.73 X
-        SQL Parquet Vectorized                  1442.99             7.27         1.20 X
-        ParquetReader                           1032.11            10.16         1.68 X
+        SQL Parquet Reader                       1381 / 1679          7.6         131.7       1.0X
+        SQL Parquet MR                           2005 / 2177          5.2         191.2       0.7X
+        SQL Parquet Vectorized                    919 / 1044         11.4          87.6       1.5X
+        ParquetReader                            1035 / 1163         10.1          98.7       1.3X
         */
         benchmark.run()
       }

