You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/12 16:12:44 UTC
spark git commit: [SPARK-21046][SQL] simplify the array offset and
length in ColumnVector
Repository: spark
Updated Branches:
refs/heads/master a92e095e7 -> 22dd65f58
[SPARK-21046][SQL] simplify the array offset and length in ColumnVector
## What changes were proposed in this pull request?
Currently, when a `ColumnVector` stores array-type elements, we use two separate arrays for lengths and offsets, and implement them individually in the on-heap and off-heap column vectors.
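In this PR, we use a single array of longs to represent both offsets and lengths (the offset is stored in the upper 32 bits and the length in the lower 32 bits of each long), so that it can be treated as an ordinary long `ColumnVector` and all the logic can move to the base class `ColumnVector`.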
## How was this patch tested?
Covered by existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #18260 from cloud-fan/put.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22dd65f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22dd65f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22dd65f5
Branch: refs/heads/master
Commit: 22dd65f58e12cb3a883d106fcccdff25a2a00fe8
Parents: a92e095
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Jun 13 00:12:34 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jun 13 00:12:34 2017 +0800
----------------------------------------------------------------------
.../sql/execution/vectorized/ColumnVector.java | 35 +++++++-------
.../vectorized/OffHeapColumnVector.java | 47 ++-----------------
.../vectorized/OnHeapColumnVector.java | 49 +++-----------------
.../vectorized/ColumnarBatchSuite.scala | 17 +++----
4 files changed, 38 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/22dd65f5/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 24260a6..e50799e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql.execution.vectorized;
import java.math.BigDecimal;
@@ -518,19 +519,13 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract double getDouble(int rowId);
/**
- * Puts a byte array that already exists in this column.
- */
- public abstract void putArray(int rowId, int offset, int length);
-
- /**
- * Returns the length of the array at rowid.
+ * After writing array elements to the child column vector, call this method to set the offset and
+ * size of the written array.
*/
- public abstract int getArrayLength(int rowId);
-
- /**
- * Returns the offset of the array at rowid.
- */
- public abstract int getArrayOffset(int rowId);
+ public void putArrayOffsetAndSize(int rowId, int offset, int size) {
+ long offsetAndSize = (((long) offset) << 32) | size;
+ putLong(rowId, offsetAndSize);
+ }
/**
* Returns a utility object to get structs.
@@ -553,8 +548,9 @@ public abstract class ColumnVector implements AutoCloseable {
* Returns the array at rowid.
*/
public final Array getArray(int rowId) {
- resultArray.length = getArrayLength(rowId);
- resultArray.offset = getArrayOffset(rowId);
+ long offsetAndSize = getLong(rowId);
+ resultArray.offset = (int) (offsetAndSize >> 32);
+ resultArray.length = (int) offsetAndSize;
return resultArray;
}
@@ -566,7 +562,12 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Sets the value at rowId to `value`.
*/
- public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
+ public int putByteArray(int rowId, byte[] value, int offset, int length) {
+ int result = arrayData().appendBytes(length, value, offset);
+ putArrayOffsetAndSize(rowId, result, length);
+ return result;
+ }
+
public final int putByteArray(int rowId, byte[] value) {
return putByteArray(rowId, value, 0, value.length);
}
@@ -829,13 +830,13 @@ public abstract class ColumnVector implements AutoCloseable {
public final int appendByteArray(byte[] value, int offset, int length) {
int copiedOffset = arrayData().appendBytes(length, value, offset);
reserve(elementsAppended + 1);
- putArray(elementsAppended, copiedOffset, length);
+ putArrayOffsetAndSize(elementsAppended, copiedOffset, length);
return elementsAppended++;
}
public final int appendArray(int length) {
reserve(elementsAppended + 1);
- putArray(elementsAppended, arrayData().elementsAppended, length);
+ putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length);
return elementsAppended++;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/22dd65f5/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index a7d3744..4dc4d34 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -34,19 +34,15 @@ public final class OffHeapColumnVector extends ColumnVector {
// The data stored in these two allocations need to maintain binary compatible. We can
// directly pass this buffer to external components.
private long nulls;
+ // The actual data of this column vector will be stored here. If it's an array column vector,
+ // we will store the offsets and lengths here, and store the element data in child column vector.
private long data;
- // Set iff the type is array.
- private long lengthData;
- private long offsetData;
-
protected OffHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.OFF_HEAP);
nulls = 0;
data = 0;
- lengthData = 0;
- offsetData = 0;
reserveInternal(capacity);
reset();
@@ -66,12 +62,8 @@ public final class OffHeapColumnVector extends ColumnVector {
public void close() {
Platform.freeMemory(nulls);
Platform.freeMemory(data);
- Platform.freeMemory(lengthData);
- Platform.freeMemory(offsetData);
nulls = 0;
data = 0;
- lengthData = 0;
- offsetData = 0;
}
//
@@ -395,35 +387,6 @@ public final class OffHeapColumnVector extends ColumnVector {
}
}
- //
- // APIs dealing with Arrays.
- //
- @Override
- public void putArray(int rowId, int offset, int length) {
- assert(offset >= 0 && offset + length <= childColumns[0].capacity);
- Platform.putInt(null, lengthData + 4 * rowId, length);
- Platform.putInt(null, offsetData + 4 * rowId, offset);
- }
-
- @Override
- public int getArrayLength(int rowId) {
- return Platform.getInt(null, lengthData + 4 * rowId);
- }
-
- @Override
- public int getArrayOffset(int rowId) {
- return Platform.getInt(null, offsetData + 4 * rowId);
- }
-
- // APIs dealing with ByteArrays
- @Override
- public int putByteArray(int rowId, byte[] value, int offset, int length) {
- int result = arrayData().appendBytes(length, value, offset);
- Platform.putInt(null, lengthData + 4 * rowId, length);
- Platform.putInt(null, offsetData + 4 * rowId, result);
- return result;
- }
-
@Override
public void loadBytes(ColumnVector.Array array) {
if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
@@ -438,10 +401,8 @@ public final class OffHeapColumnVector extends ColumnVector {
protected void reserveInternal(int newCapacity) {
int oldCapacity = (this.data == 0L) ? 0 : capacity;
if (this.resultArray != null) {
- this.lengthData =
- Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
- this.offsetData =
- Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
+ // need a long as offset and length for each array.
+ this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
} else if (type instanceof ByteType || type instanceof BooleanType) {
this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
} else if (type instanceof ShortType) {
http://git-wip-us.apache.org/repos/asf/spark/blob/22dd65f5/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 94ed322..4d23405 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -43,14 +43,12 @@ public final class OnHeapColumnVector extends ColumnVector {
private byte[] byteData;
private short[] shortData;
private int[] intData;
+ // This is not only used to store data for long column vector, but also can store offsets and
+ // lengths for array column vector.
private long[] longData;
private float[] floatData;
private double[] doubleData;
- // Only set if type is Array.
- private int[] arrayLengths;
- private int[] arrayOffsets;
-
protected OnHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.ON_HEAP);
reserveInternal(capacity);
@@ -366,55 +364,22 @@ public final class OnHeapColumnVector extends ColumnVector {
}
}
- //
- // APIs dealing with Arrays
- //
-
- @Override
- public int getArrayLength(int rowId) {
- return arrayLengths[rowId];
- }
- @Override
- public int getArrayOffset(int rowId) {
- return arrayOffsets[rowId];
- }
-
- @Override
- public void putArray(int rowId, int offset, int length) {
- arrayOffsets[rowId] = offset;
- arrayLengths[rowId] = length;
- }
-
@Override
public void loadBytes(ColumnVector.Array array) {
array.byteArray = byteData;
array.byteArrayOffset = array.offset;
}
- //
- // APIs dealing with Byte Arrays
- //
-
- @Override
- public int putByteArray(int rowId, byte[] value, int offset, int length) {
- int result = arrayData().appendBytes(length, value, offset);
- arrayOffsets[rowId] = result;
- arrayLengths[rowId] = length;
- return result;
- }
-
// Spilt this function out since it is the slow path.
@Override
protected void reserveInternal(int newCapacity) {
if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
- int[] newLengths = new int[newCapacity];
- int[] newOffsets = new int[newCapacity];
- if (this.arrayLengths != null) {
- System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
- System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
+ // need 1 long as offset and length for each array.
+ if (longData == null || longData.length < newCapacity) {
+ long[] newData = new long[newCapacity];
+ if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
+ longData = newData;
}
- arrayLengths = newLengths;
- arrayOffsets = newOffsets;
} else if (type instanceof BooleanType) {
if (byteData == null || byteData.length < newCapacity) {
byte[] newData = new byte[newCapacity];
http://git-wip-us.apache.org/repos/asf/spark/blob/22dd65f5/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index e48e3f6..5c4128a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 17)
// Put the same "ll" at offset. This should not allocate more memory in the column.
- column.putArray(idx, offset, 2)
+ column.putArrayOffsetAndSize(idx, offset, 2)
reference += "ll"
idx += 1
assert(column.arrayData().elementsAppended == 17)
@@ -644,7 +644,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 17 + (s + s).length)
reference.zipWithIndex.foreach { v =>
- assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
+ val offsetAndLength = column.getLong(v._2)
+ assert(v._1.length == offsetAndLength.toInt, "MemoryMode=" + memMode)
assert(v._1 == column.getUTF8String(v._2).toString,
"MemoryMode" + memMode)
}
@@ -659,7 +660,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
// Fill the underlying data with all the arrays back to back.
- val data = column.arrayData();
+ val data = column.arrayData()
var i = 0
while (i < 6) {
data.putInt(i, i)
@@ -667,10 +668,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
// Populate it with arrays [0], [1, 2], [], [3, 4, 5]
- column.putArray(0, 0, 1)
- column.putArray(1, 1, 2)
- column.putArray(2, 2, 0)
- column.putArray(3, 3, 3)
+ column.putArrayOffsetAndSize(0, 0, 1)
+ column.putArrayOffsetAndSize(1, 1, 2)
+ column.putArrayOffsetAndSize(2, 3, 0)
+ column.putArrayOffsetAndSize(3, 3, 3)
val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
@@ -703,7 +704,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
data.reserve(array.length)
assert(data.capacity == array.length * 2)
data.putInts(0, array.length, array, 0)
- column.putArray(0, 0, array.length)
+ column.putArrayOffsetAndSize(0, 0, array.length)
assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
=== array)
}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org