You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/11/27 05:49:13 UTC

spark git commit: [SPARK-22602][SQL] remove ColumnVector#loadBytes

Repository: spark
Updated Branches:
  refs/heads/master d49d9e403 -> 5a02e3a2a


[SPARK-22602][SQL] remove ColumnVector#loadBytes

## What changes were proposed in this pull request?

`ColumnVector#loadBytes` is only used as an optimization for reading UTF8String in `WritableColumnVector`. This PR moves that optimization into `WritableColumnVector` and simplifies it.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #19815 from cloud-fan/load-bytes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a02e3a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a02e3a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a02e3a2

Branch: refs/heads/master
Commit: 5a02e3a2ac8a25d92d98d3b3b0d1173dddb9cc91
Parents: d49d9e4
Author: Wenchen Fan <we...@databricks.com>
Authored: Sun Nov 26 21:49:09 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Sun Nov 26 21:49:09 2017 -0800

----------------------------------------------------------------------
 .../execution/vectorized/ArrowColumnVector.java |  5 ----
 .../sql/execution/vectorized/ColumnVector.java  |  8 ++----
 .../sql/execution/vectorized/ColumnarArray.java |  8 ------
 .../vectorized/OffHeapColumnVector.java         | 23 ++++++----------
 .../vectorized/OnHeapColumnVector.java          | 12 ++++----
 .../vectorized/WritableColumnVector.java        | 29 +++++---------------
 6 files changed, 24 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index 3a10e98..5c502c9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -240,11 +240,6 @@ public final class ArrowColumnVector extends ColumnVector {
     return accessor.getArrayOffset(rowId);
   }
 
-  @Override
-  public void loadBytes(ColumnarArray array) {
-    throw new UnsupportedOperationException();
-  }
-
   //
   // APIs dealing with Decimals
   //

http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 360ed83e..940457f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -181,11 +181,6 @@ public abstract class ColumnVector implements AutoCloseable {
   }
 
   /**
-   * Loads the data into array.byteArray.
-   */
-  public abstract void loadBytes(ColumnarArray array);
-
-  /**
    * Returns the value for rowId.
    */
   public MapData getMap(int ordinal) {
@@ -198,7 +193,8 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract Decimal getDecimal(int rowId, int precision, int scale);
 
   /**
-   * Returns the UTF8String for rowId.
+   * Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of
+   * this column vector, please copy it if you want to keep it after this column vector is freed.
    */
   public abstract UTF8String getUTF8String(int rowId);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
index 34bde3e..b9da641 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
@@ -33,14 +33,6 @@ public final class ColumnarArray extends ArrayData {
   public int length;
   public int offset;
 
-  // Populate if binary data is required for the Array. This is stored here as an optimization
-  // for string data.
-  public byte[] byteArray;
-  public int byteArrayOffset;
-
-  // Reused staging buffer, used for loading from offheap.
-  protected byte[] tmpByteArray = new byte[1];
-
   ColumnarArray(ColumnVector data) {
     this.data = data;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6b5c783..1cbaf08 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * Column data backed using offheap memory.
@@ -75,16 +76,14 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     reset();
   }
 
+  /**
+   * Returns the off heap pointer for the values buffer.
+   */
   @VisibleForTesting
   public long valuesNativeAddress() {
     return data;
   }
 
-  @VisibleForTesting
-  public long nullsNativeAddress() {
-    return nulls;
-  }
-
   @Override
   public void close() {
     super.close();
@@ -207,6 +206,11 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     return array;
   }
 
+  @Override
+  protected UTF8String getBytesAsUTF8String(int rowId, int count) {
+    return UTF8String.fromAddress(null, data + rowId, count);
+  }
+
   //
   // APIs dealing with shorts
   //
@@ -524,15 +528,6 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     return result;
   }
 
-  @Override
-  public void loadBytes(ColumnarArray array) {
-    if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
-    Platform.copyMemory(
-        null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
-    array.byteArray = array.tmpByteArray;
-    array.byteArrayOffset = 0;
-  }
-
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index a7b103a..85d7229 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * A column backed by an in memory JVM array. This stores the NULLs as a byte per value
@@ -203,6 +204,11 @@ public final class OnHeapColumnVector extends WritableColumnVector {
     return array;
   }
 
+  @Override
+  protected UTF8String getBytesAsUTF8String(int rowId, int count) {
+    return UTF8String.fromBytes(byteData, rowId, count);
+  }
+
   //
   // APIs dealing with Shorts
   //
@@ -484,12 +490,6 @@ public final class OnHeapColumnVector extends WritableColumnVector {
     arrayLengths[rowId] = length;
   }
 
-  @Override
-  public void loadBytes(ColumnarArray array) {
-    array.byteArray = byteData;
-    array.byteArrayOffset = array.offset;
-  }
-
   //
   // APIs dealing with Byte Arrays
   //

http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 96cfeed..e7653f0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -280,18 +280,6 @@ public abstract class WritableColumnVector extends ColumnVector {
     return putByteArray(rowId, value, 0, value.length);
   }
 
-  /**
-   * Returns the value for rowId.
-   */
-  private ColumnarArray getByteArray(int rowId) {
-    ColumnarArray array = getArray(rowId);
-    array.data.loadBytes(array);
-    return array;
-  }
-
-  /**
-   * Returns the decimal for rowId.
-   */
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
     if (precision <= Decimal.MAX_INT_DIGITS()) {
@@ -318,14 +306,10 @@ public abstract class WritableColumnVector extends ColumnVector {
     }
   }
 
-  /**
-   * Returns the UTF8String for rowId.
-   */
   @Override
   public UTF8String getUTF8String(int rowId) {
     if (dictionary == null) {
-      ColumnarArray a = getByteArray(rowId);
-      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
+      return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId));
     } else {
       byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
       return UTF8String.fromBytes(bytes);
@@ -333,15 +317,16 @@ public abstract class WritableColumnVector extends ColumnVector {
   }
 
   /**
-   * Returns the byte array for rowId.
+   * Gets the values of bytes from [rowId, rowId + count), as a UTF8String.
+   * This method is similar to {@link ColumnVector#getBytes(int, int)}, but can save data copy as
+   * UTF8String is used as a pointer.
    */
+  protected abstract UTF8String getBytesAsUTF8String(int rowId, int count);
+
   @Override
   public byte[] getBinary(int rowId) {
     if (dictionary == null) {
-      ColumnarArray array = getByteArray(rowId);
-      byte[] bytes = new byte[array.length];
-      System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
-      return bytes;
+      return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId));
     } else {
       return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org