You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/07 05:09:38 UTC
spark git commit: [SPARK-21217][SQL] Support
ColumnVector.Array.to<type>Array()
Repository: spark
Updated Branches:
refs/heads/master 53c2eb59b -> c09b31eb8
[SPARK-21217][SQL] Support ColumnVector.Array.to<type>Array()
## What changes were proposed in this pull request?
This PR implements bulk-copy for `ColumnVector.Array.to<type>Array()` methods (e.g. `toIntArray()`) in `ColumnVector.Array` by using `System.arraycopy()` or `Platform.copyMemory()`.
Before this PR, when one of these methods was called, the generic method in `ArrayData` was invoked. That path is slow because it copies the array element by element.
This PR improves the performance of the benchmark program below by 1.9x (ON_HEAP) and 3.2x (OFF_HEAP).
Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Int Array        Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)
------------------------------------------------------------------------------------------------
ON_HEAP                  586 / 628         14.3           69.9
OFF_HEAP                 893 / 902          9.4          106.5
```
With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Int Array        Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)
------------------------------------------------------------------------------------------------
ON_HEAP                  306 / 331         27.4           36.4
OFF_HEAP                 282 / 287         29.8           33.6
```
Source program
```
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val len = 8 * 1024 * 1024
val column = ColumnVector.allocate(len * 2, new ArrayType(IntegerType, false), memMode)
val data = column.arrayData
var i = 0
while (i < len) {
data.putInt(i, i)
i += 1
}
column.putArray(0, 0, len)
val benchmark = new Benchmark("Int Array", len, minNumIters = 20)
benchmark.addCase(s"$memMode") { iter =>
var i = 0
while (i < 50) {
column.getArray(0).toIntArray
i += 1
}
}
benchmark.run
}}
```
## How was this patch tested?
Added a test case (`toArray for primitive types`) to `ColumnarBatchSuite`.
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #18425 from kiszk/SPARK-21217.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c09b31eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c09b31eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c09b31eb
Branch: refs/heads/master
Commit: c09b31eb8fa83d5463a045c9278f5874ae505a8e
Parents: 53c2eb5
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Fri Jul 7 13:09:32 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jul 7 13:09:32 2017 +0800
----------------------------------------------------------------------
.../sql/execution/vectorized/ColumnVector.java | 56 +++++++++++++++++++
.../vectorized/OffHeapColumnVector.java | 58 ++++++++++++++++++++
.../vectorized/OnHeapColumnVector.java | 58 ++++++++++++++++++++
.../vectorized/ColumnarBatchSuite.scala | 49 +++++++++++++++++
4 files changed, 221 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c09b31eb/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 24260a6..0c027f8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -100,6 +100,27 @@ public abstract class ColumnVector implements AutoCloseable {
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean[] toBooleanArray() { return data.getBooleans(offset, length); }
+
+ @Override
+ public byte[] toByteArray() { return data.getBytes(offset, length); }
+
+ @Override
+ public short[] toShortArray() { return data.getShorts(offset, length); }
+
+ @Override
+ public int[] toIntArray() { return data.getInts(offset, length); }
+
+ @Override
+ public long[] toLongArray() { return data.getLongs(offset, length); }
+
+ @Override
+ public float[] toFloatArray() { return data.getFloats(offset, length); }
+
+ @Override
+ public double[] toDoubleArray() { return data.getDoubles(offset, length); }
+
// TODO: this is extremely expensive.
@Override
public Object[] array() {
@@ -367,6 +388,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract boolean getBoolean(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract boolean[] getBooleans(int rowId, int count);
+
+ /**
* Sets the value at rowId to `value`.
*/
public abstract void putByte(int rowId, byte value);
@@ -387,6 +413,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract byte getByte(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract byte[] getBytes(int rowId, int count);
+
+ /**
* Sets the value at rowId to `value`.
*/
public abstract void putShort(int rowId, short value);
@@ -407,6 +438,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract short getShort(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract short[] getShorts(int rowId, int count);
+
+ /**
* Sets the value at rowId to `value`.
*/
public abstract void putInt(int rowId, int value);
@@ -433,6 +469,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract int getInt(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract int[] getInts(int rowId, int count);
+
+ /**
* Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds.
* We have this separate method for dictionaryIds as per SPARK-16928.
@@ -466,6 +507,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract long getLong(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract long[] getLongs(int rowId, int count);
+
+ /**
* Sets the value at rowId to `value`.
*/
public abstract void putFloat(int rowId, float value);
@@ -492,6 +538,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract float getFloat(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract float[] getFloats(int rowId, int count);
+
+ /**
* Sets the value at rowId to `value`.
*/
public abstract void putDouble(int rowId, double value);
@@ -518,6 +569,11 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract double getDouble(int rowId);
/**
+ * Gets values from [rowId, rowId + count)
+ */
+ public abstract double[] getDoubles(int rowId, int count);
+
+ /**
* Puts a byte array that already exists in this column.
*/
public abstract void putArray(int rowId, int offset, int length);
http://git-wip-us.apache.org/repos/asf/spark/blob/c09b31eb/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index a7d3744..2d1f3da 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -134,6 +134,16 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override
public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
+ @Override
+ public boolean[] getBooleans(int rowId, int count) {
+ assert(dictionary == null);
+ boolean[] array = new boolean[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = (Platform.getByte(null, data + rowId + i) == 1);
+ }
+ return array;
+ }
+
//
// APIs dealing with Bytes
//
@@ -165,6 +175,14 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public byte[] getBytes(int rowId, int count) {
+ assert(dictionary == null);
+ byte[] array = new byte[count];
+ Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
+ return array;
+ }
+
//
// APIs dealing with shorts
//
@@ -197,6 +215,14 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public short[] getShorts(int rowId, int count) {
+ assert(dictionary == null);
+ short[] array = new short[count];
+ Platform.copyMemory(null, data + rowId * 2, array, Platform.SHORT_ARRAY_OFFSET, count * 2);
+ return array;
+ }
+
//
// APIs dealing with ints
//
@@ -244,6 +270,14 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public int[] getInts(int rowId, int count) {
+ assert(dictionary == null);
+ int[] array = new int[count];
+ Platform.copyMemory(null, data + rowId * 4, array, Platform.INT_ARRAY_OFFSET, count * 4);
+ return array;
+ }
+
/**
* Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds.
@@ -302,6 +336,14 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public long[] getLongs(int rowId, int count) {
+ assert(dictionary == null);
+ long[] array = new long[count];
+ Platform.copyMemory(null, data + rowId * 8, array, Platform.LONG_ARRAY_OFFSET, count * 8);
+ return array;
+ }
+
//
// APIs dealing with floats
//
@@ -348,6 +390,14 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public float[] getFloats(int rowId, int count) {
+ assert(dictionary == null);
+ float[] array = new float[count];
+ Platform.copyMemory(null, data + rowId * 4, array, Platform.FLOAT_ARRAY_OFFSET, count * 4);
+ return array;
+ }
+
//
// APIs dealing with doubles
@@ -395,6 +445,14 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public double[] getDoubles(int rowId, int count) {
+ assert(dictionary == null);
+ double[] array = new double[count];
+ Platform.copyMemory(null, data + rowId * 8, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8);
+ return array;
+ }
+
//
// APIs dealing with Arrays.
//
http://git-wip-us.apache.org/repos/asf/spark/blob/c09b31eb/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 94ed322..5064343 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -130,6 +130,16 @@ public final class OnHeapColumnVector extends ColumnVector {
return byteData[rowId] == 1;
}
+ @Override
+ public boolean[] getBooleans(int rowId, int count) {
+ assert(dictionary == null);
+ boolean[] array = new boolean[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = (byteData[rowId + i] == 1);
+ }
+ return array;
+ }
+
//
//
@@ -162,6 +172,14 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public byte[] getBytes(int rowId, int count) {
+ assert(dictionary == null);
+ byte[] array = new byte[count];
+ System.arraycopy(byteData, rowId, array, 0, count);
+ return array;
+ }
+
//
// APIs dealing with Shorts
//
@@ -192,6 +210,14 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public short[] getShorts(int rowId, int count) {
+ assert(dictionary == null);
+ short[] array = new short[count];
+ System.arraycopy(shortData, rowId, array, 0, count);
+ return array;
+ }
+
//
// APIs dealing with Ints
@@ -234,6 +260,14 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public int[] getInts(int rowId, int count) {
+ assert(dictionary == null);
+ int[] array = new int[count];
+ System.arraycopy(intData, rowId, array, 0, count);
+ return array;
+ }
+
/**
* Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds.
@@ -286,6 +320,14 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public long[] getLongs(int rowId, int count) {
+ assert(dictionary == null);
+ long[] array = new long[count];
+ System.arraycopy(longData, rowId, array, 0, count);
+ return array;
+ }
+
//
// APIs dealing with floats
//
@@ -325,6 +367,14 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public float[] getFloats(int rowId, int count) {
+ assert(dictionary == null);
+ float[] array = new float[count];
+ System.arraycopy(floatData, rowId, array, 0, count);
+ return array;
+ }
+
//
// APIs dealing with doubles
//
@@ -366,6 +416,14 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
+ @Override
+ public double[] getDoubles(int rowId, int count) {
+ assert(dictionary == null);
+ double[] array = new double[count];
+ System.arraycopy(doubleData, rowId, array, 0, count);
+ return array;
+ }
+
//
// APIs dealing with Arrays
//
http://git-wip-us.apache.org/repos/asf/spark/blob/c09b31eb/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 80d4157..ccf7aa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -709,6 +709,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}
+ test("toArray for primitive types") {
+ // (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ (MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
+ val len = 4
+
+ val columnBool = ColumnVector.allocate(len, new ArrayType(BooleanType, false), memMode)
+ val boolArray = Array(false, true, false, true)
+ boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
+ columnBool.putArray(0, 0, len)
+ assert(columnBool.getArray(0).toBooleanArray === boolArray)
+
+ val columnByte = ColumnVector.allocate(len, new ArrayType(ByteType, false), memMode)
+ val byteArray = Array[Byte](0, 1, 2, 3)
+ byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) }
+ columnByte.putArray(0, 0, len)
+ assert(columnByte.getArray(0).toByteArray === byteArray)
+
+ val columnShort = ColumnVector.allocate(len, new ArrayType(ShortType, false), memMode)
+ val shortArray = Array[Short](0, 1, 2, 3)
+ shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) }
+ columnShort.putArray(0, 0, len)
+ assert(columnShort.getArray(0).toShortArray === shortArray)
+
+ val columnInt = ColumnVector.allocate(len, new ArrayType(IntegerType, false), memMode)
+ val intArray = Array(0, 1, 2, 3)
+ intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) }
+ columnInt.putArray(0, 0, len)
+ assert(columnInt.getArray(0).toIntArray === intArray)
+
+ val columnLong = ColumnVector.allocate(len, new ArrayType(LongType, false), memMode)
+ val longArray = Array[Long](0, 1, 2, 3)
+ longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) }
+ columnLong.putArray(0, 0, len)
+ assert(columnLong.getArray(0).toLongArray === longArray)
+
+ val columnFloat = ColumnVector.allocate(len, new ArrayType(FloatType, false), memMode)
+ val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F)
+ floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
+ columnFloat.putArray(0, 0, len)
+ assert(columnFloat.getArray(0).toFloatArray === floatArray)
+
+ val columnDouble = ColumnVector.allocate(len, new ArrayType(DoubleType, false), memMode)
+ val doubleArray = Array(0.0, 1.1, 2.2, 3.3)
+ doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
+ columnDouble.putArray(0, 0, len)
+ assert(columnDouble.getArray(0).toDoubleArray === doubleArray)
+ }}
+ }
+
test("Struct Column") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org