You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/11/27 05:49:13 UTC
spark git commit: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Repository: spark
Updated Branches:
refs/heads/master d49d9e403 -> 5a02e3a2a
[SPARK-22602][SQL] remove ColumnVector#loadBytes
## What changes were proposed in this pull request?
`ColumnVector#loadBytes` is only used as an optimization for reading UTF8String in `WritableColumnVector`. This PR moves that optimization into `WritableColumnVector` and simplifies it.
## How was this patch tested?
Covered by existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #19815 from cloud-fan/load-bytes.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a02e3a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a02e3a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a02e3a2
Branch: refs/heads/master
Commit: 5a02e3a2ac8a25d92d98d3b3b0d1173dddb9cc91
Parents: d49d9e4
Author: Wenchen Fan <we...@databricks.com>
Authored: Sun Nov 26 21:49:09 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Sun Nov 26 21:49:09 2017 -0800
----------------------------------------------------------------------
.../execution/vectorized/ArrowColumnVector.java | 5 ----
.../sql/execution/vectorized/ColumnVector.java | 8 ++----
.../sql/execution/vectorized/ColumnarArray.java | 8 ------
.../vectorized/OffHeapColumnVector.java | 23 ++++++----------
.../vectorized/OnHeapColumnVector.java | 12 ++++----
.../vectorized/WritableColumnVector.java | 29 +++++---------------
6 files changed, 24 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index 3a10e98..5c502c9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -240,11 +240,6 @@ public final class ArrowColumnVector extends ColumnVector {
return accessor.getArrayOffset(rowId);
}
- @Override
- public void loadBytes(ColumnarArray array) {
- throw new UnsupportedOperationException();
- }
-
//
// APIs dealing with Decimals
//
http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 360ed83e..940457f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -181,11 +181,6 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
- * Loads the data into array.byteArray.
- */
- public abstract void loadBytes(ColumnarArray array);
-
- /**
* Returns the value for rowId.
*/
public MapData getMap(int ordinal) {
@@ -198,7 +193,8 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract Decimal getDecimal(int rowId, int precision, int scale);
/**
- * Returns the UTF8String for rowId.
+ * Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of
+ * this column vector, please copy it if you want to keep it after this column vector is freed.
*/
public abstract UTF8String getUTF8String(int rowId);
http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
index 34bde3e..b9da641 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
@@ -33,14 +33,6 @@ public final class ColumnarArray extends ArrayData {
public int length;
public int offset;
- // Populate if binary data is required for the Array. This is stored here as an optimization
- // for string data.
- public byte[] byteArray;
- public int byteArrayOffset;
-
- // Reused staging buffer, used for loading from offheap.
- protected byte[] tmpByteArray = new byte[1];
-
ColumnarArray(ColumnVector data) {
this.data = data;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6b5c783..1cbaf08 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
/**
* Column data backed using offheap memory.
@@ -75,16 +76,14 @@ public final class OffHeapColumnVector extends WritableColumnVector {
reset();
}
+ /**
+ * Returns the off heap pointer for the values buffer.
+ */
@VisibleForTesting
public long valuesNativeAddress() {
return data;
}
- @VisibleForTesting
- public long nullsNativeAddress() {
- return nulls;
- }
-
@Override
public void close() {
super.close();
@@ -207,6 +206,11 @@ public final class OffHeapColumnVector extends WritableColumnVector {
return array;
}
+ @Override
+ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
+ return UTF8String.fromAddress(null, data + rowId, count);
+ }
+
//
// APIs dealing with shorts
//
@@ -524,15 +528,6 @@ public final class OffHeapColumnVector extends WritableColumnVector {
return result;
}
- @Override
- public void loadBytes(ColumnarArray array) {
- if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
- Platform.copyMemory(
- null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
- array.byteArray = array.tmpByteArray;
- array.byteArrayOffset = 0;
- }
-
// Split out the slow path.
@Override
protected void reserveInternal(int newCapacity) {
http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index a7b103a..85d7229 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
/**
* A column backed by an in memory JVM array. This stores the NULLs as a byte per value
@@ -203,6 +204,11 @@ public final class OnHeapColumnVector extends WritableColumnVector {
return array;
}
+ @Override
+ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
+ return UTF8String.fromBytes(byteData, rowId, count);
+ }
+
//
// APIs dealing with Shorts
//
@@ -484,12 +490,6 @@ public final class OnHeapColumnVector extends WritableColumnVector {
arrayLengths[rowId] = length;
}
- @Override
- public void loadBytes(ColumnarArray array) {
- array.byteArray = byteData;
- array.byteArrayOffset = array.offset;
- }
-
//
// APIs dealing with Byte Arrays
//
http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 96cfeed..e7653f0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -280,18 +280,6 @@ public abstract class WritableColumnVector extends ColumnVector {
return putByteArray(rowId, value, 0, value.length);
}
- /**
- * Returns the value for rowId.
- */
- private ColumnarArray getByteArray(int rowId) {
- ColumnarArray array = getArray(rowId);
- array.data.loadBytes(array);
- return array;
- }
-
- /**
- * Returns the decimal for rowId.
- */
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
@@ -318,14 +306,10 @@ public abstract class WritableColumnVector extends ColumnVector {
}
}
- /**
- * Returns the UTF8String for rowId.
- */
@Override
public UTF8String getUTF8String(int rowId) {
if (dictionary == null) {
- ColumnarArray a = getByteArray(rowId);
- return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
+ return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId));
} else {
byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
return UTF8String.fromBytes(bytes);
@@ -333,15 +317,16 @@ public abstract class WritableColumnVector extends ColumnVector {
}
/**
- * Returns the byte array for rowId.
+ * Gets the values of bytes from [rowId, rowId + count), as a UTF8String.
+ * This method is similar to {@link ColumnVector#getBytes(int, int)}, but can save data copy as
+ * UTF8String is used as a pointer.
*/
+ protected abstract UTF8String getBytesAsUTF8String(int rowId, int count);
+
@Override
public byte[] getBinary(int rowId) {
if (dictionary == null) {
- ColumnarArray array = getByteArray(rowId);
- byte[] bytes = new byte[array.length];
- System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
- return bytes;
+ return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId));
} else {
return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org