You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/08/10 21:45:17 UTC

spark git commit: [SPARK-16928] [SQL] Recursive call of ColumnVector::getInt() breaks JIT inlining

Repository: spark
Updated Branches:
  refs/heads/master 214ba66a0 -> bf5cb8af4


[SPARK-16928] [SQL] Recursive call of ColumnVector::getInt() breaks JIT inlining

## What changes were proposed in this pull request?

In both `OnHeapColumnVector` and `OffHeapColumnVector`, we implemented `getInt()` with the following code pattern:
```
public int getInt(int rowId) {
if (dictionary == null)
{ return intData[rowId]; }
else
{ return dictionary.decodeToInt(dictionaryIds.getInt(rowId)); }
}
```
Because `dictionaryIds` is itself a `ColumnVector`, this makes `getInt()` call `getInt()` recursively (on the `dictionaryIds` instance), which prevents the JIT compiler from inlining `getInt()`.

We fix this by adding a separate method, `getDictId()`, that is used only to read values from `dictionaryIds`.

## How was this patch tested?

We tested the difference with the following aggregate query on a TPCDS dataset (with scale factor = 5):
```
select
  max(ss_sold_date_sk) as max_ss_sold_date_sk
from store_sales
```
The query runtime is improved, from 202ms (before) to 159ms (after).

Author: Qifan Pu <qi...@gmail.com>

Closes #14513 from ooq/SPARK-16928.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf5cb8af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf5cb8af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf5cb8af

Branch: refs/heads/master
Commit: bf5cb8af4a649e0c7ac565891427484eab9ee5d9
Parents: 214ba66
Author: Qifan Pu <qi...@gmail.com>
Authored: Wed Aug 10 14:45:13 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Aug 10 14:45:13 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedColumnReader.java         | 22 +++++++++----------
 .../sql/execution/vectorized/ColumnVector.java  | 11 ++++++++--
 .../vectorized/OffHeapColumnVector.java         | 23 +++++++++++++++-----
 .../vectorized/OnHeapColumnVector.java          | 23 +++++++++++++++-----
 4 files changed, 54 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf5cb8af/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6c47dc0..4ed59b0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -221,15 +221,15 @@ public class VectorizedColumnReader {
         if (column.dataType() == DataTypes.IntegerType ||
             DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
+            column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
           }
         } else if (column.dataType() == DataTypes.ByteType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+            column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
           }
         } else if (column.dataType() == DataTypes.ShortType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+            column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -240,7 +240,7 @@ public class VectorizedColumnReader {
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
+            column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -249,20 +249,20 @@ public class VectorizedColumnReader {
 
       case FLOAT:
         for (int i = rowId; i < rowId + num; ++i) {
-          column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
+          column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
         }
         break;
 
       case DOUBLE:
         for (int i = rowId; i < rowId + num; ++i) {
-          column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
+          column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
         }
         break;
       case INT96:
         if (column.dataType() == DataTypes.TimestampType) {
           for (int i = rowId; i < rowId + num; ++i) {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
             column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
           }
         } else {
@@ -275,7 +275,7 @@ public class VectorizedColumnReader {
         // and reuse it across batches. This should mean adding a ByteArray would just update
         // the length and offset.
         for (int i = rowId; i < rowId + num; ++i) {
-          Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+          Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
           column.putByteArray(i, v.getBytes());
         }
         break;
@@ -283,17 +283,17 @@ public class VectorizedColumnReader {
         // DecimalType written in the legacy mode
         if (DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
             column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
             column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
             column.putByteArray(i, v.getBytes());
           }
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5cb8af/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 59173d2..a7cb3b1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -429,6 +429,13 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract int getInt(int rowId);
 
   /**
+   * Returns the dictionary Id for rowId.
+   * This should only be called when the ColumnVector is dictionaryIds.
+   * We have this separate method for dictionaryIds as per SPARK-16928.
+   */
+  public abstract int getDictId(int rowId);
+
+  /**
    * Sets the value at rowId to `value`.
    */
   public abstract void putLong(int rowId, long value);
@@ -615,7 +622,7 @@ public abstract class ColumnVector implements AutoCloseable {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(rowId));
+      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
       return UTF8String.fromBytes(v.getBytes());
     }
   }
@@ -630,7 +637,7 @@ public abstract class ColumnVector implements AutoCloseable {
       System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
       return bytes;
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(rowId));
+      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
       return v.getBytes();
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5cb8af/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 913a05a..12fa109 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -161,7 +161,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return Platform.getByte(null, data + rowId);
     } else {
-      return (byte) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+      return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -193,7 +193,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return Platform.getShort(null, data + 2 * rowId);
     } else {
-      return (short) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+      return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -240,10 +240,21 @@ public final class OffHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return Platform.getInt(null, data + 4 * rowId);
     } else {
-      return dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
   }
 
+  /**
+   * Returns the dictionary Id for rowId.
+   * This should only be called when the ColumnVector is dictionaryIds.
+   * We have this separate method for dictionaryIds as per SPARK-16928.
+   */
+  public int getDictId(int rowId) {
+    assert(dictionary == null)
+            : "A ColumnVector dictionary should not have a dictionary for itself.";
+    return Platform.getInt(null, data + 4 * rowId);
+  }
+
   //
   // APIs dealing with Longs
   //
@@ -287,7 +298,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return Platform.getLong(null, data + 8 * rowId);
     } else {
-      return dictionary.decodeToLong(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -333,7 +344,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return Platform.getFloat(null, data + rowId * 4);
     } else {
-      return dictionary.decodeToFloat(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -380,7 +391,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return Platform.getDouble(null, data + rowId * 8);
     } else {
-      return dictionary.decodeToDouble(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5cb8af/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 85067df..9b410ba 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -158,7 +158,7 @@ public final class OnHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return byteData[rowId];
     } else {
-      return (byte) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+      return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -188,7 +188,7 @@ public final class OnHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return shortData[rowId];
     } else {
-      return (short) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+      return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -230,10 +230,21 @@ public final class OnHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return intData[rowId];
     } else {
-      return dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
     }
   }
 
+  /**
+   * Returns the dictionary Id for rowId.
+   * This should only be called when the ColumnVector is dictionaryIds.
+   * We have this separate method for dictionaryIds as per SPARK-16928.
+   */
+  public int getDictId(int rowId) {
+    assert(dictionary == null)
+            : "A ColumnVector dictionary should not have a dictionary for itself.";
+    return intData[rowId];
+  }
+
   //
   // APIs dealing with Longs
   //
@@ -271,7 +282,7 @@ public final class OnHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return longData[rowId];
     } else {
-      return dictionary.decodeToLong(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -310,7 +321,7 @@ public final class OnHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return floatData[rowId];
     } else {
-      return dictionary.decodeToFloat(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
     }
   }
 
@@ -351,7 +362,7 @@ public final class OnHeapColumnVector extends ColumnVector {
     if (dictionary == null) {
       return doubleData[rowId];
     } else {
-      return dictionary.decodeToDouble(dictionaryIds.getInt(rowId));
+      return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org