Posted to dev@spark.apache.org by Jacek Laskowski <ja...@japila.pl> on 2017/07/21 07:26:36 UTC
Fwd: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
Hi,
Looks like the change has broken the build for me:
[INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
/Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243: error: not found: type Array
public void loadBytes(Array array) {
^
...
222 warnings found
one error found
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [ 4.864 s]
[INFO] Spark Project Tags ................................. SUCCESS [ 5.689 s]
[INFO] Spark Project Sketch ............................... SUCCESS [ 4.646 s]
[INFO] Spark Project Local DB ............................. SUCCESS [ 6.074 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 7.355 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 7.639 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
[INFO] Spark Project Core ................................. SUCCESS [02:01 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [ 9.711 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
[INFO] Spark Project SQL .................................. FAILURE [02:14 min]
Is it only me, or are others suffering from it too?
Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
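Note that the failure is reported by the `doc-jar` goal (scaladoc), not by javac, which compiles the same file. A plausible cause, offered as an assumption rather than a confirmed diagnosis: scaladoc reads Java sources with Scala's scoping rules, under which a Java static nested class such as `ColumnVector.Array` (assuming it is declared `static`) belongs to the companion object and is not inherited, so the bare name `Array` in the `loadBytes` signature of a subclass cannot be resolved. javac, by contrast, treats member types as inherited. The hypothetical classes below (not Spark code) show that javac accepts both the bare and the qualified spelling; qualifying the name as `ColumnVector.Array` would be the obvious candidate workaround.

```java
// Hypothetical classes mirroring the shape of ColumnVector / ArrowColumnVector.
public class NestedTypeDemo {

    // Stands in for ColumnVector: declares a static nested holder type.
    static abstract class BaseVector {
        public static final class Array {
            final int length;
            Array(int length) { this.length = length; }
        }

        public abstract int loadBytes(Array array);
    }

    // Bare name: javac resolves "Array" because member types are inherited.
    static final class BareNameVector extends BaseVector {
        @Override
        public int loadBytes(Array array) { return array.length; }
    }

    // Qualified name: refers to the same type without relying on inheritance.
    static final class QualifiedNameVector extends BaseVector {
        @Override
        public int loadBytes(BaseVector.Array array) { return array.length * 2; }
    }

    public static void main(String[] args) {
        BaseVector.Array a = new BaseVector.Array(3);
        System.out.println(new BareNameVector().loadBytes(a));      // 3
        System.out.println(new QualifiedNameVector().loadBytes(a)); // 6
    }
}
```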
---------- Forwarded message ----------
From: <we...@apache.org>
Date: Thu, Jul 20, 2017 at 3:00 PM
Subject: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
To: commits@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/master 5d1850d4b -> cb19880cd
[SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
## What changes were proposed in this pull request?
Introducing `ArrowColumnVector` as a reader for Arrow vectors.
It extends `ColumnVector`, so we will be able to use it with
`ColumnarBatch` and its functionalities.
Currently it supports primitive types and `StringType`, `ArrayType`
and `StructType`.
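The commit message says a single reader class covers several vector types. In the diff this is done by choosing a type-specific accessor once, in the `ArrowColumnVector` constructor, and delegating every getter to it; any getter an accessor does not override falls back to a base implementation that throws `UnsupportedOperationException`. A minimal sketch of that pattern, with plain Java arrays standing in for Arrow vectors (all names here are hypothetical, not Spark or Arrow APIs):

```java
public class AccessorDispatchDemo {

    // Base accessor: every typed getter throws unless a subclass overrides it.
    static abstract class Accessor {
        int getInt(int rowId) { throw new UnsupportedOperationException(); }
        String getString(int rowId) { throw new UnsupportedOperationException(); }
    }

    static final class IntAccessor extends Accessor {
        private final int[] values;
        IntAccessor(int[] values) { this.values = values; }
        @Override int getInt(int rowId) { return values[rowId]; }
    }

    static final class StringAccessor extends Accessor {
        private final String[] values;
        StringAccessor(String[] values) { this.values = values; }
        @Override String getString(int rowId) { return values[rowId]; }
    }

    // Reader: picks the accessor once, then delegates without re-checking the type.
    static final class Column {
        private final Accessor accessor;

        Column(Object backing) {
            if (backing instanceof int[]) {
                accessor = new IntAccessor((int[]) backing);
            } else if (backing instanceof String[]) {
                accessor = new StringAccessor((String[]) backing);
            } else {
                throw new UnsupportedOperationException();
            }
        }

        int getInt(int rowId) { return accessor.getInt(rowId); }
        String getString(int rowId) { return accessor.getString(rowId); }
    }

    public static void main(String[] args) {
        Column ints = new Column(new int[] {10, 20, 30});
        System.out.println(ints.getInt(1)); // 20
        try {
            ints.getString(0);
        } catch (UnsupportedOperationException e) {
            System.out.println("getString is unsupported on an int column");
        }
    }
}
```

Resolving the accessor in the constructor keeps the per-row getters free of type checks, at the cost of one virtual call per value.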
## How was this patch tested?
Added tests for `ArrowColumnVector` and existing tests.
Author: Takuya UESHIN <ue...@databricks.com>
Closes #18680 from ueshin/issues/SPARK-21472.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c
Branch: refs/heads/master
Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
Parents: 5d1850d
Author: Takuya UESHIN <ue...@databricks.com>
Authored: Thu Jul 20 21:00:30 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 20 21:00:30 2017 +0800
----------------------------------------------------------------------
.../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
.../sql/execution/vectorized/ColumnVector.java | 16 +-
.../vectorized/ReadOnlyColumnVector.java | 251 ++++++++
.../sql/execution/arrow/ArrowConverters.scala | 32 +-
.../spark/sql/execution/arrow/ArrowUtils.scala | 109 ++++
.../execution/arrow/ArrowConvertersSuite.scala | 2 +-
.../sql/execution/arrow/ArrowUtilsSuite.scala | 65 ++
.../vectorized/ArrowColumnVectorSuite.scala | 410 +++++++++++++
8 files changed, 1436 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
new file mode 100644
index 0000000..68e0abc
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column vector backed by Apache Arrow.
+ */
+public final class ArrowColumnVector extends ReadOnlyColumnVector {
+
+ private final ArrowVectorAccessor accessor;
+ private final int valueCount;
+
+ private void ensureAccessible(int index) {
+ if (index < 0 || index >= valueCount) {
+ throw new IndexOutOfBoundsException(
+ String.format("index: %d, valueCount: %d", index, valueCount));
+ }
+ }
+
+ private void ensureAccessible(int index, int count) {
+ if (index < 0 || index + count > valueCount) {
+ throw new IndexOutOfBoundsException(
+ String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
+ }
+ }
+
+ @Override
+ public long nullsNativeAddress() {
+ throw new RuntimeException("Cannot get native address for arrow column");
+ }
+
+ @Override
+ public long valuesNativeAddress() {
+ throw new RuntimeException("Cannot get native address for arrow column");
+ }
+
+ @Override
+ public void close() {
+ if (childColumns != null) {
+ for (int i = 0; i < childColumns.length; i++) {
+ childColumns[i].close();
+ }
+ }
+ accessor.close();
+ }
+
+ //
+ // APIs dealing with nulls
+ //
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.isNullAt(rowId);
+ }
+
+ //
+ // APIs dealing with Booleans
+ //
+
+ @Override
+ public boolean getBoolean(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getBoolean(rowId);
+ }
+
+ @Override
+ public boolean[] getBooleans(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ boolean[] array = new boolean[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getBoolean(rowId + i);
+ }
+ return array;
+ }
+
+ //
+ // APIs dealing with Bytes
+ //
+
+ @Override
+ public byte getByte(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getByte(rowId);
+ }
+
+ @Override
+ public byte[] getBytes(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ byte[] array = new byte[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getByte(rowId + i);
+ }
+ return array;
+ }
+
+ //
+ // APIs dealing with Shorts
+ //
+
+ @Override
+ public short getShort(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getShort(rowId);
+ }
+
+ @Override
+ public short[] getShorts(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ short[] array = new short[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getShort(rowId + i);
+ }
+ return array;
+ }
+
+ //
+ // APIs dealing with Ints
+ //
+
+ @Override
+ public int getInt(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getInt(rowId);
+ }
+
+ @Override
+ public int[] getInts(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ int[] array = new int[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getInt(rowId + i);
+ }
+ return array;
+ }
+
+ @Override
+ public int getDictId(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Longs
+ //
+
+ @Override
+ public long getLong(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getLong(rowId);
+ }
+
+ @Override
+ public long[] getLongs(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ long[] array = new long[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getLong(rowId + i);
+ }
+ return array;
+ }
+
+ //
+ // APIs dealing with floats
+ //
+
+ @Override
+ public float getFloat(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getFloat(rowId);
+ }
+
+ @Override
+ public float[] getFloats(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ float[] array = new float[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getFloat(rowId + i);
+ }
+ return array;
+ }
+
+ //
+ // APIs dealing with doubles
+ //
+
+ @Override
+ public double getDouble(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getDouble(rowId);
+ }
+
+ @Override
+ public double[] getDoubles(int rowId, int count) {
+ ensureAccessible(rowId, count);
+ double[] array = new double[count];
+ for (int i = 0; i < count; ++i) {
+ array[i] = accessor.getDouble(rowId + i);
+ }
+ return array;
+ }
+
+ //
+ // APIs dealing with Arrays
+ //
+
+ @Override
+ public int getArrayLength(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getArrayLength(rowId);
+ }
+
+ @Override
+ public int getArrayOffset(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getArrayOffset(rowId);
+ }
+
+ @Override
+ public void loadBytes(Array array) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Decimals
+ //
+
+ @Override
+ public Decimal getDecimal(int rowId, int precision, int scale) {
+ ensureAccessible(rowId);
+ return accessor.getDecimal(rowId, precision, scale);
+ }
+
+ //
+ // APIs dealing with UTF8Strings
+ //
+
+ @Override
+ public UTF8String getUTF8String(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getUTF8String(rowId);
+ }
+
+ //
+ // APIs dealing with Binaries
+ //
+
+ @Override
+ public byte[] getBinary(int rowId) {
+ ensureAccessible(rowId);
+ return accessor.getBinary(rowId);
+ }
+
+ public ArrowColumnVector(ValueVector vector) {
+ super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
+ MemoryMode.OFF_HEAP);
+
+ if (vector instanceof NullableBitVector) {
+ accessor = new BooleanAccessor((NullableBitVector) vector);
+ } else if (vector instanceof NullableTinyIntVector) {
+ accessor = new ByteAccessor((NullableTinyIntVector) vector);
+ } else if (vector instanceof NullableSmallIntVector) {
+ accessor = new ShortAccessor((NullableSmallIntVector) vector);
+ } else if (vector instanceof NullableIntVector) {
+ accessor = new IntAccessor((NullableIntVector) vector);
+ } else if (vector instanceof NullableBigIntVector) {
+ accessor = new LongAccessor((NullableBigIntVector) vector);
+ } else if (vector instanceof NullableFloat4Vector) {
+ accessor = new FloatAccessor((NullableFloat4Vector) vector);
+ } else if (vector instanceof NullableFloat8Vector) {
+ accessor = new DoubleAccessor((NullableFloat8Vector) vector);
+ } else if (vector instanceof NullableDecimalVector) {
+ accessor = new DecimalAccessor((NullableDecimalVector) vector);
+ } else if (vector instanceof NullableVarCharVector) {
+ accessor = new StringAccessor((NullableVarCharVector) vector);
+ } else if (vector instanceof NullableVarBinaryVector) {
+ accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
+ } else if (vector instanceof ListVector) {
+ ListVector listVector = (ListVector) vector;
+ accessor = new ArrayAccessor(listVector);
+
+ childColumns = new ColumnVector[1];
+ childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
+ resultArray = new Array(childColumns[0]);
+ } else if (vector instanceof MapVector) {
+ MapVector mapVector = (MapVector) vector;
+ accessor = new StructAccessor(mapVector);
+
+ childColumns = new ArrowColumnVector[mapVector.size()];
+ for (int i = 0; i < childColumns.length; ++i) {
+ childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+ }
+ resultStruct = new ColumnarBatch.Row(childColumns);
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ valueCount = accessor.getValueCount();
+ numNulls = accessor.getNullCount();
+ anyNullsSet = numNulls > 0;
+ }
+
+ private static abstract class ArrowVectorAccessor {
+
+ private final ValueVector vector;
+ private final ValueVector.Accessor nulls;
+
+ private final int valueCount;
+ private final int nullCount;
+
+ ArrowVectorAccessor(ValueVector vector) {
+ this.vector = vector;
+ this.nulls = vector.getAccessor();
+ this.valueCount = nulls.getValueCount();
+ this.nullCount = nulls.getNullCount();
+ }
+
+ final boolean isNullAt(int rowId) {
+ return nulls.isNull(rowId);
+ }
+
+ final int getValueCount() {
+ return valueCount;
+ }
+
+ final int getNullCount() {
+ return nullCount;
+ }
+
+ final void close() {
+ vector.close();
+ }
+
+ boolean getBoolean(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ byte getByte(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ short getShort(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ int getInt(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ long getLong(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ float getFloat(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ double getDouble(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ Decimal getDecimal(int rowId, int precision, int scale) {
+ throw new UnsupportedOperationException();
+ }
+
+ UTF8String getUTF8String(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ byte[] getBinary(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ int getArrayLength(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ int getArrayOffset(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class BooleanAccessor extends ArrowVectorAccessor {
+
+ private final NullableBitVector.Accessor accessor;
+
+ BooleanAccessor(NullableBitVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final boolean getBoolean(int rowId) {
+ return accessor.get(rowId) == 1;
+ }
+ }
+
+ private static class ByteAccessor extends ArrowVectorAccessor {
+
+ private final NullableTinyIntVector.Accessor accessor;
+
+ ByteAccessor(NullableTinyIntVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final byte getByte(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class ShortAccessor extends ArrowVectorAccessor {
+
+ private final NullableSmallIntVector.Accessor accessor;
+
+ ShortAccessor(NullableSmallIntVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final short getShort(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class IntAccessor extends ArrowVectorAccessor {
+
+ private final NullableIntVector.Accessor accessor;
+
+ IntAccessor(NullableIntVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final int getInt(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class LongAccessor extends ArrowVectorAccessor {
+
+ private final NullableBigIntVector.Accessor accessor;
+
+ LongAccessor(NullableBigIntVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final long getLong(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class FloatAccessor extends ArrowVectorAccessor {
+
+ private final NullableFloat4Vector.Accessor accessor;
+
+ FloatAccessor(NullableFloat4Vector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final float getFloat(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class DoubleAccessor extends ArrowVectorAccessor {
+
+ private final NullableFloat8Vector.Accessor accessor;
+
+ DoubleAccessor(NullableFloat8Vector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final double getDouble(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class DecimalAccessor extends ArrowVectorAccessor {
+
+ private final NullableDecimalVector.Accessor accessor;
+
+ DecimalAccessor(NullableDecimalVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final Decimal getDecimal(int rowId, int precision, int scale) {
+ if (isNullAt(rowId)) return null;
+ return Decimal.apply(accessor.getObject(rowId), precision, scale);
+ }
+ }
+
+ private static class StringAccessor extends ArrowVectorAccessor {
+
+ private final NullableVarCharVector.Accessor accessor;
+ private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
+
+ StringAccessor(NullableVarCharVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final UTF8String getUTF8String(int rowId) {
+ accessor.get(rowId, stringResult);
+ if (stringResult.isSet == 0) {
+ return null;
+ } else {
+ return UTF8String.fromAddress(null,
+ stringResult.buffer.memoryAddress() + stringResult.start,
+ stringResult.end - stringResult.start);
+ }
+ }
+ }
+
+ private static class BinaryAccessor extends ArrowVectorAccessor {
+
+ private final NullableVarBinaryVector.Accessor accessor;
+
+ BinaryAccessor(NullableVarBinaryVector vector) {
+ super(vector);
+ this.accessor = vector.getAccessor();
+ }
+
+ @Override
+ final byte[] getBinary(int rowId) {
+ return accessor.getObject(rowId);
+ }
+ }
+
+ private static class ArrayAccessor extends ArrowVectorAccessor {
+
+ private final UInt4Vector.Accessor accessor;
+
+ ArrayAccessor(ListVector vector) {
+ super(vector);
+ this.accessor = vector.getOffsetVector().getAccessor();
+ }
+
+ @Override
+ final int getArrayLength(int rowId) {
+ return accessor.get(rowId + 1) - accessor.get(rowId);
+ }
+
+ @Override
+ final int getArrayOffset(int rowId) {
+ return accessor.get(rowId);
+ }
+ }
+
+ private static class StructAccessor extends ArrowVectorAccessor {
+
+ StructAccessor(MapVector vector) {
+ super(vector);
+ }
+ }
+}
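`ArrayAccessor` reads list boundaries from the Arrow offset vector: row `i` spans child positions `[offsets[i], offsets[i + 1])`, so its length is `offsets[i + 1] - offsets[i]`, while `ensureAccessible` rejects row indices outside `[0, valueCount)`. A small self-contained sketch of the same arithmetic, with an `int[]` standing in for the `UInt4Vector` of offsets (the class name is hypothetical):

```java
public class ListOffsetsDemo {

    // offsets holds valueCount + 1 entries; list i covers child slots [offsets[i], offsets[i + 1]).
    private final int[] offsets;
    private final int valueCount;

    ListOffsetsDemo(int[] offsets) {
        this.offsets = offsets;
        this.valueCount = offsets.length - 1;
    }

    // Same check as ensureAccessible(int): valid rows are 0 .. valueCount - 1.
    private void ensureAccessible(int index) {
        if (index < 0 || index >= valueCount) {
            throw new IndexOutOfBoundsException(
                String.format("index: %d, valueCount: %d", index, valueCount));
        }
    }

    int getArrayOffset(int rowId) {
        ensureAccessible(rowId);
        return offsets[rowId];
    }

    int getArrayLength(int rowId) {
        ensureAccessible(rowId);
        return offsets[rowId + 1] - offsets[rowId];
    }

    public static void main(String[] args) {
        // Encodes the lists [a, b], [], [c, d, e] over child values a..e.
        ListOffsetsDemo lists = new ListOffsetsDemo(new int[] {0, 2, 2, 5});
        for (int row = 0; row < 3; row++) {
            System.out.println("row " + row + ": offset=" + lists.getArrayOffset(row)
                + ", length=" + lists.getArrayLength(row));
        }
    }
}
```

An empty list simply repeats the previous offset, which is why row 1 above has length 0 without needing any extra marker.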
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 0c027f8..7796638 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Returns the decimal for rowId.
*/
- public final Decimal getDecimal(int rowId, int precision, int scale) {
+ public Decimal getDecimal(int rowId, int precision, int scale) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
return Decimal.createUnsafe(getInt(rowId), precision, scale);
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
}
- public final void putDecimal(int rowId, Decimal value, int precision) {
+ public void putDecimal(int rowId, Decimal value, int precision) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
putInt(rowId, (int) value.toUnscaledLong());
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
@@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Returns the UTF8String for rowId.
*/
- public final UTF8String getUTF8String(int rowId) {
+ public UTF8String getUTF8String(int rowId) {
if (dictionary == null) {
ColumnVector.Array a = getByteArray(rowId);
return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
@@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Returns the byte array for rowId.
*/
- public final byte[] getBinary(int rowId) {
+ public byte[] getBinary(int rowId) {
if (dictionary == null) {
ColumnVector.Array array = getByteArray(rowId);
byte[] bytes = new byte[array.length];
@@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* Data type for this column.
*/
- protected final DataType type;
+ protected DataType type;
/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
@@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
/**
* If this is a nested type (array or struct), the column for the child data.
*/
- protected final ColumnVector[] childColumns;
+ protected ColumnVector[] childColumns;
/**
* Reusable Array holder for getArray().
*/
- protected final Array resultArray;
+ protected Array resultArray;
/**
* Reusable Struct holder for getStruct().
*/
- protected final ColumnarBatch.Row resultStruct;
+ protected ColumnarBatch.Row resultStruct;
/**
* The Dictionary for this column.
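This part of the diff only drops `final` from `getDecimal`, `putDecimal`, `getUTF8String`, `getBinary` and several fields. Without that change `ArrowColumnVector` could not override `getDecimal` to read the Arrow decimal vector directly, and would be stuck with the base class's precision-based dispatch shown in the context lines (int storage up to `Decimal.MAX_INT_DIGITS`, long storage up to `Decimal.MAX_LONG_DIGITS`). A hypothetical sketch of that relationship using `java.math.BigDecimal`; the class names are illustrative, and the 9 and 18 digit limits mirror Spark's constants:

```java
import java.math.BigDecimal;

public class OverridableDecimalDemo {

    static final int MAX_INT_DIGITS = 9;   // mirrors Decimal.MAX_INT_DIGITS
    static final int MAX_LONG_DIGITS = 18; // mirrors Decimal.MAX_LONG_DIGITS

    // Stands in for ColumnVector: getDecimal is no longer final, so subclasses may override it.
    static abstract class BaseColumn {
        abstract int getInt(int rowId);
        abstract long getLong(int rowId);

        BigDecimal getDecimal(int rowId, int precision, int scale) {
            if (precision <= MAX_INT_DIGITS) {
                return BigDecimal.valueOf(getInt(rowId), scale);
            } else if (precision <= MAX_LONG_DIGITS) {
                return BigDecimal.valueOf(getLong(rowId), scale);
            }
            throw new UnsupportedOperationException("byte-array decimals are omitted in this sketch");
        }
    }

    // Column storing unscaled ints: relies on the inherited dispatch.
    static final class IntBackedColumn extends BaseColumn {
        private final int[] unscaled;
        IntBackedColumn(int[] unscaled) { this.unscaled = unscaled; }
        @Override int getInt(int rowId) { return unscaled[rowId]; }
        @Override long getLong(int rowId) { throw new UnsupportedOperationException(); }
    }

    // Stands in for ArrowColumnVector: values are already decimals, so it overrides getDecimal.
    static final class DecimalBackedColumn extends BaseColumn {
        private final BigDecimal[] values;
        DecimalBackedColumn(BigDecimal[] values) { this.values = values; }
        @Override int getInt(int rowId) { throw new UnsupportedOperationException(); }
        @Override long getLong(int rowId) { throw new UnsupportedOperationException(); }

        @Override
        BigDecimal getDecimal(int rowId, int precision, int scale) {
            // Assumes the stored scale fits; setScale(int) throws if rounding would be needed.
            return values[rowId].setScale(scale);
        }
    }

    public static void main(String[] args) {
        BaseColumn ints = new IntBackedColumn(new int[] {12345});
        BaseColumn decimals = new DecimalBackedColumn(new BigDecimal[] {new BigDecimal("123.45")});
        System.out.println(ints.getDecimal(0, 5, 2));     // 123.45
        System.out.println(decimals.getDecimal(0, 5, 2)); // 123.45
    }
}
```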
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
new file mode 100644
index 0000000..e9f6e7c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
+
+/**
+ * An abstract class for read-only column vector.
+ */
+public abstract class ReadOnlyColumnVector extends ColumnVector {
+
+ protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
+ super(capacity, DataTypes.NullType, memMode);
+ this.type = type;
+ isConstant = true;
+ }
+
+ //
+ // APIs dealing with nulls
+ //
+
+ @Override
+ public final void putNotNull(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putNull(int rowId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putNulls(int rowId, int count) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putNotNulls(int rowId, int count) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Booleans
+ //
+
+ @Override
+ public final void putBoolean(int rowId, boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putBooleans(int rowId, int count, boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Bytes
+ //
+
+ @Override
+ public final void putByte(int rowId, byte value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putBytes(int rowId, int count, byte value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Shorts
+ //
+
+ @Override
+ public final void putShort(int rowId, short value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putShorts(int rowId, int count, short value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Ints
+ //
+
+ @Override
+ public final void putInt(int rowId, int value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putInts(int rowId, int count, int value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Longs
+ //
+
+ @Override
+ public final void putLong(int rowId, long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putLongs(int rowId, int count, long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with floats
+ //
+
+ @Override
+ public final void putFloat(int rowId, float value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putFloats(int rowId, int count, float value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with doubles
+ //
+
+ @Override
+ public final void putDouble(int rowId, double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Arrays
+ //
+
+ @Override
+ public final void putArray(int rowId, int offset, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Byte Arrays
+ //
+
+ @Override
+ public final int putByteArray(int rowId, byte[] value, int offset, int count) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // APIs dealing with Decimals
+ //
+
+ @Override
+ public final void putDecimal(int rowId, Decimal value, int precision) {
+ throw new UnsupportedOperationException();
+ }
+
+ //
+ // Other APIs
+ //
+
+ @Override
+ public final void setDictionary(Dictionary dictionary) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final ColumnVector reserveDictionaryIds(int capacity) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected final void reserveInternal(int newCapacity) {
+ throw new UnsupportedOperationException();
+ }
+}
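`ReadOnlyColumnVector` calls `super(capacity, DataTypes.NullType, memMode)` and only afterwards assigns the real `type`, which is why the `ColumnVector` diff had to make `type` (and the child-column fields) non-final. Our reading, which is an assumption the commit does not state, is that the `ColumnVector` constructor allocates child columns for nested types, and passing `NullType` keeps it from allocating buffers that `ArrowColumnVector` then replaces with Arrow-backed children. A hypothetical sketch of that constructor pattern, together with the read-only `put*` behaviour (none of these names are Spark APIs):

```java
public class ReadOnlyConstructorDemo {

    enum Kind { NULL, INT, ARRAY }

    // Counts child columns created by the base constructor (for illustration only).
    static int baseChildAllocations = 0;

    // Stands in for ColumnVector: nested kinds get a child column allocated up front.
    static abstract class BaseColumn {
        protected Kind type;             // non-final, so a subclass constructor may reassign it
        protected BaseColumn[] children; // non-final, so a subclass may install its own children

        protected BaseColumn(Kind type) {
            this.type = type;
            if (type == Kind.ARRAY) {
                baseChildAllocations++;
                children = new BaseColumn[] { new WritableColumn(Kind.INT) };
            }
        }
    }

    static final class WritableColumn extends BaseColumn {
        WritableColumn(Kind type) { super(type); }
    }

    // Stands in for ReadOnlyColumnVector: pass NULL to skip base allocation, then set the real type.
    static class ReadOnlyColumn extends BaseColumn {
        ReadOnlyColumn(Kind type, BaseColumn... externalChildren) {
            super(Kind.NULL);
            this.type = type;
            this.children = externalChildren.length == 0 ? null : externalChildren;
        }

        // Mutators are rejected, like the put* methods of ReadOnlyColumnVector.
        final void putInt(int rowId, int value) {
            throw new UnsupportedOperationException();
        }
    }

    public static void main(String[] args) {
        new WritableColumn(Kind.ARRAY);
        ReadOnlyColumn ro = new ReadOnlyColumn(Kind.ARRAY, new ReadOnlyColumn(Kind.INT));
        System.out.println("type=" + ro.type + ", base allocations=" + baseChildAllocations);
    }
}
```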
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 6af5c73..c913efe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
private[sql] object ArrowConverters {
/**
- * Map a Spark DataType to ArrowType.
- */
- private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
- dataType match {
- case BooleanType => ArrowType.Bool.INSTANCE
- case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
- case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
- case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
- case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
- case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
- case ByteType => new ArrowType.Int(8, true)
- case StringType => ArrowType.Utf8.INSTANCE
- case BinaryType => ArrowType.Binary.INSTANCE
- case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
- }
- }
-
- /**
- * Convert a Spark Dataset schema to Arrow schema.
- */
- private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
- val arrowFields = schema.fields.map { f =>
- new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
- }
- new Schema(arrowFields.toList.asJava)
- }
-
- /**
* Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
@@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
batch: ArrowRecordBatch,
schema: StructType,
allocator: BufferAllocator): Array[Byte] = {
- val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
+ val arrowSchema = ArrowUtils.toArrowSchema(schema)
val root = VectorSchemaRoot.create(arrowSchema, allocator)
val out = new ByteArrayOutputStream()
val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
@@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
* Create an Arrow ColumnWriter given the type and ordinal of row.
*/
def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
- val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
+ val dtype = ArrowUtils.toArrowType(dataType)
dataType match {
case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)
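After this change `ColumnWriter` and the file serializer obtain Arrow types from `ArrowUtils.toArrowType` and `ArrowUtils.toArrowSchema` instead of the removed `ArrowConverters` helpers. The removed code derived integer bit widths as `8 * defaultSize`, while the new `toArrowType` hard-codes `8 * 2`, `8 * 4` and `8 * 8`; since Spark's default sizes for short, int and long are 2, 4 and 8 bytes, both yield 16, 32 and 64 bits. A small sketch that checks the equivalence, with a hypothetical enum standing in for Spark's `DataType` objects:

```java
public class IntBitWidthDemo {

    // Hypothetical stand-ins for Spark's integral types, with their default sizes in bytes.
    enum IntegralType {
        BYTE(1), SHORT(2), INTEGER(4), LONG(8);

        final int defaultSize;
        IntegralType(int defaultSize) { this.defaultSize = defaultSize; }
    }

    // Removed style: derive the Arrow bit width from the type's default size.
    static int bitWidthFromDefaultSize(IntegralType t) {
        return 8 * t.defaultSize;
    }

    // New style: widths written out per type, as in ArrowUtils.toArrowType.
    static int hardCodedBitWidth(IntegralType t) {
        switch (t) {
            case BYTE: return 8;
            case SHORT: return 8 * 2;
            case INTEGER: return 8 * 4;
            case LONG: return 8 * 8;
            default: throw new UnsupportedOperationException("Unsupported data type: " + t);
        }
    }

    public static void main(String[] args) {
        for (IntegralType t : IntegralType.values()) {
            System.out.println(t + ": " + bitWidthFromDefaultSize(t) + " / " + hardCodedBitWidth(t));
        }
    }
}
```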
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
new file mode 100644
index 0000000..2caf1ef
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.types.FloatingPointPrecision
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+
+import org.apache.spark.sql.types._
+
+object ArrowUtils {
+
+ val rootAllocator = new RootAllocator(Long.MaxValue)
+
+ // todo: support more types.
+
+ def toArrowType(dt: DataType): ArrowType = dt match {
+ case BooleanType => ArrowType.Bool.INSTANCE
+ case ByteType => new ArrowType.Int(8, true)
+ case ShortType => new ArrowType.Int(8 * 2, true)
+ case IntegerType => new ArrowType.Int(8 * 4, true)
+ case LongType => new ArrowType.Int(8 * 8, true)
+ case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
+ case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
+ case StringType => ArrowType.Utf8.INSTANCE
+ case BinaryType => ArrowType.Binary.INSTANCE
+ case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
+ case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
+ }
+
+ def fromArrowType(dt: ArrowType): DataType = dt match {
+ case ArrowType.Bool.INSTANCE => BooleanType
+ case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
+ case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
+ case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
+ case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
+ case float: ArrowType.FloatingPoint
+ if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
+ case float: ArrowType.FloatingPoint
+ if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
+ case ArrowType.Utf8.INSTANCE => StringType
+ case ArrowType.Binary.INSTANCE => BinaryType
+ case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
+ case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
+ }
+
+ def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
+ dt match {
+ case ArrayType(elementType, containsNull) =>
+ val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
+ new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
+ case StructType(fields) =>
+ val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
+ new Field(name, fieldType,
+ fields.map { field =>
+ toArrowField(field.name, field.dataType, field.nullable)
+ }.toSeq.asJava)
+ case dataType =>
+ val fieldType = new FieldType(nullable, toArrowType(dataType), null)
+ new Field(name, fieldType, Seq.empty[Field].asJava)
+ }
+ }
+
+ def fromArrowField(field: Field): DataType = {
+ field.getType match {
+ case ArrowType.List.INSTANCE =>
+ val elementField = field.getChildren().get(0)
+ val elementType = fromArrowField(elementField)
+ ArrayType(elementType, containsNull = elementField.isNullable)
+ case ArrowType.Struct.INSTANCE =>
+ val fields = field.getChildren().asScala.map { child =>
+ val dt = fromArrowField(child)
+ StructField(child.getName, dt, child.isNullable)
+ }
+ StructType(fields)
+ case arrowType => fromArrowType(arrowType)
+ }
+ }
+
+ def toArrowSchema(schema: StructType): Schema = {
+ new Schema(schema.map { field =>
+ toArrowField(field.name, field.dataType, field.nullable)
+ }.asJava)
+ }
+
+ def fromArrowSchema(schema: Schema): StructType = {
+ StructType(schema.getFields.asScala.map { field =>
+ val dt = fromArrowField(field)
+ StructField(field.getName, dt, field.isNullable)
+ })
+ }
+}
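The integral cases of `toArrowType` and `fromArrowType` above form a one-to-one mapping between Spark's signed integral types and signed Arrow `Int` widths of 8, 16, 32 and 64 bits; an unsigned Arrow `Int` fails every `int.getIsSigned` guard and falls through to the `UnsupportedOperationException` case. The Java sketch below reproduces only that mapping so the round trip can be checked in isolation. `SparkIntegral` and `IntWidthMapping` are hypothetical stand-ins, not Spark or Arrow classes.

```java
// Hypothetical stand-in for Spark's ByteType, ShortType, IntegerType and LongType.
enum SparkIntegral { BYTE, SHORT, INTEGER, LONG }

// Hypothetical helper mirroring the integral cases of ArrowUtils.toArrowType/fromArrowType.
final class IntWidthMapping {
  private IntWidthMapping() {}

  // Spark integral type -> bit width of the corresponding signed Arrow Int.
  static int toBitWidth(SparkIntegral t) {
    switch (t) {
      case BYTE: return 8;
      case SHORT: return 8 * 2;
      case INTEGER: return 8 * 4;
      case LONG: return 8 * 8;
      default: throw new UnsupportedOperationException("Unsupported data type: " + t);
    }
  }

  // (bit width, signedness) of an Arrow Int -> Spark integral type.
  // Unsigned ints are not mapped, matching the getIsSigned guards above.
  static SparkIntegral fromBitWidth(int bitWidth, boolean isSigned) {
    if (isSigned) {
      switch (bitWidth) {
        case 8: return SparkIntegral.BYTE;
        case 16: return SparkIntegral.SHORT;
        case 32: return SparkIntegral.INTEGER;
        case 64: return SparkIntegral.LONG;
        default: break;
      }
    }
    throw new UnsupportedOperationException(
        "Unsupported data type: Int(" + bitWidth + ", " + (isSigned ? "signed" : "unsigned") + ")");
  }
}
```

Because the mapping is one-to-one, `fromBitWidth(toBitWidth(t), true)` returns `t` for every constant. That is the same property `ArrowUtilsSuite` checks for the real types.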
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 159328c..55b4655 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
val allocator = new RootAllocator(Long.MaxValue)
val jsonReader = new JsonFileReader(jsonFile, allocator)
- val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
+ val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
val jsonSchema = jsonReader.start()
Validator.compareSchemas(arrowSchema, jsonSchema)
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
new file mode 100644
index 0000000..638619f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types._
+
+class ArrowUtilsSuite extends SparkFunSuite {
+
+ def roundtrip(dt: DataType): Unit = {
+ dt match {
+ case schema: StructType =>
+ assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
+ case _ =>
+ roundtrip(new StructType().add("value", dt))
+ }
+ }
+
+ test("simple") {
+ roundtrip(BooleanType)
+ roundtrip(ByteType)
+ roundtrip(ShortType)
+ roundtrip(IntegerType)
+ roundtrip(LongType)
+ roundtrip(FloatType)
+ roundtrip(DoubleType)
+ roundtrip(StringType)
+ roundtrip(BinaryType)
+ roundtrip(DecimalType.SYSTEM_DEFAULT)
+ }
+
+ test("array") {
+ roundtrip(ArrayType(IntegerType, containsNull = true))
+ roundtrip(ArrayType(IntegerType, containsNull = false))
+ roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
+ roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
+ roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
+ roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
+ }
+
+ test("struct") {
+ roundtrip(new StructType())
+ roundtrip(new StructType().add("i", IntegerType))
+ roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
+ roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
+ roundtrip(new StructType().add(
+ "struct",
+ new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
+ }
+}
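The `array` cases above round-trip because `toArrowField` encodes `containsNull` as the nullability of the single child field named `"element"`, and `fromArrowField` reads it back from that child's `isNullable`. The Java sketch below models only that recursion so that every combination of inner and outer `containsNull` can be checked. It uses hypothetical classes (`SType`, `SPrim`, `SArray`, `AField`, `FieldConversion`), not Spark's `DataType` or Arrow's `Field`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal Spark-side type model: a primitive, or an array of some element type.
abstract class SType {}

final class SPrim extends SType {
  final String name;
  SPrim(String name) { this.name = name; }
  @Override public String toString() { return name; }
}

final class SArray extends SType {
  final SType elementType;
  final boolean containsNull;
  SArray(SType elementType, boolean containsNull) {
    this.elementType = elementType;
    this.containsNull = containsNull;
  }
  @Override public String toString() {
    return "array<" + elementType + ", containsNull=" + containsNull + ">";
  }
}

// Hypothetical, minimal Arrow-side field: name, type tag, nullability and children.
final class AField {
  final String name;
  final String typeTag; // "list" for arrays, otherwise the primitive name
  final boolean nullable;
  final List<AField> children;
  AField(String name, String typeTag, boolean nullable, List<AField> children) {
    this.name = name;
    this.typeTag = typeTag;
    this.nullable = nullable;
    this.children = children;
  }
}

final class FieldConversion {
  private FieldConversion() {}

  // Mirrors toArrowField: an array becomes a "list" field whose only child is "element",
  // and that child's nullability carries the array's containsNull flag.
  static AField toField(String name, SType dt, boolean nullable) {
    List<AField> children = new ArrayList<>();
    if (dt instanceof SArray) {
      SArray arr = (SArray) dt;
      children.add(toField("element", arr.elementType, arr.containsNull));
      return new AField(name, "list", nullable, children);
    }
    return new AField(name, ((SPrim) dt).name, nullable, children);
  }

  // Mirrors fromArrowField: containsNull is recovered from the child's nullability.
  static SType fromField(AField field) {
    if (field.typeTag.equals("list")) {
      AField element = field.children.get(0);
      return new SArray(fromField(element), element.nullable);
    }
    return new SPrim(field.typeTag);
  }
}
```

Storing the flag on the child, rather than on the list field, leaves the list field's own `nullable` free to describe whether the array value itself may be null. That is why the outer and inner flags survive independently.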
http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
new file mode 100644
index 0000000..d24a9e1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ArrowColumnVectorSuite extends SparkFunSuite {
+
+ test("boolean") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableBitVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === BooleanType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getBoolean(i) === (i % 2 == 0))
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("byte") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableTinyIntVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, i.toByte)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === ByteType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getByte(i) === i.toByte)
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("short") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableSmallIntVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, i.toShort)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === ShortType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getShort(i) === i.toShort)
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("int") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableIntVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, i)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === IntegerType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getInt(i) === i)
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getInts(0, 10) === (0 until 10))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("long") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableBigIntVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, i.toLong)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === LongType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getLong(i) === i.toLong)
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("float") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableFloat4Vector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, i.toFloat)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === FloatType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getFloat(i) === i.toFloat)
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("double") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableFloat8Vector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ mutator.setSafe(i, i.toDouble)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === DoubleType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getDouble(i) === i.toDouble)
+ }
+ assert(columnVector.isNullAt(10))
+
+ assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("string") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableVarCharVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ val utf8 = s"str$i".getBytes("utf8")
+ mutator.setSafe(i, utf8, 0, utf8.length)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === StringType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
+ }
+ assert(columnVector.isNullAt(10))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("binary") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+
+ (0 until 10).foreach { i =>
+ val utf8 = s"str$i".getBytes("utf8")
+ mutator.setSafe(i, utf8, 0, utf8.length)
+ }
+ mutator.setNull(10)
+ mutator.setValueCount(11)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === BinaryType)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ (0 until 10).foreach { i =>
+ assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
+ }
+ assert(columnVector.isNullAt(10))
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("array") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
+ val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
+ .createVector(allocator).asInstanceOf[ListVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+ val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
+ val elementMutator = elementVector.getMutator()
+
+ // [1, 2]
+ mutator.startNewValue(0)
+ elementMutator.setSafe(0, 1)
+ elementMutator.setSafe(1, 2)
+ mutator.endValue(0, 2)
+
+ // [3, null, 5]
+ mutator.startNewValue(1)
+ elementMutator.setSafe(2, 3)
+ elementMutator.setNull(3)
+ elementMutator.setSafe(4, 5)
+ mutator.endValue(1, 3)
+
+ // null
+
+ // []
+ mutator.startNewValue(3)
+ mutator.endValue(3, 0)
+
+ elementMutator.setValueCount(5)
+ mutator.setValueCount(4)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === ArrayType(IntegerType))
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ val array0 = columnVector.getArray(0)
+ assert(array0.numElements() === 2)
+ assert(array0.getInt(0) === 1)
+ assert(array0.getInt(1) === 2)
+
+ val array1 = columnVector.getArray(1)
+ assert(array1.numElements() === 3)
+ assert(array1.getInt(0) === 3)
+ assert(array1.isNullAt(1))
+ assert(array1.getInt(2) === 5)
+
+ assert(columnVector.isNullAt(2))
+
+ val array3 = columnVector.getArray(3)
+ assert(array3.numElements() === 0)
+
+ columnVector.close()
+ allocator.close()
+ }
+
+ test("struct") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
+ val schema = new StructType().add("int", IntegerType).add("long", LongType)
+ val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
+ .createVector(allocator).asInstanceOf[NullableMapVector]
+ vector.allocateNew()
+ val mutator = vector.getMutator()
+ val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
+ val intMutator = intVector.getMutator()
+ val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
+ val longMutator = longVector.getMutator()
+
+ // (1, 1L)
+ mutator.setIndexDefined(0)
+ intMutator.setSafe(0, 1)
+ longMutator.setSafe(0, 1L)
+
+ // (2, null)
+ mutator.setIndexDefined(1)
+ intMutator.setSafe(1, 2)
+ longMutator.setNull(1)
+
+ // (null, 3L)
+ mutator.setIndexDefined(2)
+ intMutator.setNull(2)
+ longMutator.setSafe(2, 3L)
+
+ // null
+ mutator.setNull(3)
+
+ // (5, 5L)
+ mutator.setIndexDefined(4)
+ intMutator.setSafe(4, 5)
+ longMutator.setSafe(4, 5L)
+
+ intMutator.setValueCount(5)
+ longMutator.setValueCount(5)
+ mutator.setValueCount(5)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === schema)
+ assert(columnVector.anyNullsSet)
+ assert(columnVector.numNulls === 1)
+
+ val row0 = columnVector.getStruct(0, 2)
+ assert(row0.getInt(0) === 1)
+ assert(row0.getLong(1) === 1L)
+
+ val row1 = columnVector.getStruct(1, 2)
+ assert(row1.getInt(0) === 2)
+ assert(row1.isNullAt(1))
+
+ val row2 = columnVector.getStruct(2, 2)
+ assert(row2.isNullAt(0))
+ assert(row2.getLong(1) === 3L)
+
+ assert(columnVector.isNullAt(3))
+
+ val row4 = columnVector.getStruct(4, 2)
+ assert(row4.getInt(0) === 5)
+ assert(row4.getLong(1) === 5L)
+
+ columnVector.close()
+ allocator.close()
+ }
+}
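The `array` test above writes four list slots, `[1, 2]`, `[3, null, 5]`, `null` and `[]`, over a single child vector of five ints. It then reads them back through `getArray`, which `ArrowColumnVector` backs with `getArrayOffset` and `getArrayLength`. The Java sketch below is a hedged illustration of the usual Arrow list layout: one offsets array with one more entry than there are slots, plus per-slot validity. It stores the same data in plain arrays. `ListLayoutSketch` and its members are hypothetical. The assumption that the null slot occupies an empty range of the child data (offsets `{0, 2, 5, 5, 5}`) was not checked against what Arrow's `ListVector` actually writes.

```java
// Hypothetical sketch of an offset-based list column: slot i spans
// childValues[offsets[i] .. offsets[i + 1]) and validity[i] marks non-null slots.
final class ListLayoutSketch {
  final int[] offsets;          // length = slot count + 1
  final boolean[] validity;     // true = slot is non-null
  final Integer[] childValues;  // a null entry models a null element

  ListLayoutSketch(int[] offsets, boolean[] validity, Integer[] childValues) {
    if (offsets.length != validity.length + 1) {
      throw new IllegalArgumentException("offsets must have one more entry than slots");
    }
    this.offsets = offsets;
    this.validity = validity;
    this.childValues = childValues;
  }

  int valueCount() { return validity.length; }

  boolean isNullAt(int rowId) { return !validity[rowId]; }

  int numNulls() {
    int n = 0;
    for (boolean valid : validity) {
      if (!valid) n++;
    }
    return n;
  }

  int getArrayOffset(int rowId) { return offsets[rowId]; }

  int getArrayLength(int rowId) { return offsets[rowId + 1] - offsets[rowId]; }

  // Element j of slot rowId, read from the shared child values.
  Integer getElement(int rowId, int j) { return childValues[getArrayOffset(rowId) + j]; }

  // The data written in the "array" test: [1, 2], [3, null, 5], null, [].
  static ListLayoutSketch fromArrayTest() {
    return new ListLayoutSketch(
        new int[] {0, 2, 5, 5, 5},
        new boolean[] {true, true, false, true},
        new Integer[] {1, 2, 3, null, 5});
  }
}
```

With this layout, the expectations in the Scala test, such as `numNulls === 1`, three elements in slot 1 with a null in the middle, and zero elements in slot 3, follow directly from the offsets and validity arrays.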
---------------------------------------------------------------------
Re: Fwd: spark git commit: [SPARK-21472][SQL] Introduce
ArrowColumnVector as a reader for Arrow vectors.
Posted by Takuya UESHIN <ue...@happy-camper.st>.
Hi,
Thank you for reporting it.
I believe it's fixed now.
If you still have a problem, please let me know.
Thanks.
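For reference: `ArrowColumnVector` extends `ColumnVector` (through `ReadOnlyColumnVector`), so the bare `Array` in `loadBytes(Array array)` names the nested class `ColumnVector.Array`. Java treats inherited member classes as in scope. That is presumably why only the `doc-jar` (scaladoc) step failed and not compilation. Qualifying the type, as Liang-Chi suggests in the quoted message, removes the reliance on that inherited lookup. The sketch below shows the shape of the change using hypothetical stand-in classes (`ColumnVectorStub`, `ReadOnlyColumnVectorStub`, `ArrowColumnVectorStub`). It is not the commit that actually fixed the build.

```java
// Hypothetical stand-in for ColumnVector and its nested Array class.
abstract class ColumnVectorStub {
  static final class Array {
    final byte[] bytes;
    Array(byte[] bytes) { this.bytes = bytes; }
  }

  abstract void loadBytes(ColumnVectorStub.Array array);
}

// Hypothetical stand-in for the intermediate ReadOnlyColumnVector.
abstract class ReadOnlyColumnVectorStub extends ColumnVectorStub {}

// Hypothetical stand-in for ArrowColumnVector.
final class ArrowColumnVectorStub extends ReadOnlyColumnVectorStub {
  // javac would also accept the inherited simple name `Array` here; the qualified
  // form spells out which class is meant for tools that do not resolve inherited members.
  @Override
  void loadBytes(ColumnVectorStub.Array array) {
    throw new UnsupportedOperationException();
  }
}
```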
On Fri, Jul 21, 2017 at 4:57 PM, Liang-Chi Hsieh <vi...@gmail.com> wrote:
>
> Yeah, I think it should be "ColumnVector.Array". I've already pinged @ueshin
> about this issue.
>
>
> Jacek Laskowski wrote
> > Hi,
> >
> > Looks like the change has broken the build for me:
> >
> > [INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
> > /Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243: error: not found: type Array
> > public void loadBytes(Array array) {
> > ^
> >
> > ...
> >
> > 222 warnings found
> > one error found
> > [INFO] ------------------------------------------------------------------------
> > [INFO] Reactor Summary:
> > [INFO]
> > [INFO] Spark Project Parent POM ........................... SUCCESS [ 4.864 s]
> > [INFO] Spark Project Tags ................................. SUCCESS [ 5.689 s]
> > [INFO] Spark Project Sketch ............................... SUCCESS [ 4.646 s]
> > [INFO] Spark Project Local DB ............................. SUCCESS [ 6.074 s]
> > [INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
> > [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 7.355 s]
> > [INFO] Spark Project Unsafe ............................... SUCCESS [ 7.639 s]
> > [INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
> > [INFO] Spark Project Core ................................. SUCCESS [02:01 min]
> > [INFO] Spark Project ML Local Library ..................... SUCCESS [ 9.711 s]
> > [INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
> > [INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
> > [INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
> > [INFO] Spark Project SQL .................................. FAILURE [02:14 min]
> >
> > Is this only me or others suffer from it too?
> >
> > Best regards,
> > Jacek Laskowski
> > ----
> > https://medium.com/@jaceklaskowski/
> > Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> > Follow me at https://twitter.com/jaceklaskowski
> >
> >
> >
> > ---------- Forwarded message ----------
> > From: <wenchen@>
> > Date: Thu, Jul 20, 2017 at 3:00 PM
> > Subject: spark git commit: [SPARK-21472][SQL] Introduce
> > ArrowColumnVector as a reader for Arrow vectors.
> > To: commits@.apache
> >
> >
> > Repository: spark
> > Updated Branches:
> > refs/heads/master 5d1850d4b -> cb19880cd
> >
> >
> > [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow
> > vectors.
> >
> > ## What changes were proposed in this pull request?
> >
> > Introducing `ArrowColumnVector` as a reader for Arrow vectors.
> > It extends `ColumnVector`, so we will be able to use it with
> > `ColumnarBatch` and its functionalities.
> > Currently it supports primitive types and `StringType`, `ArrayType`
> > and `StructType`.
> >
> > ## How was this patch tested?
> >
> > Added tests for `ArrowColumnVector` and existing tests.
> >
> > Author: Takuya UESHIN <ueshin@>
> >
> > Closes #18680 from ueshin/issues/SPARK-21472.
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/spark/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
> > Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
> > Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c
> >
> > Branch: refs/heads/master
> > Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
> > Parents: 5d1850d
> > Author: Takuya UESHIN <ueshin@>
> > Authored: Thu Jul 20 21:00:30 2017 +0800
> > Committer: Wenchen Fan <wenchen@>
> > Committed: Thu Jul 20 21:00:30 2017 +0800
> >
> > ----------------------------------------------------------------------
> > .../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
> > .../sql/execution/vectorized/ColumnVector.java | 16 +-
> > .../vectorized/ReadOnlyColumnVector.java | 251 ++++++++
> > .../sql/execution/arrow/ArrowConverters.scala | 32 +-
> > .../spark/sql/execution/arrow/ArrowUtils.scala | 109 ++++
> > .../execution/arrow/ArrowConvertersSuite.scala | 2 +-
> > .../sql/execution/arrow/ArrowUtilsSuite.scala | 65 ++
> > .../vectorized/ArrowColumnVectorSuite.scala | 410 +++++++++++++
> > 8 files changed, 1436 insertions(+), 39 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> > new file mode 100644
> > index 0000000..68e0abc
> > --- /dev/null
> > +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> > @@ -0,0 +1,590 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.spark.sql.execution.vectorized;
> > +
> > +import org.apache.arrow.vector.*;
> > +import org.apache.arrow.vector.complex.*;
> > +import org.apache.arrow.vector.holders.NullableVarCharHolder;
> > +
> > +import org.apache.spark.memory.MemoryMode;
> > +import org.apache.spark.sql.execution.arrow.ArrowUtils;
> > +import org.apache.spark.sql.types.*;
> > +import org.apache.spark.unsafe.types.UTF8String;
> > +
> > +/**
> > + * A column vector backed by Apache Arrow.
> > + */
> > +public final class ArrowColumnVector extends ReadOnlyColumnVector {
> > +
> > + private final ArrowVectorAccessor accessor;
> > + private final int valueCount;
> > +
> > + private void ensureAccessible(int index) {
> > + if (index < 0 || index >= valueCount) {
> > + throw new IndexOutOfBoundsException(
> > + String.format("index: %d, valueCount: %d", index, valueCount));
> > + }
> > + }
> > +
> > + private void ensureAccessible(int index, int count) {
> > + if (index < 0 || index + count > valueCount) {
> > + throw new IndexOutOfBoundsException(
> > + String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
> > + }
> > + }
> > +
> > + @Override
> > + public long nullsNativeAddress() {
> > + throw new RuntimeException("Cannot get native address for arrow column");
> > + }
> > +
> > + @Override
> > + public long valuesNativeAddress() {
> > + throw new RuntimeException("Cannot get native address for arrow column");
> > + }
> > +
> > + @Override
> > + public void close() {
> > + if (childColumns != null) {
> > + for (int i = 0; i < childColumns.length; i++) {
> > + childColumns[i].close();
> > + }
> > + }
> > + accessor.close();
> > + }
> > +
> > + //
> > + // APIs dealing with nulls
> > + //
> > +
> > + @Override
> > + public boolean isNullAt(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.isNullAt(rowId);
> > + }
> > +
> > + //
> > + // APIs dealing with Booleans
> > + //
> > +
> > + @Override
> > + public boolean getBoolean(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getBoolean(rowId);
> > + }
> > +
> > + @Override
> > + public boolean[] getBooleans(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + boolean[] array = new boolean[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getBoolean(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + //
> > + // APIs dealing with Bytes
> > + //
> > +
> > + @Override
> > + public byte getByte(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getByte(rowId);
> > + }
> > +
> > + @Override
> > + public byte[] getBytes(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + byte[] array = new byte[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getByte(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + //
> > + // APIs dealing with Shorts
> > + //
> > +
> > + @Override
> > + public short getShort(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getShort(rowId);
> > + }
> > +
> > + @Override
> > + public short[] getShorts(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + short[] array = new short[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getShort(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + //
> > + // APIs dealing with Ints
> > + //
> > +
> > + @Override
> > + public int getInt(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getInt(rowId);
> > + }
> > +
> > + @Override
> > + public int[] getInts(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + int[] array = new int[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getInt(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + @Override
> > + public int getDictId(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Longs
> > + //
> > +
> > + @Override
> > + public long getLong(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getLong(rowId);
> > + }
> > +
> > + @Override
> > + public long[] getLongs(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + long[] array = new long[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getLong(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + //
> > + // APIs dealing with floats
> > + //
> > +
> > + @Override
> > + public float getFloat(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getFloat(rowId);
> > + }
> > +
> > + @Override
> > + public float[] getFloats(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + float[] array = new float[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getFloat(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + //
> > + // APIs dealing with doubles
> > + //
> > +
> > + @Override
> > + public double getDouble(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getDouble(rowId);
> > + }
> > +
> > + @Override
> > + public double[] getDoubles(int rowId, int count) {
> > + ensureAccessible(rowId, count);
> > + double[] array = new double[count];
> > + for (int i = 0; i < count; ++i) {
> > + array[i] = accessor.getDouble(rowId + i);
> > + }
> > + return array;
> > + }
> > +
> > + //
> > + // APIs dealing with Arrays
> > + //
> > +
> > + @Override
> > + public int getArrayLength(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getArrayLength(rowId);
> > + }
> > +
> > + @Override
> > + public int getArrayOffset(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getArrayOffset(rowId);
> > + }
> > +
> > + @Override
> > + public void loadBytes(Array array) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Decimals
> > + //
> > +
> > + @Override
> > + public Decimal getDecimal(int rowId, int precision, int scale) {
> > + ensureAccessible(rowId);
> > + return accessor.getDecimal(rowId, precision, scale);
> > + }
> > +
> > + //
> > + // APIs dealing with UTF8Strings
> > + //
> > +
> > + @Override
> > + public UTF8String getUTF8String(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getUTF8String(rowId);
> > + }
> > +
> > + //
> > + // APIs dealing with Binaries
> > + //
> > +
> > + @Override
> > + public byte[] getBinary(int rowId) {
> > + ensureAccessible(rowId);
> > + return accessor.getBinary(rowId);
> > + }
> > +
> > + public ArrowColumnVector(ValueVector vector) {
> > + super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
> > + MemoryMode.OFF_HEAP);
> > +
> > + if (vector instanceof NullableBitVector) {
> > + accessor = new BooleanAccessor((NullableBitVector) vector);
> > + } else if (vector instanceof NullableTinyIntVector) {
> > + accessor = new ByteAccessor((NullableTinyIntVector) vector);
> > + } else if (vector instanceof NullableSmallIntVector) {
> > + accessor = new ShortAccessor((NullableSmallIntVector) vector);
> > + } else if (vector instanceof NullableIntVector) {
> > + accessor = new IntAccessor((NullableIntVector) vector);
> > + } else if (vector instanceof NullableBigIntVector) {
> > + accessor = new LongAccessor((NullableBigIntVector) vector);
> > + } else if (vector instanceof NullableFloat4Vector) {
> > + accessor = new FloatAccessor((NullableFloat4Vector) vector);
> > + } else if (vector instanceof NullableFloat8Vector) {
> > + accessor = new DoubleAccessor((NullableFloat8Vector) vector);
> > + } else if (vector instanceof NullableDecimalVector) {
> > + accessor = new DecimalAccessor((NullableDecimalVector) vector);
> > + } else if (vector instanceof NullableVarCharVector) {
> > + accessor = new StringAccessor((NullableVarCharVector) vector);
> > + } else if (vector instanceof NullableVarBinaryVector) {
> > + accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
> > + } else if (vector instanceof ListVector) {
> > + ListVector listVector = (ListVector) vector;
> > + accessor = new ArrayAccessor(listVector);
> > +
> > + childColumns = new ColumnVector[1];
> > + childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
> > + resultArray = new Array(childColumns[0]);
> > + } else if (vector instanceof MapVector) {
> > + MapVector mapVector = (MapVector) vector;
> > + accessor = new StructAccessor(mapVector);
> > +
> > + childColumns = new ArrowColumnVector[mapVector.size()];
> > + for (int i = 0; i < childColumns.length; ++i) {
> > + childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
> > + }
> > + resultStruct = new ColumnarBatch.Row(childColumns);
> > + } else {
> > + throw new UnsupportedOperationException();
> > + }
> > + valueCount = accessor.getValueCount();
> > + numNulls = accessor.getNullCount();
> > + anyNullsSet = numNulls > 0;
> > + }
> > +
> > + private static abstract class ArrowVectorAccessor {
> > +
> > + private final ValueVector vector;
> > + private final ValueVector.Accessor nulls;
> > +
> > + private final int valueCount;
> > + private final int nullCount;
> > +
> > + ArrowVectorAccessor(ValueVector vector) {
> > + this.vector = vector;
> > + this.nulls = vector.getAccessor();
> > + this.valueCount = nulls.getValueCount();
> > + this.nullCount = nulls.getNullCount();
> > + }
> > +
> > + final boolean isNullAt(int rowId) {
> > + return nulls.isNull(rowId);
> > + }
> > +
> > + final int getValueCount() {
> > + return valueCount;
> > + }
> > +
> > + final int getNullCount() {
> > + return nullCount;
> > + }
> > +
> > + final void close() {
> > + vector.close();
> > + }
> > +
> > + boolean getBoolean(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + byte getByte(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + short getShort(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + int getInt(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + long getLong(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + float getFloat(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + double getDouble(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + Decimal getDecimal(int rowId, int precision, int scale) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + UTF8String getUTF8String(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + byte[] getBinary(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + int getArrayLength(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + int getArrayOffset(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > + }
> > +
> > + private static class BooleanAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableBitVector.Accessor accessor;
> > +
> > + BooleanAccessor(NullableBitVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final boolean getBoolean(int rowId) {
> > + return accessor.get(rowId) == 1;
> > + }
> > + }
> > +
> > + private static class ByteAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableTinyIntVector.Accessor accessor;
> > +
> > + ByteAccessor(NullableTinyIntVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final byte getByte(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class ShortAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableSmallIntVector.Accessor accessor;
> > +
> > + ShortAccessor(NullableSmallIntVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final short getShort(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class IntAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableIntVector.Accessor accessor;
> > +
> > + IntAccessor(NullableIntVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final int getInt(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class LongAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableBigIntVector.Accessor accessor;
> > +
> > + LongAccessor(NullableBigIntVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final long getLong(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class FloatAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableFloat4Vector.Accessor accessor;
> > +
> > + FloatAccessor(NullableFloat4Vector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final float getFloat(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class DoubleAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableFloat8Vector.Accessor accessor;
> > +
> > + DoubleAccessor(NullableFloat8Vector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final double getDouble(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class DecimalAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableDecimalVector.Accessor accessor;
> > +
> > + DecimalAccessor(NullableDecimalVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final Decimal getDecimal(int rowId, int precision, int scale) {
> > + if (isNullAt(rowId)) return null;
> > + return Decimal.apply(accessor.getObject(rowId), precision, scale);
> > + }
> > + }
> > +
> > + private static class StringAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableVarCharVector.Accessor accessor;
> > + private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
> > +
> > + StringAccessor(NullableVarCharVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final UTF8String getUTF8String(int rowId) {
> > + accessor.get(rowId, stringResult);
> > + if (stringResult.isSet == 0) {
> > + return null;
> > + } else {
> > + return UTF8String.fromAddress(null,
> > + stringResult.buffer.memoryAddress() + stringResult.start,
> > + stringResult.end - stringResult.start);
> > + }
> > + }
> > + }
> > +
> > + private static class BinaryAccessor extends ArrowVectorAccessor {
> > +
> > + private final NullableVarBinaryVector.Accessor accessor;
> > +
> > + BinaryAccessor(NullableVarBinaryVector vector) {
> > + super(vector);
> > + this.accessor = vector.getAccessor();
> > + }
> > +
> > + @Override
> > + final byte[] getBinary(int rowId) {
> > + return accessor.getObject(rowId);
> > + }
> > + }
> > +
> > + private static class ArrayAccessor extends ArrowVectorAccessor {
> > +
> > + private final UInt4Vector.Accessor accessor;
> > +
> > + ArrayAccessor(ListVector vector) {
> > + super(vector);
> > + this.accessor = vector.getOffsetVector().getAccessor();
> > + }
> > +
> > + @Override
> > + final int getArrayLength(int rowId) {
> > + return accessor.get(rowId + 1) - accessor.get(rowId);
> > + }
> > +
> > + @Override
> > + final int getArrayOffset(int rowId) {
> > + return accessor.get(rowId);
> > + }
> > + }
> > +
> > + private static class StructAccessor extends ArrowVectorAccessor {
> > +
> > + StructAccessor(MapVector vector) {
> > + super(vector);
> > + }
> > + }
> > +}
> >
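The `ArrayAccessor` above derives each list's position from the list vector's offset buffer. `getArrayOffset(rowId)` is `offsets[rowId]`, and `getArrayLength(rowId)` is `offsets[rowId + 1] - offsets[rowId]`. The following is a minimal, dependency-free sketch of that arithmetic. The class name `OffsetListSketch` is hypothetical, and plain `int[]` arrays stand in for Arrow's `UInt4Vector` offsets and for the child data vector.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of offset-based list access, modeled on ArrayAccessor.
 * List i occupies child positions [offsets[i], offsets[i + 1]).
 * Assumption: int[] replaces Arrow's UInt4Vector offsets and the child vector.
 */
final class OffsetListSketch {
  private final int[] offsets;   // one more entry than there are lists
  private final int[] childData; // all list elements, flattened

  OffsetListSketch(int[] offsets, int[] childData) {
    this.offsets = offsets;
    this.childData = childData;
  }

  /** Same arithmetic as getArrayOffset: start position in the child data. */
  int getArrayOffset(int rowId) {
    return offsets[rowId];
  }

  /** Same arithmetic as getArrayLength: difference of consecutive offsets. */
  int getArrayLength(int rowId) {
    return offsets[rowId + 1] - offsets[rowId];
  }

  /** Copies the list at rowId out of the flattened child data. */
  int[] getList(int rowId) {
    int[] out = new int[getArrayLength(rowId)];
    System.arraycopy(childData, getArrayOffset(rowId), out, 0, out.length);
    return out;
  }

  public static void main(String[] args) {
    // Three lists: [1, 2], [] and [3, 4, 5].
    OffsetListSketch lists =
        new OffsetListSketch(new int[] {0, 2, 2, 5}, new int[] {1, 2, 3, 4, 5});
    for (int row = 0; row < 3; row++) {
      System.out.println(Arrays.toString(lists.getList(row)));
    }
  }
}
```

An empty list needs no special case. It is simply two equal consecutive offsets.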
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> > index 0c027f8..7796638 100644
> > --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> > +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> > @@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
> > /**
> > * Returns the decimal for rowId.
> > */
> > - public final Decimal getDecimal(int rowId, int precision, int scale) {
> > + public Decimal getDecimal(int rowId, int precision, int scale) {
> > if (precision <= Decimal.MAX_INT_DIGITS()) {
> > return Decimal.createUnsafe(getInt(rowId), precision, scale);
> > } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> > @@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
> > }
> >
> >
> > - public final void putDecimal(int rowId, Decimal value, int precision) {
> > + public void putDecimal(int rowId, Decimal value, int precision) {
> > if (precision <= Decimal.MAX_INT_DIGITS()) {
> > putInt(rowId, (int) value.toUnscaledLong());
> > } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> > @@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
> > /**
> > * Returns the UTF8String for rowId.
> > */
> > - public final UTF8String getUTF8String(int rowId) {
> > + public UTF8String getUTF8String(int rowId) {
> > if (dictionary == null) {
> > ColumnVector.Array a = getByteArray(rowId);
> > return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
> > @@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
> > /**
> > * Returns the byte array for rowId.
> > */
> > - public final byte[] getBinary(int rowId) {
> > + public byte[] getBinary(int rowId) {
> > if (dictionary == null) {
> > ColumnVector.Array array = getByteArray(rowId);
> > byte[] bytes = new byte[array.length];
> > @@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
> > /**
> > * Data type for this column.
> > */
> > - protected final DataType type;
> > + protected DataType type;
> >
> > /**
> > * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
> > @@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
> > /**
> > * If this is a nested type (array or struct), the column for the child data.
> > */
> > - protected final ColumnVector[] childColumns;
> > + protected ColumnVector[] childColumns;
> >
> > /**
> > * Reusable Array holder for getArray().
> > */
> > - protected final Array resultArray;
> > + protected Array resultArray;
> >
> > /**
> > * Reusable Struct holder for getStruct().
> > */
> > - protected final ColumnarBatch.Row resultStruct;
> > + protected ColumnarBatch.Row resultStruct;
> >
> > /**
> > * The Dictionary for this column.
> >
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> > new file mode 100644
> > index 0000000..e9f6e7c
> > --- /dev/null
> > +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> > @@ -0,0 +1,251 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.spark.sql.execution.vectorized;
> > +
> > +import org.apache.spark.memory.MemoryMode;
> > +import org.apache.spark.sql.types.*;
> > +
> > +/**
> > + * An abstract class for read-only column vector.
> > + */
> > +public abstract class ReadOnlyColumnVector extends ColumnVector {
> > +
> > + protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
> > + super(capacity, DataTypes.NullType, memMode);
> > + this.type = type;
> > + isConstant = true;
> > + }
> > +
> > + //
> > + // APIs dealing with nulls
> > + //
> > +
> > + @Override
> > + public final void putNotNull(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putNull(int rowId) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putNulls(int rowId, int count) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putNotNulls(int rowId, int count) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Booleans
> > + //
> > +
> > + @Override
> > + public final void putBoolean(int rowId, boolean value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putBooleans(int rowId, int count, boolean value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Bytes
> > + //
> > +
> > + @Override
> > + public final void putByte(int rowId, byte value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putBytes(int rowId, int count, byte value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Shorts
> > + //
> > +
> > + @Override
> > + public final void putShort(int rowId, short value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putShorts(int rowId, int count, short value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Ints
> > + //
> > +
> > + @Override
> > + public final void putInt(int rowId, int value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putInts(int rowId, int count, int value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putInts(int rowId, int count, int[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Longs
> > + //
> > +
> > + @Override
> > + public final void putLong(int rowId, long value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putLongs(int rowId, int count, long value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with floats
> > + //
> > +
> > + @Override
> > + public final void putFloat(int rowId, float value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putFloats(int rowId, int count, float value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with doubles
> > + //
> > +
> > + @Override
> > + public final void putDouble(int rowId, double value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putDoubles(int rowId, int count, double value) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Arrays
> > + //
> > +
> > + @Override
> > + public final void putArray(int rowId, int offset, int length) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Byte Arrays
> > + //
> > +
> > + @Override
> > + public final int putByteArray(int rowId, byte[] value, int offset, int count) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // APIs dealing with Decimals
> > + //
> > +
> > + @Override
> > + public final void putDecimal(int rowId, Decimal value, int precision) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + //
> > + // Other APIs
> > + //
> > +
> > + @Override
> > + public final void setDictionary(Dictionary dictionary) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + public final ColumnVector reserveDictionaryIds(int capacity) {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + @Override
> > + protected final void reserveInternal(int newCapacity) {
> > + throw new UnsupportedOperationException();
> > + }
> > +}
> >
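`ReadOnlyColumnVector` declares every write method `final` and makes each one throw `UnsupportedOperationException`. Concrete subclasses such as `ArrowColumnVector` therefore only implement the read side. The sketch below shows the same pattern on a deliberately tiny column abstraction. `SimpleIntColumn`, `ReadOnlyIntColumn` and `ArrayBackedIntColumn` are hypothetical illustration names, not Spark classes, and an `int[]` stands in for an Arrow buffer.

```java
/** Hypothetical minimal column with one read and one write operation. */
abstract class SimpleIntColumn {
  abstract int getInt(int rowId);

  abstract void putInt(int rowId, int value);
}

/**
 * Read-only layer, mirroring ReadOnlyColumnVector: the writer is final and
 * always throws, so subclasses cannot accidentally re-enable writes.
 */
abstract class ReadOnlyIntColumn extends SimpleIntColumn {
  @Override
  final void putInt(int rowId, int value) {
    throw new UnsupportedOperationException();
  }
}

/** Concrete read-only column over existing data (stand-in for an Arrow vector). */
final class ArrayBackedIntColumn extends ReadOnlyIntColumn {
  private final int[] values;

  ArrayBackedIntColumn(int[] values) {
    this.values = values;
  }

  @Override
  int getInt(int rowId) {
    return values[rowId];
  }
}
```

Callers that only hold a `SimpleIntColumn` reference get an exception on write instead of silently mutating data owned by another library.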
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> > index 6af5c73..c913efe 100644
> > --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> > +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> > @@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
> > private[sql] object ArrowConverters {
> >
> > /**
> > - * Map a Spark DataType to ArrowType.
> > - */
> > - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
> > - dataType match {
> > - case BooleanType => ArrowType.Bool.INSTANCE
> > - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
> > - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
> > - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
> > - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> > - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> > - case ByteType => new ArrowType.Int(8, true)
> > - case StringType => ArrowType.Utf8.INSTANCE
> > - case BinaryType => ArrowType.Binary.INSTANCE
> > - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
> > - }
> > - }
> > -
> > - /**
> > - * Convert a Spark Dataset schema to Arrow schema.
> > - */
> > - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
> > - val arrowFields = schema.fields.map { f =>
> > - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
> > - }
> > - new Schema(arrowFields.toList.asJava)
> > - }
> > -
> > - /**
> > * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
> > * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
> > */
> > @@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
> > batch: ArrowRecordBatch,
> > schema: StructType,
> > allocator: BufferAllocator): Array[Byte] = {
> > - val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
> > + val arrowSchema = ArrowUtils.toArrowSchema(schema)
> > val root = VectorSchemaRoot.create(arrowSchema, allocator)
> > val out = new ByteArrayOutputStream()
> > val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
> > @@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
> > * Create an Arrow ColumnWriter given the type and ordinal of row.
> > */
> > def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
> > - val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
> > + val dtype = ArrowUtils.toArrowType(dataType)
> > dataType match {
> > case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
> > case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)
> >
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> > new file mode 100644
> > index 0000000..2caf1ef
> > --- /dev/null
> > +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> > @@ -0,0 +1,109 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.spark.sql.execution.arrow
> > +
> > +import scala.collection.JavaConverters._
> > +
> > +import org.apache.arrow.memory.RootAllocator
> > +import org.apache.arrow.vector.types.FloatingPointPrecision
> > +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
> > +
> > +import org.apache.spark.sql.types._
> > +
> > +object ArrowUtils {
> > +
> > + val rootAllocator = new RootAllocator(Long.MaxValue)
> > +
> > + // todo: support more types.
> > +
> > + def toArrowType(dt: DataType): ArrowType = dt match {
> > + case BooleanType => ArrowType.Bool.INSTANCE
> > + case ByteType => new ArrowType.Int(8, true)
> > + case ShortType => new ArrowType.Int(8 * 2, true)
> > + case IntegerType => new ArrowType.Int(8 * 4, true)
> > + case LongType => new ArrowType.Int(8 * 8, true)
> > + case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> > + case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> > + case StringType => ArrowType.Utf8.INSTANCE
> > + case BinaryType => ArrowType.Binary.INSTANCE
> > + case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
> > + case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
> > + }
> > +
> > + def fromArrowType(dt: ArrowType): DataType = dt match {
> > + case ArrowType.Bool.INSTANCE => BooleanType
> > + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
> > + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
> > + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
> > + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
> > + case float: ArrowType.FloatingPoint
> > + if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
> > + case float: ArrowType.FloatingPoint
> > + if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
> > + case ArrowType.Utf8.INSTANCE => StringType
> > + case ArrowType.Binary.INSTANCE => BinaryType
> > + case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
> > + case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
> > + }
> > +
> > + def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
> > + dt match {
> > + case ArrayType(elementType, containsNull) =>
> > + val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
> > + new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
> > + case StructType(fields) =>
> > + val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
> > + new Field(name, fieldType,
> > + fields.map { field =>
> > + toArrowField(field.name, field.dataType, field.nullable)
> > + }.toSeq.asJava)
> > + case dataType =>
> > + val fieldType = new FieldType(nullable, toArrowType(dataType), null)
> > + new Field(name, fieldType, Seq.empty[Field].asJava)
> > + }
> > + }
> > +
> > + def fromArrowField(field: Field): DataType = {
> > + field.getType match {
> > + case ArrowType.List.INSTANCE =>
> > + val elementField = field.getChildren().get(0)
> > + val elementType = fromArrowField(elementField)
> > + ArrayType(elementType, containsNull = elementField.isNullable)
> > + case ArrowType.Struct.INSTANCE =>
> > + val fields = field.getChildren().asScala.map { child =>
> > + val dt = fromArrowField(child)
> > + StructField(child.getName, dt, child.isNullable)
> > + }
> > + StructType(fields)
> > + case arrowType => fromArrowType(arrowType)
> > + }
> > + }
> > +
> > + def toArrowSchema(schema: StructType): Schema = {
> > + new Schema(schema.map { field =>
> > + toArrowField(field.name, field.dataType, field.nullable)
> > + }.asJava)
> > + }
> > +
> > + def fromArrowSchema(schema: Schema): StructType = {
> > + StructType(schema.getFields.asScala.map { field =>
> > + val dt = fromArrowField(field)
> > + StructField(field.getName, dt, field.isNullable)
> > + })
> > + }
> > +}
> >
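For the integral types, `ArrowUtils.toArrowType` maps `ByteType`, `ShortType`, `IntegerType` and `LongType` to signed `ArrowType.Int` instances with bit widths 8, 16, 32 and 64. `fromArrowType` reverses the mapping by matching on signedness and bit width. The hypothetical `SparkIntegralType` enum below sketches only that round trip in plain Java, without Spark or Arrow on the classpath. One design difference is deliberate. `ArrowUtils` throws `UnsupportedOperationException` for an unmatched type, whereas this sketch returns an empty `Optional`.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of the integral part of ArrowUtils.toArrowType and
 * fromArrowType. Each Spark integral type maps to a signed Arrow Int whose
 * bit width is 8 * (size in bytes).
 */
enum SparkIntegralType {
  BYTE(1), SHORT(2), INTEGER(4), LONG(8);

  private final int sizeInBytes;

  SparkIntegralType(int sizeInBytes) {
    this.sizeInBytes = sizeInBytes;
  }

  /** Bit width of the corresponding signed Arrow Int (8, 16, 32 or 64). */
  int arrowBitWidth() {
    return 8 * sizeInBytes;
  }

  /**
   * Reverse lookup on (bitWidth, isSigned). Unsigned or unknown widths yield
   * an empty Optional here; ArrowUtils throws in that case instead.
   */
  static Optional<SparkIntegralType> fromArrowInt(int bitWidth, boolean isSigned) {
    if (!isSigned) {
      return Optional.empty();
    }
    for (SparkIntegralType t : values()) {
      if (t.arrowBitWidth() == bitWidth) {
        return Optional.of(t);
      }
    }
    return Optional.empty();
  }
}
```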
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> > index 159328c..55b4655 100644
> > --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> > +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> > @@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
> > val allocator = new RootAllocator(Long.MaxValue)
> > val jsonReader = new JsonFileReader(jsonFile, allocator)
> >
> > - val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
> > + val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
> > val jsonSchema = jsonReader.start()
> > Validator.compareSchemas(arrowSchema, jsonSchema)
> >
> >
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> > new file mode 100644
> > index 0000000..638619f
> > --- /dev/null
> > +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> > @@ -0,0 +1,65 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.spark.sql.execution.arrow
> > +
> > +import org.apache.spark.SparkFunSuite
> > +import org.apache.spark.sql.types._
> > +
> > +class ArrowUtilsSuite extends SparkFunSuite {
> > +
> > + def roundtrip(dt: DataType): Unit = {
> > + dt match {
> > + case schema: StructType =>
> > + assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
> > + case _ =>
> > + roundtrip(new StructType().add("value", dt))
> > + }
> > + }
> > +
> > + test("simple") {
> > + roundtrip(BooleanType)
> > + roundtrip(ByteType)
> > + roundtrip(ShortType)
> > + roundtrip(IntegerType)
> > + roundtrip(LongType)
> > + roundtrip(FloatType)
> > + roundtrip(DoubleType)
> > + roundtrip(StringType)
> > + roundtrip(BinaryType)
> > + roundtrip(DecimalType.SYSTEM_DEFAULT)
> > + }
> > +
> > + test("array") {
> > + roundtrip(ArrayType(IntegerType, containsNull = true))
> > + roundtrip(ArrayType(IntegerType, containsNull = false))
> > + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
> > + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
> > + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
> > + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
> > + }
> > +
> > + test("struct") {
> > + roundtrip(new StructType())
> > + roundtrip(new StructType().add("i", IntegerType))
> > + roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
> > + roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
> > + roundtrip(new StructType().add(
> > + "struct",
> > + new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
> > + }
> > +}
> >
> > http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> > ----------------------------------------------------------------------
> > diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> > new file mode 100644
> > index 0000000..d24a9e1
> > --- /dev/null
> > +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> > @@ -0,0 +1,410 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.spark.sql.execution.vectorized
> > +
> > +import org.apache.arrow.vector._
> > +import org.apache.arrow.vector.complex._
> > +
> > +import org.apache.spark.SparkFunSuite
> > +import org.apache.spark.sql.execution.arrow.ArrowUtils
> > +import org.apache.spark.sql.types._
> > +import org.apache.spark.unsafe.types.UTF8String
> > +
> > +class ArrowColumnVectorSuite extends SparkFunSuite {
> > +
> > + test("boolean") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableBitVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === BooleanType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getBoolean(i) === (i % 2 == 0))
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("byte") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableTinyIntVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, i.toByte)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === ByteType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getByte(i) === i.toByte)
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("short") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableSmallIntVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, i.toShort)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === ShortType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getShort(i) === i.toShort)
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("int") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableIntVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, i)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === IntegerType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getInt(i) === i)
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getInts(0, 10) === (0 until 10))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("long") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableBigIntVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, i.toLong)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === LongType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getLong(i) === i.toLong)
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("float") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableFloat4Vector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, i.toFloat)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === FloatType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getFloat(i) === i.toFloat)
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("double") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableFloat8Vector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + mutator.setSafe(i, i.toDouble)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === DoubleType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getDouble(i) === i.toDouble)
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("string") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableVarCharVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + val utf8 = s"str$i".getBytes("utf8")
> > + mutator.setSafe(i, utf8, 0, utf8.length)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === StringType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("binary") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > +
> > + (0 until 10).foreach { i =>
> > + val utf8 = s"str$i".getBytes("utf8")
> > + mutator.setSafe(i, utf8, 0, utf8.length)
> > + }
> > + mutator.setNull(10)
> > + mutator.setValueCount(11)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === BinaryType)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + (0 until 10).foreach { i =>
> > + assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
> > + }
> > + assert(columnVector.isNullAt(10))
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("array") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
> > + val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
> > + .createVector(allocator).asInstanceOf[ListVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > + val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
> > + val elementMutator = elementVector.getMutator()
> > +
> > + // [1, 2]
> > + mutator.startNewValue(0)
> > + elementMutator.setSafe(0, 1)
> > + elementMutator.setSafe(1, 2)
> > + mutator.endValue(0, 2)
> > +
> > + // [3, null, 5]
> > + mutator.startNewValue(1)
> > + elementMutator.setSafe(2, 3)
> > + elementMutator.setNull(3)
> > + elementMutator.setSafe(4, 5)
> > + mutator.endValue(1, 3)
> > +
> > + // null
> > +
> > + // []
> > + mutator.startNewValue(3)
> > + mutator.endValue(3, 0)
> > +
> > + elementMutator.setValueCount(5)
> > + mutator.setValueCount(4)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === ArrayType(IntegerType))
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + val array0 = columnVector.getArray(0)
> > + assert(array0.numElements() === 2)
> > + assert(array0.getInt(0) === 1)
> > + assert(array0.getInt(1) === 2)
> > +
> > + val array1 = columnVector.getArray(1)
> > + assert(array1.numElements() === 3)
> > + assert(array1.getInt(0) === 3)
> > + assert(array1.isNullAt(1))
> > + assert(array1.getInt(2) === 5)
> > +
> > + assert(columnVector.isNullAt(2))
> > +
> > + val array3 = columnVector.getArray(3)
> > + assert(array3.numElements() === 0)
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +
> > + test("struct") {
> > + val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
> > + val schema = new StructType().add("int", IntegerType).add("long", LongType)
> > + val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
> > + .createVector(allocator).asInstanceOf[NullableMapVector]
> > + vector.allocateNew()
> > + val mutator = vector.getMutator()
> > + val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
> > + val intMutator = intVector.getMutator()
> > + val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
> > + val longMutator = longVector.getMutator()
> > +
> > + // (1, 1L)
> > + mutator.setIndexDefined(0)
> > + intMutator.setSafe(0, 1)
> > + longMutator.setSafe(0, 1L)
> > +
> > + // (2, null)
> > + mutator.setIndexDefined(1)
> > + intMutator.setSafe(1, 2)
> > + longMutator.setNull(1)
> > +
> > + // (null, 3L)
> > + mutator.setIndexDefined(2)
> > + intMutator.setNull(2)
> > + longMutator.setSafe(2, 3L)
> > +
> > + // null
> > + mutator.setNull(3)
> > +
> > + // (5, 5L)
> > + mutator.setIndexDefined(4)
> > + intMutator.setSafe(4, 5)
> > + longMutator.setSafe(4, 5L)
> > +
> > + intMutator.setValueCount(5)
> > + longMutator.setValueCount(5)
> > + mutator.setValueCount(5)
> > +
> > + val columnVector = new ArrowColumnVector(vector)
> > + assert(columnVector.dataType === schema)
> > + assert(columnVector.anyNullsSet)
> > + assert(columnVector.numNulls === 1)
> > +
> > + val row0 = columnVector.getStruct(0, 2)
> > + assert(row0.getInt(0) === 1)
> > + assert(row0.getLong(1) === 1L)
> > +
> > + val row1 = columnVector.getStruct(1, 2)
> > + assert(row1.getInt(0) === 2)
> > + assert(row1.isNullAt(1))
> > +
> > + val row2 = columnVector.getStruct(2, 2)
> > + assert(row2.isNullAt(0))
> > + assert(row2.getLong(1) === 3L)
> > +
> > + assert(columnVector.isNullAt(3))
> > +
> > + val row4 = columnVector.getStruct(4, 2)
> > + assert(row4.getInt(0) === 5)
> > + assert(row4.getLong(1) === 5L)
> > +
> > + columnVector.close()
> > + allocator.close()
> > + }
> > +}
> >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail:
>
> > commits-unsubscribe@.apache
>
> > For additional commands, e-mail:
>
> > commits-help@.apache
>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail:
>
> > dev-unsubscribe@.apache
>
>
>
>
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
--
Takuya UESHIN
Tokyo, Japan
http://twitter.com/ueshin
Re: Fwd: spark git commit: [SPARK-21472][SQL] Introduce
ArrowColumnVector as a reader for Arrow vectors.
Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Yeah, I think it should be "ColumnVector.Array". I've already pinged @ueshin about this issue.
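The following is a minimal sketch of the change Liang-Chi suggests. It uses hypothetical stand-in classes (ColumnVector, ArrowColumnVectorSketch), not Spark's real ones. It also assumes, based on the suggested spelling, that Array is a static nested class declared inside ColumnVector. javac accepts the bare name Array in a subclass because member classes are inherited. Jacek's log, however, shows the failure during scala-maven-plugin's doc-jar goal, which suggests the doc tool rather than javac failed to resolve the inherited name. Writing ColumnVector.Array names the declaring class explicitly.

```java
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
abstract class ColumnVector {
    // Assumption: Array is a static nested (member) class of ColumnVector.
    public static final class Array {
        public final int length;

        public Array(int length) {
            this.length = length;
        }
    }

    public abstract void loadBytes(ColumnVector.Array array);
}

final class ArrowColumnVectorSketch extends ColumnVector {
    // The bare name `Array` would also compile with javac, since member
    // classes are inherited. Qualifying it as ColumnVector.Array states where
    // the type is declared, so a tool that does not look up inherited member
    // types can presumably still resolve it.
    @Override
    public void loadBytes(ColumnVector.Array array) {
        throw new UnsupportedOperationException();
    }
}
```

Only the parameter type changes. As in the patch, the method still throws UnsupportedOperationException.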
Jacek Laskowski wrote
> Hi,
>
> Looks like the change has broken the build for me:
>
> [INFO] --- scala-maven-plugin:3.2.2:doc-jar (attach-scaladocs) @ spark-sql_2.11 ---
> /Users/jacek/dev/oss/spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java:243:
> error: not found: type Array
> public void loadBytes(Array array) {
> ^
>
> ...
>
> 222 warnings found
> one error found
> [INFO] ------------------------------------------------------------------------
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM ........................... SUCCESS [ 4.864 s]
> [INFO] Spark Project Tags ................................. SUCCESS [ 5.689 s]
> [INFO] Spark Project Sketch ............................... SUCCESS [ 4.646 s]
> [INFO] Spark Project Local DB ............................. SUCCESS [ 6.074 s]
> [INFO] Spark Project Networking ........................... SUCCESS [ 10.305 s]
> [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 7.355 s]
> [INFO] Spark Project Unsafe ............................... SUCCESS [ 7.639 s]
> [INFO] Spark Project Launcher ............................. SUCCESS [ 10.364 s]
> [INFO] Spark Project Core ................................. SUCCESS [02:01 min]
> [INFO] Spark Project ML Local Library ..................... SUCCESS [ 9.711 s]
> [INFO] Spark Project GraphX ............................... SUCCESS [ 16.652 s]
> [INFO] Spark Project Streaming ............................ SUCCESS [ 36.845 s]
> [INFO] Spark Project Catalyst ............................. SUCCESS [01:41 min]
> [INFO] Spark Project SQL .................................. FAILURE [02:14 min]
>
> Is it only me, or do others suffer from it too?
>
> Regards,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
>
> ---------- Forwarded message ----------
> From: <wenchen@>
> Date: Thu, Jul 20, 2017 at 3:00 PM
> Subject: spark git commit: [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
> To: commits@.apache
>
>
> Repository: spark
> Updated Branches:
> refs/heads/master 5d1850d4b -> cb19880cd
>
>
> [SPARK-21472][SQL] Introduce ArrowColumnVector as a reader for Arrow vectors.
>
> ## What changes were proposed in this pull request?
>
> Introducing `ArrowColumnVector` as a reader for Arrow vectors.
> It extends `ColumnVector`, so we will be able to use it with
> `ColumnarBatch` and its functionalities.
> Currently it supports primitive types and `StringType`, `ArrayType`
> and `StructType`.
>
> ## How was this patch tested?
>
> Added tests for `ArrowColumnVector` and existing tests.
>
> Author: Takuya UESHIN <ueshin@>
>
> Closes #18680 from ueshin/issues/SPARK-21472.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/spark/repo
> Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb19880c
> Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb19880c
> Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb19880c
>
> Branch: refs/heads/master
> Commit: cb19880cd8d54d09fdd13cfad1914b8b36328a5a
> Parents: 5d1850d
> Author: Takuya UESHIN <ueshin@>
> Authored: Thu Jul 20 21:00:30 2017 +0800
> Committer: Wenchen Fan <wenchen@>
> Committed: Thu Jul 20 21:00:30 2017 +0800
>
> ----------------------------------------------------------------------
> .../execution/vectorized/ArrowColumnVector.java | 590 +++++++++++++++++++
> .../sql/execution/vectorized/ColumnVector.java | 16 +-
> .../vectorized/ReadOnlyColumnVector.java | 251 ++++++++
> .../sql/execution/arrow/ArrowConverters.scala | 32 +-
> .../spark/sql/execution/arrow/ArrowUtils.scala | 109 ++++
> .../execution/arrow/ArrowConvertersSuite.scala | 2 +-
> .../sql/execution/arrow/ArrowUtilsSuite.scala | 65 ++
> .../vectorized/ArrowColumnVectorSuite.scala | 410 +++++++++++++
> 8 files changed, 1436 insertions(+), 39 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> new file mode 100644
> index 0000000..68e0abc
> --- /dev/null
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
> @@ -0,0 +1,590 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized;
> +
> +import org.apache.arrow.vector.*;
> +import org.apache.arrow.vector.complex.*;
> +import org.apache.arrow.vector.holders.NullableVarCharHolder;
> +
> +import org.apache.spark.memory.MemoryMode;
> +import org.apache.spark.sql.execution.arrow.ArrowUtils;
> +import org.apache.spark.sql.types.*;
> +import org.apache.spark.unsafe.types.UTF8String;
> +
> +/**
> + * A column vector backed by Apache Arrow.
> + */
> +public final class ArrowColumnVector extends ReadOnlyColumnVector {
> +
> + private final ArrowVectorAccessor accessor;
> + private final int valueCount;
> +
> + private void ensureAccessible(int index) {
> + if (index < 0 || index >= valueCount) {
> + throw new IndexOutOfBoundsException(
> + String.format("index: %d, valueCount: %d", index, valueCount));
> + }
> + }
> +
> + private void ensureAccessible(int index, int count) {
> + if (index < 0 || index + count > valueCount) {
> + throw new IndexOutOfBoundsException(
> + String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
> + }
> + }
> +
> + @Override
> + public long nullsNativeAddress() {
> + throw new RuntimeException("Cannot get native address for arrow column");
> + }
> +
> + @Override
> + public long valuesNativeAddress() {
> + throw new RuntimeException("Cannot get native address for arrow column");
> + }
> +
> + @Override
> + public void close() {
> + if (childColumns != null) {
> + for (int i = 0; i < childColumns.length; i++) {
> + childColumns[i].close();
> + }
> + }
> + accessor.close();
> + }
> +
> + //
> + // APIs dealing with nulls
> + //
> +
> + @Override
> + public boolean isNullAt(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.isNullAt(rowId);
> + }
> +
> + //
> + // APIs dealing with Booleans
> + //
> +
> + @Override
> + public boolean getBoolean(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getBoolean(rowId);
> + }
> +
> + @Override
> + public boolean[] getBooleans(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + boolean[] array = new boolean[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getBoolean(rowId + i);
> + }
> + return array;
> + }
> +
> + //
> + // APIs dealing with Bytes
> + //
> +
> + @Override
> + public byte getByte(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getByte(rowId);
> + }
> +
> + @Override
> + public byte[] getBytes(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + byte[] array = new byte[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getByte(rowId + i);
> + }
> + return array;
> + }
> +
> + //
> + // APIs dealing with Shorts
> + //
> +
> + @Override
> + public short getShort(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getShort(rowId);
> + }
> +
> + @Override
> + public short[] getShorts(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + short[] array = new short[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getShort(rowId + i);
> + }
> + return array;
> + }
> +
> + //
> + // APIs dealing with Ints
> + //
> +
> + @Override
> + public int getInt(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getInt(rowId);
> + }
> +
> + @Override
> + public int[] getInts(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + int[] array = new int[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getInt(rowId + i);
> + }
> + return array;
> + }
> +
> + @Override
> + public int getDictId(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Longs
> + //
> +
> + @Override
> + public long getLong(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getLong(rowId);
> + }
> +
> + @Override
> + public long[] getLongs(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + long[] array = new long[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getLong(rowId + i);
> + }
> + return array;
> + }
> +
> + //
> + // APIs dealing with floats
> + //
> +
> + @Override
> + public float getFloat(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getFloat(rowId);
> + }
> +
> + @Override
> + public float[] getFloats(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + float[] array = new float[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getFloat(rowId + i);
> + }
> + return array;
> + }
> +
> + //
> + // APIs dealing with doubles
> + //
> +
> + @Override
> + public double getDouble(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getDouble(rowId);
> + }
> +
> + @Override
> + public double[] getDoubles(int rowId, int count) {
> + ensureAccessible(rowId, count);
> + double[] array = new double[count];
> + for (int i = 0; i < count; ++i) {
> + array[i] = accessor.getDouble(rowId + i);
> + }
> + return array;
> + }
> +
> + //
> + // APIs dealing with Arrays
> + //
> +
> + @Override
> + public int getArrayLength(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getArrayLength(rowId);
> + }
> +
> + @Override
> + public int getArrayOffset(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getArrayOffset(rowId);
> + }
> +
> + @Override
> + public void loadBytes(Array array) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Decimals
> + //
> +
> + @Override
> + public Decimal getDecimal(int rowId, int precision, int scale) {
> + ensureAccessible(rowId);
> + return accessor.getDecimal(rowId, precision, scale);
> + }
> +
> + //
> + // APIs dealing with UTF8Strings
> + //
> +
> + @Override
> + public UTF8String getUTF8String(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getUTF8String(rowId);
> + }
> +
> + //
> + // APIs dealing with Binaries
> + //
> +
> + @Override
> + public byte[] getBinary(int rowId) {
> + ensureAccessible(rowId);
> + return accessor.getBinary(rowId);
> + }
> +
> + public ArrowColumnVector(ValueVector vector) {
> + super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
> + MemoryMode.OFF_HEAP);
> +
> + if (vector instanceof NullableBitVector) {
> + accessor = new BooleanAccessor((NullableBitVector) vector);
> + } else if (vector instanceof NullableTinyIntVector) {
> + accessor = new ByteAccessor((NullableTinyIntVector) vector);
> + } else if (vector instanceof NullableSmallIntVector) {
> + accessor = new ShortAccessor((NullableSmallIntVector) vector);
> + } else if (vector instanceof NullableIntVector) {
> + accessor = new IntAccessor((NullableIntVector) vector);
> + } else if (vector instanceof NullableBigIntVector) {
> + accessor = new LongAccessor((NullableBigIntVector) vector);
> + } else if (vector instanceof NullableFloat4Vector) {
> + accessor = new FloatAccessor((NullableFloat4Vector) vector);
> + } else if (vector instanceof NullableFloat8Vector) {
> + accessor = new DoubleAccessor((NullableFloat8Vector) vector);
> + } else if (vector instanceof NullableDecimalVector) {
> + accessor = new DecimalAccessor((NullableDecimalVector) vector);
> + } else if (vector instanceof NullableVarCharVector) {
> + accessor = new StringAccessor((NullableVarCharVector) vector);
> + } else if (vector instanceof NullableVarBinaryVector) {
> + accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
> + } else if (vector instanceof ListVector) {
> + ListVector listVector = (ListVector) vector;
> + accessor = new ArrayAccessor(listVector);
> +
> + childColumns = new ColumnVector[1];
> + childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
> + resultArray = new Array(childColumns[0]);
> + } else if (vector instanceof MapVector) {
> + MapVector mapVector = (MapVector) vector;
> + accessor = new StructAccessor(mapVector);
> +
> + childColumns = new ArrowColumnVector[mapVector.size()];
> + for (int i = 0; i < childColumns.length; ++i) {
> + childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
> + }
> + resultStruct = new ColumnarBatch.Row(childColumns);
> + } else {
> + throw new UnsupportedOperationException();
> + }
> + valueCount = accessor.getValueCount();
> + numNulls = accessor.getNullCount();
> + anyNullsSet = numNulls > 0;
> + }
> +
> + private static abstract class ArrowVectorAccessor {
> +
> + private final ValueVector vector;
> + private final ValueVector.Accessor nulls;
> +
> + private final int valueCount;
> + private final int nullCount;
> +
> + ArrowVectorAccessor(ValueVector vector) {
> + this.vector = vector;
> + this.nulls = vector.getAccessor();
> + this.valueCount = nulls.getValueCount();
> + this.nullCount = nulls.getNullCount();
> + }
> +
> + final boolean isNullAt(int rowId) {
> + return nulls.isNull(rowId);
> + }
> +
> + final int getValueCount() {
> + return valueCount;
> + }
> +
> + final int getNullCount() {
> + return nullCount;
> + }
> +
> + final void close() {
> + vector.close();
> + }
> +
> + boolean getBoolean(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + byte getByte(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + short getShort(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + int getInt(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + long getLong(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + float getFloat(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + double getDouble(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + Decimal getDecimal(int rowId, int precision, int scale) {
> + throw new UnsupportedOperationException();
> + }
> +
> + UTF8String getUTF8String(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + byte[] getBinary(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + int getArrayLength(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + int getArrayOffset(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> + }
> +
> + private static class BooleanAccessor extends ArrowVectorAccessor {
> +
> + private final NullableBitVector.Accessor accessor;
> +
> + BooleanAccessor(NullableBitVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final boolean getBoolean(int rowId) {
> + return accessor.get(rowId) == 1;
> + }
> + }
> +
> + private static class ByteAccessor extends ArrowVectorAccessor {
> +
> + private final NullableTinyIntVector.Accessor accessor;
> +
> + ByteAccessor(NullableTinyIntVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final byte getByte(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class ShortAccessor extends ArrowVectorAccessor {
> +
> + private final NullableSmallIntVector.Accessor accessor;
> +
> + ShortAccessor(NullableSmallIntVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final short getShort(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class IntAccessor extends ArrowVectorAccessor {
> +
> + private final NullableIntVector.Accessor accessor;
> +
> + IntAccessor(NullableIntVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final int getInt(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class LongAccessor extends ArrowVectorAccessor {
> +
> + private final NullableBigIntVector.Accessor accessor;
> +
> + LongAccessor(NullableBigIntVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final long getLong(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class FloatAccessor extends ArrowVectorAccessor {
> +
> + private final NullableFloat4Vector.Accessor accessor;
> +
> + FloatAccessor(NullableFloat4Vector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final float getFloat(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class DoubleAccessor extends ArrowVectorAccessor {
> +
> + private final NullableFloat8Vector.Accessor accessor;
> +
> + DoubleAccessor(NullableFloat8Vector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final double getDouble(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class DecimalAccessor extends ArrowVectorAccessor {
> +
> + private final NullableDecimalVector.Accessor accessor;
> +
> + DecimalAccessor(NullableDecimalVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final Decimal getDecimal(int rowId, int precision, int scale) {
> + if (isNullAt(rowId)) return null;
> + return Decimal.apply(accessor.getObject(rowId), precision, scale);
> + }
> + }
> +
> + private static class StringAccessor extends ArrowVectorAccessor {
> +
> + private final NullableVarCharVector.Accessor accessor;
> + private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
> +
> + StringAccessor(NullableVarCharVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final UTF8String getUTF8String(int rowId) {
> + accessor.get(rowId, stringResult);
> + if (stringResult.isSet == 0) {
> + return null;
> + } else {
> + return UTF8String.fromAddress(null,
> + stringResult.buffer.memoryAddress() + stringResult.start,
> + stringResult.end - stringResult.start);
> + }
> + }
> + }
> +
> + private static class BinaryAccessor extends ArrowVectorAccessor {
> +
> + private final NullableVarBinaryVector.Accessor accessor;
> +
> + BinaryAccessor(NullableVarBinaryVector vector) {
> + super(vector);
> + this.accessor = vector.getAccessor();
> + }
> +
> + @Override
> + final byte[] getBinary(int rowId) {
> + return accessor.getObject(rowId);
> + }
> + }
> +
> + private static class ArrayAccessor extends ArrowVectorAccessor {
> +
> + private final UInt4Vector.Accessor accessor;
> +
> + ArrayAccessor(ListVector vector) {
> + super(vector);
> + this.accessor = vector.getOffsetVector().getAccessor();
> + }
> +
> + @Override
> + final int getArrayLength(int rowId) {
> + return accessor.get(rowId + 1) - accessor.get(rowId);
> + }
> +
> + @Override
> + final int getArrayOffset(int rowId) {
> + return accessor.get(rowId);
> + }
> + }
> +
> + private static class StructAccessor extends ArrowVectorAccessor {
> +
> + StructAccessor(MapVector vector) {
> + super(vector);
> + }
> + }
> +}
>
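ArrayAccessor and StringAccessor above both read Arrow's offsets-based layout: value rowId occupies the half-open range between offsets[rowId] and offsets[rowId + 1], so a length is just the difference of two neighbouring offsets. A minimal sketch of that arithmetic, assuming plain Java arrays stand in for the offset buffer (UInt4Vector) and the data buffer; OffsetLayoutSketch and its methods are hypothetical names, not Spark or Arrow API:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: int[] and byte[] stand in for Arrow's offset buffer
// and data buffer. No Arrow or Spark classes are used.
final class OffsetLayoutSketch {
  private OffsetLayoutSketch() {}

  // offsets holds valueCount + 1 entries; value rowId occupies
  // the half-open range [offsets[rowId], offsets[rowId + 1]).
  static int offset(int[] offsets, int rowId) {
    return offsets[rowId];
  }

  // Same arithmetic as ArrayAccessor.getArrayLength in the diff.
  static int length(int[] offsets, int rowId) {
    return offsets[rowId + 1] - offsets[rowId];
  }

  // Decodes one variable-width UTF-8 value. Unlike StringAccessor, which wraps
  // the off-heap bytes in place via UTF8String.fromAddress, this copies them.
  static String utf8At(int[] offsets, byte[] data, int rowId) {
    return new String(data, offset(offsets, rowId), length(offsets, rowId),
        StandardCharsets.UTF_8);
  }
}
```

With data "abcde" and offsets {0, 2, 2, 5}, row 0 is "ab", row 1 is the empty string and row 2 is "cde"; an empty value costs no data bytes, only a repeated offset.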
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> index 0c027f8..7796638 100644
> --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
> @@ -646,7 +646,7 @@ public abstract class ColumnVector implements AutoCloseable {
> /**
> * Returns the decimal for rowId.
> */
> - public final Decimal getDecimal(int rowId, int precision, int scale) {
> + public Decimal getDecimal(int rowId, int precision, int scale) {
> if (precision <= Decimal.MAX_INT_DIGITS()) {
> return Decimal.createUnsafe(getInt(rowId), precision, scale);
> } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> @@ -661,7 +661,7 @@ public abstract class ColumnVector implements AutoCloseable {
> }
>
>
> - public final void putDecimal(int rowId, Decimal value, int precision) {
> + public void putDecimal(int rowId, Decimal value, int precision) {
> if (precision <= Decimal.MAX_INT_DIGITS()) {
> putInt(rowId, (int) value.toUnscaledLong());
> } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
> @@ -675,7 +675,7 @@ public abstract class ColumnVector implements AutoCloseable {
> /**
> * Returns the UTF8String for rowId.
> */
> - public final UTF8String getUTF8String(int rowId) {
> + public UTF8String getUTF8String(int rowId) {
> if (dictionary == null) {
> ColumnVector.Array a = getByteArray(rowId);
> return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
> @@ -688,7 +688,7 @@ public abstract class ColumnVector implements AutoCloseable {
> /**
> * Returns the byte array for rowId.
> */
> - public final byte[] getBinary(int rowId) {
> + public byte[] getBinary(int rowId) {
> if (dictionary == null) {
> ColumnVector.Array array = getByteArray(rowId);
> byte[] bytes = new byte[array.length];
> @@ -956,7 +956,7 @@ public abstract class ColumnVector implements AutoCloseable {
> /**
> * Data type for this column.
> */
> - protected final DataType type;
> + protected DataType type;
>
> /**
> * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
> @@ -988,17 +988,17 @@ public abstract class ColumnVector implements AutoCloseable {
> /**
> * If this is a nested type (array or struct), the column for the child data.
> */
> - protected final ColumnVector[] childColumns;
> + protected ColumnVector[] childColumns;
>
> /**
> * Reusable Array holder for getArray().
> */
> - protected final Array resultArray;
> + protected Array resultArray;
>
> /**
> * Reusable Struct holder for getStruct().
> */
> - protected final ColumnarBatch.Row resultStruct;
> + protected ColumnarBatch.Row resultStruct;
>
> /**
> * The Dictionary for this column.
>
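The getDecimal/putDecimal hunks above choose a physical representation from the precision; dropping `final` lets ArrowColumnVector skip that dispatch and read Arrow's decimal directly. A small sketch of the rule, assuming the thresholds are Decimal.MAX_INT_DIGITS = 9 and Decimal.MAX_LONG_DIGITS = 18 (any 9-digit unscaled value fits in an int, any 18-digit one in a long). The quoted hunk stops before the last branch, so the byte-array fallback is an assumption, and DecimalStorageSketch is a hypothetical name:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper mirroring the precision-based dispatch in
// ColumnVector.getDecimal/putDecimal. Thresholds are assumed to match
// Decimal.MAX_INT_DIGITS and Decimal.MAX_LONG_DIGITS.
final class DecimalStorageSketch {
  private DecimalStorageSketch() {}

  static final int MAX_INT_DIGITS = 9;   // Integer.MAX_VALUE has 10 digits
  static final int MAX_LONG_DIGITS = 18; // Long.MAX_VALUE has 19 digits

  enum Storage { INT, LONG, BYTES }

  static Storage storageFor(int precision) {
    if (precision <= MAX_INT_DIGITS) {
      return Storage.INT;
    } else if (precision <= MAX_LONG_DIGITS) {
      return Storage.LONG;
    }
    return Storage.BYTES;
  }

  // Narrow cases: the unscaled value is stored as an int or a long.
  static BigDecimal fromUnscaled(long unscaled, int scale) {
    return BigDecimal.valueOf(unscaled, scale);
  }

  // Assumed wide case: big-endian two's-complement bytes of the unscaled value.
  static BigDecimal fromUnscaledBytes(byte[] bytes, int scale) {
    return new BigDecimal(new BigInteger(bytes), scale);
  }
}
```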
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> new file mode 100644
> index 0000000..e9f6e7c
> --- /dev/null
> +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
> @@ -0,0 +1,251 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized;
> +
> +import org.apache.spark.memory.MemoryMode;
> +import org.apache.spark.sql.types.*;
> +
> +/**
> + * An abstract class for read-only column vector.
> + */
> +public abstract class ReadOnlyColumnVector extends ColumnVector {
> +
> + protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
> + super(capacity, DataTypes.NullType, memMode);
> + this.type = type;
> + isConstant = true;
> + }
> +
> + //
> + // APIs dealing with nulls
> + //
> +
> + @Override
> + public final void putNotNull(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putNull(int rowId) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putNulls(int rowId, int count) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putNotNulls(int rowId, int count) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Booleans
> + //
> +
> + @Override
> + public final void putBoolean(int rowId, boolean value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putBooleans(int rowId, int count, boolean value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Bytes
> + //
> +
> + @Override
> + public final void putByte(int rowId, byte value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putBytes(int rowId, int count, byte value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Shorts
> + //
> +
> + @Override
> + public final void putShort(int rowId, short value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putShorts(int rowId, int count, short value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Ints
> + //
> +
> + @Override
> + public final void putInt(int rowId, int value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putInts(int rowId, int count, int value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putInts(int rowId, int count, int[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Longs
> + //
> +
> + @Override
> + public final void putLong(int rowId, long value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putLongs(int rowId, int count, long value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with floats
> + //
> +
> + @Override
> + public final void putFloat(int rowId, float value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putFloats(int rowId, int count, float value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with doubles
> + //
> +
> + @Override
> + public final void putDouble(int rowId, double value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putDoubles(int rowId, int count, double value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Arrays
> + //
> +
> + @Override
> + public final void putArray(int rowId, int offset, int length) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Byte Arrays
> + //
> +
> + @Override
> + public final int putByteArray(int rowId, byte[] value, int offset, int count) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // APIs dealing with Decimals
> + //
> +
> + @Override
> + public final void putDecimal(int rowId, Decimal value, int precision) {
> + throw new UnsupportedOperationException();
> + }
> +
> + //
> + // Other APIs
> + //
> +
> + @Override
> + public final void setDictionary(Dictionary dictionary) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public final ColumnVector reserveDictionaryIds(int capacity) {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + protected final void reserveInternal(int newCapacity) {
> + throw new UnsupportedOperationException();
> + }
> +}
>
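ReadOnlyColumnVector turns every mutator into a final override that throws UnsupportedOperationException, and its constructor assigns `this.type` after calling super, which is why ColumnVector's `type` field loses its `final` modifier in the hunk above. A stripped-down sketch of the same sealing pattern; IntColumn, ReadOnlyIntColumn and ArrayBackedIntColumn are hypothetical names, not Spark classes:

```java
// Hypothetical base type: declares both a reader and a writer,
// like ColumnVector's get*/put* pairs.
abstract class IntColumn {
  abstract int getInt(int rowId);

  abstract void putInt(int rowId, int value);
}

// Read-only layer: the writer is sealed with a final override that throws,
// so no concrete subclass can accidentally re-enable mutation.
abstract class ReadOnlyIntColumn extends IntColumn {
  @Override
  final void putInt(int rowId, int value) {
    throw new UnsupportedOperationException();
  }
}

// Concrete reader: only has to implement the getters.
final class ArrayBackedIntColumn extends ReadOnlyIntColumn {
  private final int[] values;

  ArrayBackedIntColumn(int[] values) {
    this.values = values.clone();
  }

  @Override
  int getInt(int rowId) {
    return values[rowId];
  }
}
```

Marking the throwing overrides `final` keeps the read-only guarantee in one place: a subclass such as ArrowColumnVector only adds accessors and cannot reopen a write path.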
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> index 6af5c73..c913efe 100644
> --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
> @@ -71,34 +71,6 @@ private[sql] object ArrowPayload {
> private[sql] object ArrowConverters {
>
> /**
> - * Map a Spark DataType to ArrowType.
> - */
> - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
> - dataType match {
> - case BooleanType => ArrowType.Bool.INSTANCE
> - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
> - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
> - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
> - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> - case ByteType => new ArrowType.Int(8, true)
> - case StringType => ArrowType.Utf8.INSTANCE
> - case BinaryType => ArrowType.Binary.INSTANCE
> - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
> - }
> - }
> -
> - /**
> - * Convert a Spark Dataset schema to Arrow schema.
> - */
> - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
> - val arrowFields = schema.fields.map { f =>
> - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
> - }
> - new Schema(arrowFields.toList.asJava)
> - }
> -
> - /**
> * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
> * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
> */
> @@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
> batch: ArrowRecordBatch,
> schema: StructType,
> allocator: BufferAllocator): Array[Byte] = {
> - val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
> + val arrowSchema = ArrowUtils.toArrowSchema(schema)
> val root = VectorSchemaRoot.create(arrowSchema, allocator)
> val out = new ByteArrayOutputStream()
> val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
> @@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
> * Create an Arrow ColumnWriter given the type and ordinal of row.
> */
> def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
> - val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
> + val dtype = ArrowUtils.toArrowType(dataType)
> dataType match {
> case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
> case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> new file mode 100644
> index 0000000..2caf1ef
> --- /dev/null
> +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala
> @@ -0,0 +1,109 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.arrow
> +
> +import scala.collection.JavaConverters._
> +
> +import org.apache.arrow.memory.RootAllocator
> +import org.apache.arrow.vector.types.FloatingPointPrecision
> +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
> +
> +import org.apache.spark.sql.types._
> +
> +object ArrowUtils {
> +
> + val rootAllocator = new RootAllocator(Long.MaxValue)
> +
> + // todo: support more types.
> +
> + def toArrowType(dt: DataType): ArrowType = dt match {
> + case BooleanType => ArrowType.Bool.INSTANCE
> + case ByteType => new ArrowType.Int(8, true)
> + case ShortType => new ArrowType.Int(8 * 2, true)
> + case IntegerType => new ArrowType.Int(8 * 4, true)
> + case LongType => new ArrowType.Int(8 * 8, true)
> + case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
> + case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
> + case StringType => ArrowType.Utf8.INSTANCE
> + case BinaryType => ArrowType.Binary.INSTANCE
> + case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
> + case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
> + }
> +
> + def fromArrowType(dt: ArrowType): DataType = dt match {
> + case ArrowType.Bool.INSTANCE => BooleanType
> + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
> + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
> + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
> + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
> + case float: ArrowType.FloatingPoint
> + if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
> + case float: ArrowType.FloatingPoint
> + if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
> + case ArrowType.Utf8.INSTANCE => StringType
> + case ArrowType.Binary.INSTANCE => BinaryType
> + case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
> + case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
> + }
> +
> + def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = {
> + dt match {
> + case ArrayType(elementType, containsNull) =>
> + val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
> + new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava)
> + case StructType(fields) =>
> + val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
> + new Field(name, fieldType,
> + fields.map { field =>
> + toArrowField(field.name, field.dataType, field.nullable)
> + }.toSeq.asJava)
> + case dataType =>
> + val fieldType = new FieldType(nullable, toArrowType(dataType), null)
> + new Field(name, fieldType, Seq.empty[Field].asJava)
> + }
> + }
> +
> + def fromArrowField(field: Field): DataType = {
> + field.getType match {
> + case ArrowType.List.INSTANCE =>
> + val elementField = field.getChildren().get(0)
> + val elementType = fromArrowField(elementField)
> + ArrayType(elementType, containsNull = elementField.isNullable)
> + case ArrowType.Struct.INSTANCE =>
> + val fields = field.getChildren().asScala.map { child =>
> + val dt = fromArrowField(child)
> + StructField(child.getName, dt, child.isNullable)
> + }
> + StructType(fields)
> + case arrowType => fromArrowType(arrowType)
> + }
> + }
> +
> + def toArrowSchema(schema: StructType): Schema = {
> + new Schema(schema.map { field =>
> + toArrowField(field.name, field.dataType, field.nullable)
> + }.asJava)
> + }
> +
> + def fromArrowSchema(schema: Schema): StructType = {
> + StructType(schema.getFields.asScala.map { field =>
> + val dt = fromArrowField(field)
> + StructField(field.getName, dt, field.isNullable)
> + })
> + }
> +}
>
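In ArrowUtils.toArrowType each integral Spark type becomes a signed ArrowType.Int whose bit width is 8 times its size in bytes, and fromArrowType inverts that with guarded matches; unsigned ints or other widths fall through to UnsupportedOperationException. A sketch of that integer branch and the round trip that ArrowUtilsSuite checks, with a hypothetical enum standing in for Spark's DataType objects and a plain (bitWidth, signed) pair standing in for ArrowType.Int:

```java
// Hypothetical stand-ins: SparkIntType replaces ByteType/ShortType/IntegerType/
// LongType, and a (bitWidth, signed) pair replaces ArrowType.Int.
final class IntTypeMappingSketch {
  private IntTypeMappingSketch() {}

  enum SparkIntType {
    BYTE(1), SHORT(2), INTEGER(4), LONG(8);

    final int sizeInBytes;

    SparkIntType(int sizeInBytes) {
      this.sizeInBytes = sizeInBytes;
    }
  }

  // Mirrors `new ArrowType.Int(8 * n, true)` in toArrowType.
  static int toBitWidth(SparkIntType t) {
    return 8 * t.sizeInBytes;
  }

  // Mirrors the `case int: ArrowType.Int if int.getIsSigned && ...` arms:
  // only signed widths of 8, 16, 32 and 64 bits map back to a Spark type.
  static SparkIntType fromBitWidth(int bitWidth, boolean signed) {
    if (signed) {
      for (SparkIntType t : SparkIntType.values()) {
        if (toBitWidth(t) == bitWidth) {
          return t;
        }
      }
    }
    throw new UnsupportedOperationException(
        "Unsupported data type: Int(" + bitWidth + ", " + signed + ")");
  }
}
```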
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> index 159328c..55b4655 100644
> --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
> @@ -1202,7 +1202,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
> val allocator = new RootAllocator(Long.MaxValue)
> val jsonReader = new JsonFileReader(jsonFile, allocator)
>
> - val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema)
> + val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema)
> val jsonSchema = jsonReader.start()
> Validator.compareSchemas(arrowSchema, jsonSchema)
>
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> new file mode 100644
> index 0000000..638619f
> --- /dev/null
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala
> @@ -0,0 +1,65 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.arrow
> +
> +import org.apache.spark.SparkFunSuite
> +import org.apache.spark.sql.types._
> +
> +class ArrowUtilsSuite extends SparkFunSuite {
> +
> + def roundtrip(dt: DataType): Unit = {
> + dt match {
> + case schema: StructType =>
> + assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema)
> + case _ =>
> + roundtrip(new StructType().add("value", dt))
> + }
> + }
> +
> + test("simple") {
> + roundtrip(BooleanType)
> + roundtrip(ByteType)
> + roundtrip(ShortType)
> + roundtrip(IntegerType)
> + roundtrip(LongType)
> + roundtrip(FloatType)
> + roundtrip(DoubleType)
> + roundtrip(StringType)
> + roundtrip(BinaryType)
> + roundtrip(DecimalType.SYSTEM_DEFAULT)
> + }
> +
> + test("array") {
> + roundtrip(ArrayType(IntegerType, containsNull = true))
> + roundtrip(ArrayType(IntegerType, containsNull = false))
> + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = true))
> + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = true))
> + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = true), containsNull = false))
> + roundtrip(ArrayType(ArrayType(IntegerType, containsNull = false), containsNull = false))
> + }
> +
> + test("struct") {
> + roundtrip(new StructType())
> + roundtrip(new StructType().add("i", IntegerType))
> + roundtrip(new StructType().add("arr", ArrayType(IntegerType)))
> + roundtrip(new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType)))
> + roundtrip(new StructType().add(
> + "struct",
> + new StructType().add("i", IntegerType).add("arr", ArrayType(IntegerType))))
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/spark/blob/cb19880c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> ----------------------------------------------------------------------
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> new file mode 100644
> index 0000000..d24a9e1
> --- /dev/null
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
> @@ -0,0 +1,410 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.spark.sql.execution.vectorized
> +
> +import org.apache.arrow.vector._
> +import org.apache.arrow.vector.complex._
> +
> +import org.apache.spark.SparkFunSuite
> +import org.apache.spark.sql.execution.arrow.ArrowUtils
> +import org.apache.spark.sql.types._
> +import org.apache.spark.unsafe.types.UTF8String
> +
> +class ArrowColumnVectorSuite extends SparkFunSuite {
> +
> + test("boolean") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableBitVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, if (i % 2 == 0) 1 else 0)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === BooleanType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getBoolean(i) === (i % 2 == 0))
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getBooleans(0, 10) === (0 until 10).map(i => (i % 2 == 0)))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("byte") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableTinyIntVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, i.toByte)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === ByteType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getByte(i) === i.toByte)
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getBytes(0, 10) === (0 until 10).map(i => i.toByte))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("short") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableSmallIntVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, i.toShort)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === ShortType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getShort(i) === i.toShort)
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getShorts(0, 10) === (0 until 10).map(i => i.toShort))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("int") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableIntVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, i)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === IntegerType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getInt(i) === i)
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getInts(0, 10) === (0 until 10))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("long") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("long", LongType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableBigIntVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, i.toLong)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === LongType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getLong(i) === i.toLong)
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getLongs(0, 10) === (0 until 10).map(i => i.toLong))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("float") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableFloat4Vector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, i.toFloat)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === FloatType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getFloat(i) === i.toFloat)
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getFloats(0, 10) === (0 until 10).map(i => i.toFloat))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("double") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableFloat8Vector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + mutator.setSafe(i, i.toDouble)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === DoubleType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getDouble(i) === i.toDouble)
> + }
> + assert(columnVector.isNullAt(10))
> +
> + assert(columnVector.getDoubles(0, 10) === (0 until 10).map(i => i.toDouble))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("string") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("string", StringType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableVarCharVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + val utf8 = s"str$i".getBytes("utf8")
> + mutator.setSafe(i, utf8, 0, utf8.length)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === StringType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getUTF8String(i) === UTF8String.fromString(s"str$i"))
> + }
> + assert(columnVector.isNullAt(10))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("binary") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableVarBinaryVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> +
> + (0 until 10).foreach { i =>
> + val utf8 = s"str$i".getBytes("utf8")
> + mutator.setSafe(i, utf8, 0, utf8.length)
> + }
> + mutator.setNull(10)
> + mutator.setValueCount(11)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === BinaryType)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + (0 until 10).foreach { i =>
> + assert(columnVector.getBinary(i) === s"str$i".getBytes("utf8"))
> + }
> + assert(columnVector.isNullAt(10))
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("array") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue)
> + val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true)
> + .createVector(allocator).asInstanceOf[ListVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> + val elementVector = vector.getDataVector().asInstanceOf[NullableIntVector]
> + val elementMutator = elementVector.getMutator()
> +
> + // [1, 2]
> + mutator.startNewValue(0)
> + elementMutator.setSafe(0, 1)
> + elementMutator.setSafe(1, 2)
> + mutator.endValue(0, 2)
> +
> + // [3, null, 5]
> + mutator.startNewValue(1)
> + elementMutator.setSafe(2, 3)
> + elementMutator.setNull(3)
> + elementMutator.setSafe(4, 5)
> + mutator.endValue(1, 3)
> +
> + // null
> +
> + // []
> + mutator.startNewValue(3)
> + mutator.endValue(3, 0)
> +
> + elementMutator.setValueCount(5)
> + mutator.setValueCount(4)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === ArrayType(IntegerType))
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + val array0 = columnVector.getArray(0)
> + assert(array0.numElements() === 2)
> + assert(array0.getInt(0) === 1)
> + assert(array0.getInt(1) === 2)
> +
> + val array1 = columnVector.getArray(1)
> + assert(array1.numElements() === 3)
> + assert(array1.getInt(0) === 3)
> + assert(array1.isNullAt(1))
> + assert(array1.getInt(2) === 5)
> +
> + assert(columnVector.isNullAt(2))
> +
> + val array3 = columnVector.getArray(3)
> + assert(array3.numElements() === 0)
> +
> + columnVector.close()
> + allocator.close()
> + }
> +
> + test("struct") {
> + val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
> + val schema = new StructType().add("int", IntegerType).add("long", LongType)
> + val vector = ArrowUtils.toArrowField("struct", schema, nullable = true)
> + .createVector(allocator).asInstanceOf[NullableMapVector]
> + vector.allocateNew()
> + val mutator = vector.getMutator()
> + val intVector = vector.getChildByOrdinal(0).asInstanceOf[NullableIntVector]
> + val intMutator = intVector.getMutator()
> + val longVector = vector.getChildByOrdinal(1).asInstanceOf[NullableBigIntVector]
> + val longMutator = longVector.getMutator()
> +
> + // (1, 1L)
> + mutator.setIndexDefined(0)
> + intMutator.setSafe(0, 1)
> + longMutator.setSafe(0, 1L)
> +
> + // (2, null)
> + mutator.setIndexDefined(1)
> + intMutator.setSafe(1, 2)
> + longMutator.setNull(1)
> +
> + // (null, 3L)
> + mutator.setIndexDefined(2)
> + intMutator.setNull(2)
> + longMutator.setSafe(2, 3L)
> +
> + // null
> + mutator.setNull(3)
> +
> + // (5, 5L)
> + mutator.setIndexDefined(4)
> + intMutator.setSafe(4, 5)
> + longMutator.setSafe(4, 5L)
> +
> + intMutator.setValueCount(5)
> + longMutator.setValueCount(5)
> + mutator.setValueCount(5)
> +
> + val columnVector = new ArrowColumnVector(vector)
> + assert(columnVector.dataType === schema)
> + assert(columnVector.anyNullsSet)
> + assert(columnVector.numNulls === 1)
> +
> + val row0 = columnVector.getStruct(0, 2)
> + assert(row0.getInt(0) === 1)
> + assert(row0.getLong(1) === 1L)
> +
> + val row1 = columnVector.getStruct(1, 2)
> + assert(row1.getInt(0) === 2)
> + assert(row1.isNullAt(1))
> +
> + val row2 = columnVector.getStruct(2, 2)
> + assert(row2.isNullAt(0))
> + assert(row2.getLong(1) === 3L)
> +
> + assert(columnVector.isNullAt(3))
> +
> + val row4 = columnVector.getStruct(4, 2)
> + assert(row4.getInt(0) === 5)
> + assert(row4.getLong(1) === 5L)
> +
> + columnVector.close()
> + allocator.close()
> + }
> +}
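Each primitive test in ArrowColumnVectorSuite (boolean through double) follows the same pattern. It writes values into slots 0 to 9, leaves slot 10 null, and sets the value count to 11. It then expects numNulls === 1 and isNullAt(10). This works because Arrow's columnar format stores a validity bitmap next to the values buffer, with one bit per slot, least-significant bit first, where a set bit means the slot is non-null. The sketch below is a plain-Scala model of that layout for ints, meant only as an illustration. NullableIntModel is a hypothetical name and is not part of Arrow or Spark.

```scala
// Hypothetical plain-Scala model of an Arrow-style nullable int column:
// a validity bitmap (bit i set => slot i holds a value, LSB first) plus a
// values array. Not an Arrow or Spark class; for illustration only.
final class NullableIntModel(capacity: Int) {
  private val validity = new Array[Byte]((capacity + 7) / 8)
  private val values = new Array[Int](capacity)
  private var valueCount = 0

  // Store the value and set the slot's validity bit.
  def set(i: Int, v: Int): Unit = {
    values(i) = v
    validity(i >> 3) = (validity(i >> 3) | (1 << (i & 7))).toByte
  }

  // Clearing the validity bit is all it takes to mark a slot null.
  def setNull(i: Int): Unit =
    validity(i >> 3) = (validity(i >> 3) & ~(1 << (i & 7))).toByte

  def setValueCount(n: Int): Unit = valueCount = n

  def isNullAt(i: Int): Boolean = (validity(i >> 3) & (1 << (i & 7))) == 0
  def getInt(i: Int): Int = values(i)

  // Nulls are the slots below valueCount whose validity bit is clear.
  def numNulls: Int = (0 until valueCount).count(i => isNullAt(i))
}

// Same write pattern as the "int" test: slots 0..9 set, slot 10 null, count 11.
val model: NullableIntModel = {
  val m = new NullableIntModel(16)
  (0 until 10).foreach(i => m.set(i, i))
  m.setNull(10)
  m.setValueCount(11)
  m
}
```

In this model a slot that was never written already reads as null, because a freshly allocated bitmap is all zeros. The setNull(10) call only makes that intent explicit.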
>
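The "array" test relies on Arrow's list layout. A list column keeps an offsets buffer with one more entry than there are rows, and row i covers the child slots from offsets(i) up to, but not including, offsets(i + 1). The test writes four rows: [1, 2], [3, null, 5], null, and []. Together these produce five child values, which matches elementMutator.setValueCount(5). If the null row is given an empty range, the offsets are 0, 2, 5, 5, 5. The model below is a hypothetical plain-Scala illustration; ListColumnModel is not an Arrow or Spark class. It also uses Option for element nullability instead of a second bitmap.

```scala
// Hypothetical model of an Arrow-style list<int> column: offsets of length
// numRows + 1, a list-level validity array, and a flat child column whose
// entries may themselves be null (modeled with Option, not a bitmap).
final case class ListColumnModel(
    offsets: Array[Int],
    valid: Array[Boolean],
    child: Array[Option[Int]]) {

  def numRows: Int = offsets.length - 1
  def isNullAt(i: Int): Boolean = !valid(i)
  def numNulls: Int = valid.count(v => !v)

  // Row i is the child slice [offsets(i), offsets(i + 1)).
  def getArray(i: Int): Seq[Option[Int]] =
    child.slice(offsets(i), offsets(i + 1)).toSeq
}

// The four rows from the "array" test: [1, 2], [3, null, 5], null, [].
val lists = ListColumnModel(
  offsets = Array(0, 2, 5, 5, 5),
  valid = Array(true, true, false, true),
  child = Array(Some(1), Some(2), Some(3), None, Some(5)))
```

The null row and the empty row look the same in the offsets. Only the list-level validity tells them apart, which is why the test checks isNullAt(2) but calls numElements() on row 3.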
>
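In the "struct" test, numNulls === 1 even though rows 1 and 2 contain null fields. A struct column has its own validity, separate from the validity of each child column. As a result, only row 3 (set with mutator.setNull(3)) counts as a null struct. The null int and long values are nulls of the child vectors. The sketch below models that split in plain Scala under those assumptions. StructColumnModel is a hypothetical name, not an Arrow or Spark API.

```scala
// Hypothetical model of a struct<int, long> column: the struct has its own
// validity, and each child column carries separate validity (None = null).
final case class StructColumnModel(
    valid: Array[Boolean],
    ints: Array[Option[Int]],
    longs: Array[Option[Long]]) {

  def isNullAt(i: Int): Boolean = !valid(i)

  // Only struct-level nulls count; null fields inside a row do not.
  def numNulls: Int = valid.count(v => !v)

  def getStruct(i: Int): (Option[Int], Option[Long]) = (ints(i), longs(i))
}

// Rows from the "struct" test: (1, 1L), (2, null), (null, 3L), null, (5, 5L).
val structs = StructColumnModel(
  valid = Array(true, true, true, false, true),
  ints = Array(Some(1), Some(2), None, None, Some(5)),
  longs = Array(Some(1L), None, Some(3L), None, Some(5L)))
```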
-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org