You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/31 07:13:26 UTC
spark git commit: [SPARK-23272][SQL] add calendar interval type
support to ColumnVector
Repository: spark
Updated Branches:
refs/heads/master 8c6a9c90a -> 695f7146b
[SPARK-23272][SQL] add calendar interval type support to ColumnVector
## What changes were proposed in this pull request?
`ColumnVector` aims to support all data types, but `CalendarIntervalType` is missing. In fact, we already support the interval type for inner fields: `ColumnarRow` and `ColumnarArray` both support it. It is odd not to support the interval type at the top level.
This PR adds the interval type support.
This PR also makes `ColumnVector.getChild` protected. It previously had to be public because `MutableColumnarRow.getInterval` called it. Now that the interval implementation lives in `ColumnVector.getInterval`, public access is no longer needed.
## How was this patch tested?
a new test.
Author: Wenchen Fan <we...@databricks.com>
Closes #20438 from cloud-fan/interval.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695f7146
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695f7146
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695f7146
Branch: refs/heads/master
Commit: 695f7146bca342a0ee192d8c7f5ec48d4d8577a8
Parents: 8c6a9c9
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Jan 31 15:13:15 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jan 31 15:13:15 2018 +0800
----------------------------------------------------------------------
.../vectorized/MutableColumnarRow.java | 4 +-
.../spark/sql/vectorized/ArrowColumnVector.java | 2 +-
.../spark/sql/vectorized/ColumnVector.java | 26 ++++++++++-
.../spark/sql/vectorized/ColumnarArray.java | 4 +-
.../spark/sql/vectorized/ColumnarRow.java | 4 +-
.../vectorized/ColumnarBatchSuite.scala | 45 ++++++++++++++++++--
6 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 2bab095..66668f3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -146,9 +146,7 @@ public final class MutableColumnarRow extends InternalRow {
@Override
public CalendarInterval getInterval(int ordinal) {
if (columns[ordinal].isNullAt(rowId)) return null;
- final int months = columns[ordinal].getChild(0).getInt(rowId);
- final long microseconds = columns[ordinal].getChild(1).getLong(rowId);
- return new CalendarInterval(months, microseconds);
+ return columns[ordinal].getInterval(rowId);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 9803c3d..a75d76b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;
/**
- * A column vector backed by Apache Arrow. Currently time interval type and map type are not
+ * A column vector backed by Apache Arrow. Currently calendar interval type and map type are not
* supported.
*/
@InterfaceStability.Evolving
http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 4b955ce..111f5d9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -20,6 +20,7 @@ import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
/**
@@ -195,6 +196,7 @@ public abstract class ColumnVector implements AutoCloseable {
* struct field.
*/
public final ColumnarRow getStruct(int rowId) {
+ if (isNullAt(rowId)) return null;
return new ColumnarRow(this, rowId);
}
@@ -236,9 +238,29 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract byte[] getBinary(int rowId);
/**
- * Returns the ordinal's child column vector.
+ * Returns the calendar interval type value for rowId.
+ *
+ * In Spark, calendar interval type value is basically an integer value representing the number of
+ * months in this interval, and a long value representing the number of microseconds in this
+ * interval. An interval type vector is the same as a struct type vector with 2 fields: `months`
+ * and `microseconds`.
+ *
+ * To support interval type, implementations must implement {@link #getChild(int)} and define 2
+ * child vectors: the first child vector is an int type vector, containing all the month values of
+ * all the interval values in this vector. The second child vector is a long type vector,
+ * containing all the microsecond values of all the interval values in this vector.
+ */
+ public final CalendarInterval getInterval(int rowId) {
+ if (isNullAt(rowId)) return null;
+ final int months = getChild(0).getInt(rowId);
+ final long microseconds = getChild(1).getLong(rowId);
+ return new CalendarInterval(months, microseconds);
+ }
+
+ /**
+ * @return child [[ColumnVector]] at the given ordinal.
*/
- public abstract ColumnVector getChild(int ordinal);
+ protected abstract ColumnVector getChild(int ordinal);
/**
* Data type for this column.
http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 0d2c3ec..72c07ee 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -135,9 +135,7 @@ public final class ColumnarArray extends ArrayData {
@Override
public CalendarInterval getInterval(int ordinal) {
- int month = data.getChild(0).getInt(offset + ordinal);
- long microseconds = data.getChild(1).getLong(offset + ordinal);
- return new CalendarInterval(month, microseconds);
+ return data.getInterval(offset + ordinal);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 25db7e0..6ca749d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -139,9 +139,7 @@ public final class ColumnarRow extends InternalRow {
@Override
public CalendarInterval getInterval(int ordinal) {
if (data.getChild(ordinal).isNullAt(rowId)) return null;
- final int months = data.getChild(ordinal).getChild(0).getInt(rowId);
- final long microseconds = data.getChild(ordinal).getChild(1).getLong(rowId);
- return new CalendarInterval(months, microseconds);
+ return data.getChild(ordinal).getInterval(rowId);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/695f7146/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 1873c24..925c101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -620,6 +620,39 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 0)
}
+ testVector("CalendarInterval APIs", 4, CalendarIntervalType) {
+ column =>
+ val reference = mutable.ArrayBuffer.empty[CalendarInterval]
+
+ val months = column.getChild(0)
+ val microseconds = column.getChild(1)
+ assert(months.dataType() == IntegerType)
+ assert(microseconds.dataType() == LongType)
+
+ months.putInt(0, 1)
+ microseconds.putLong(0, 100)
+ reference += new CalendarInterval(1, 100)
+
+ months.putInt(1, 0)
+ microseconds.putLong(1, 2000)
+ reference += new CalendarInterval(0, 2000)
+
+ column.putNull(2)
+ reference += null
+
+ months.putInt(3, 20)
+ microseconds.putLong(3, 0)
+ reference += new CalendarInterval(20, 0)
+
+ reference.zipWithIndex.foreach { case (v, i) =>
+ val errMsg = "VectorType=" + column.getClass.getSimpleName
+ assert(v == column.getInterval(i), errMsg)
+ if (v == null) assert(column.isNullAt(i), errMsg)
+ }
+
+ column.close()
+ }
+
testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
column =>
@@ -739,14 +772,20 @@ class ColumnarBatchSuite extends SparkFunSuite {
c1.putInt(0, 123)
c2.putDouble(0, 3.45)
- c1.putInt(1, 456)
- c2.putDouble(1, 5.67)
+
+ column.putNull(1)
+
+ c1.putInt(2, 456)
+ c2.putDouble(2, 5.67)
val s = column.getStruct(0)
assert(s.getInt(0) == 123)
assert(s.getDouble(1) == 3.45)
- val s2 = column.getStruct(1)
+ assert(column.isNullAt(1))
+ assert(column.getStruct(1) == null)
+
+ val s2 = column.getStruct(2)
assert(s2.getInt(0) == 456)
assert(s2.getDouble(1) == 5.67)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org