You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/31 07:13:26 UTC

spark git commit: [SPARK-23272][SQL] add calendar interval type support to ColumnVector

Repository: spark
Updated Branches:
  refs/heads/master 8c6a9c90a -> 695f7146b


[SPARK-23272][SQL] add calendar interval type support to ColumnVector

## What changes were proposed in this pull request?

`ColumnVector` aims to support all data types, but `CalendarIntervalType` is missing. We actually do support the interval type for inner fields: both `ColumnarRow` and `ColumnarArray` support it. It is odd not to support the interval type at the top level.

This PR adds interval type support.

This PR also makes `ColumnVector.getChild` protected. It needed to be public because `MutableColumnarRow.getInterval` used it. Now the interval implementation is in `ColumnVector.getInterval`.

## How was this patch tested?

a new test.

Author: Wenchen Fan <we...@databricks.com>

Closes #20438 from cloud-fan/interval.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695f7146
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695f7146
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695f7146

Branch: refs/heads/master
Commit: 695f7146bca342a0ee192d8c7f5ec48d4d8577a8
Parents: 8c6a9c9
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Jan 31 15:13:15 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jan 31 15:13:15 2018 +0800

----------------------------------------------------------------------
 .../vectorized/MutableColumnarRow.java          |  4 +-
 .../spark/sql/vectorized/ArrowColumnVector.java |  2 +-
 .../spark/sql/vectorized/ColumnVector.java      | 26 ++++++++++-
 .../spark/sql/vectorized/ColumnarArray.java     |  4 +-
 .../spark/sql/vectorized/ColumnarRow.java       |  4 +-
 .../vectorized/ColumnarBatchSuite.scala         | 45 ++++++++++++++++++--
 6 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 2bab095..66668f3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -146,9 +146,7 @@ public final class MutableColumnarRow extends InternalRow {
   @Override
   public CalendarInterval getInterval(int ordinal) {
     if (columns[ordinal].isNullAt(rowId)) return null;
-    final int months = columns[ordinal].getChild(0).getInt(rowId);
-    final long microseconds = columns[ordinal].getChild(1).getLong(rowId);
-    return new CalendarInterval(months, microseconds);
+    return columns[ordinal].getInterval(rowId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 9803c3d..a75d76b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * A column vector backed by Apache Arrow. Currently time interval type and map type are not
+ * A column vector backed by Apache Arrow. Currently calendar interval type and map type are not
  * supported.
  */
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 4b955ce..111f5d9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -20,6 +20,7 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -195,6 +196,7 @@ public abstract class ColumnVector implements AutoCloseable {
    * struct field.
    */
   public final ColumnarRow getStruct(int rowId) {
+    if (isNullAt(rowId)) return null;
     return new ColumnarRow(this, rowId);
   }
 
@@ -236,9 +238,29 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract byte[] getBinary(int rowId);
 
   /**
-   * Returns the ordinal's child column vector.
+   * Returns the calendar interval type value for rowId.
+   *
+   * In Spark, calendar interval type value is basically an integer value representing the number of
+   * months in this interval, and a long value representing the number of microseconds in this
+   * interval. An interval type vector is the same as a struct type vector with 2 fields: `months`
+   * and `microseconds`.
+   *
+   * To support interval type, implementations must implement {@link #getChild(int)} and define 2
+   * child vectors: the first child vector is an int type vector, containing all the month values of
+   * all the interval values in this vector. The second child vector is a long type vector,
+   * containing all the microsecond values of all the interval values in this vector.
+   */
+  public final CalendarInterval getInterval(int rowId) {
+    if (isNullAt(rowId)) return null;
+    final int months = getChild(0).getInt(rowId);
+    final long microseconds = getChild(1).getLong(rowId);
+    return new CalendarInterval(months, microseconds);
+  }
+
+  /**
+   * @return child [[ColumnVector]] at the given ordinal.
    */
-  public abstract ColumnVector getChild(int ordinal);
+  protected abstract ColumnVector getChild(int ordinal);
 
   /**
    * Data type for this column.

http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 0d2c3ec..72c07ee 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -135,9 +135,7 @@ public final class ColumnarArray extends ArrayData {
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    int month = data.getChild(0).getInt(offset + ordinal);
-    long microseconds = data.getChild(1).getLong(offset + ordinal);
-    return new CalendarInterval(month, microseconds);
+    return data.getInterval(offset + ordinal);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 25db7e0..6ca749d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -139,9 +139,7 @@ public final class ColumnarRow extends InternalRow {
   @Override
   public CalendarInterval getInterval(int ordinal) {
     if (data.getChild(ordinal).isNullAt(rowId)) return null;
-    final int months = data.getChild(ordinal).getChild(0).getInt(rowId);
-    final long microseconds = data.getChild(ordinal).getChild(1).getLong(rowId);
-    return new CalendarInterval(months, microseconds);
+    return data.getChild(ordinal).getInterval(rowId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 1873c24..925c101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -620,6 +620,39 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 0)
   }
 
+  testVector("CalendarInterval APIs", 4, CalendarIntervalType) {
+    column =>
+      val reference = mutable.ArrayBuffer.empty[CalendarInterval]
+
+      val months = column.getChild(0)
+      val microseconds = column.getChild(1)
+      assert(months.dataType() == IntegerType)
+      assert(microseconds.dataType() == LongType)
+
+      months.putInt(0, 1)
+      microseconds.putLong(0, 100)
+      reference += new CalendarInterval(1, 100)
+
+      months.putInt(1, 0)
+      microseconds.putLong(1, 2000)
+      reference += new CalendarInterval(0, 2000)
+
+      column.putNull(2)
+      reference += null
+
+      months.putInt(3, 20)
+      microseconds.putLong(3, 0)
+      reference += new CalendarInterval(20, 0)
+
+      reference.zipWithIndex.foreach { case (v, i) =>
+        val errMsg = "VectorType=" + column.getClass.getSimpleName
+        assert(v == column.getInterval(i), errMsg)
+        if (v == null) assert(column.isNullAt(i), errMsg)
+      }
+
+      column.close()
+  }
+
   testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
     column =>
 
@@ -739,14 +772,20 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       c1.putInt(0, 123)
       c2.putDouble(0, 3.45)
-      c1.putInt(1, 456)
-      c2.putDouble(1, 5.67)
+
+      column.putNull(1)
+
+      c1.putInt(2, 456)
+      c2.putDouble(2, 5.67)
 
       val s = column.getStruct(0)
       assert(s.getInt(0) == 123)
       assert(s.getDouble(1) == 3.45)
 
-      val s2 = column.getStruct(1)
+      assert(column.isNullAt(1))
+      assert(column.getStruct(1) == null)
+
+      val s2 = column.getStruct(2)
       assert(s2.getInt(0) == 456)
       assert(s2.getDouble(1) == 5.67)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org