You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/12 11:00:25 UTC
[spark] branch master updated: [SPARK-27675][SQL] do not use
MutableColumnarRow in ColumnarBatch
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9ff77b1 [SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch
9ff77b1 is described below
commit 9ff77b198ea6e19a7cbed949fe2be863266a29b5
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Sun May 12 19:59:56 2019 +0900
[SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch
## What changes were proposed in this pull request?
To move DS v2 API to the catalyst module, we can't refer to an internal class (`MutableColumnarRow`) in `ColumnarBatch`.
This PR creates a read-only version of `MutableColumnarRow`, and uses it in `ColumnarBatch`.
Closes https://github.com/apache/spark/pull/24546
## How was this patch tested?
existing tests
Closes #24581 from cloud-fan/mutable-row.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../execution/vectorized/MutableColumnarRow.java | 44 +++---
.../apache/spark/sql/vectorized/ColumnarBatch.java | 175 ++++++++++++++++++++-
2 files changed, 189 insertions(+), 30 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 4e4242f..fca7e36 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -26,7 +26,6 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.sql.vectorized.ColumnarRow;
-import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@@ -39,17 +38,10 @@ import org.apache.spark.unsafe.types.UTF8String;
*/
public final class MutableColumnarRow extends InternalRow {
public int rowId;
- private final ColumnVector[] columns;
- private final WritableColumnVector[] writableColumns;
-
- public MutableColumnarRow(ColumnVector[] columns) {
- this.columns = columns;
- this.writableColumns = null;
- }
+ private final WritableColumnVector[] columns;
public MutableColumnarRow(WritableColumnVector[] writableColumns) {
this.columns = writableColumns;
- this.writableColumns = writableColumns;
}
@Override
@@ -228,54 +220,54 @@ public final class MutableColumnarRow extends InternalRow {
@Override
public void setNullAt(int ordinal) {
- writableColumns[ordinal].putNull(rowId);
+ columns[ordinal].putNull(rowId);
}
@Override
public void setBoolean(int ordinal, boolean value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putBoolean(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putBoolean(rowId, value);
}
@Override
public void setByte(int ordinal, byte value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putByte(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putByte(rowId, value);
}
@Override
public void setShort(int ordinal, short value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putShort(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putShort(rowId, value);
}
@Override
public void setInt(int ordinal, int value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putInt(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putInt(rowId, value);
}
@Override
public void setLong(int ordinal, long value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putLong(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putLong(rowId, value);
}
@Override
public void setFloat(int ordinal, float value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putFloat(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putFloat(rowId, value);
}
@Override
public void setDouble(int ordinal, double value) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putDouble(rowId, value);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putDouble(rowId, value);
}
@Override
public void setDecimal(int ordinal, Decimal value, int precision) {
- writableColumns[ordinal].putNotNull(rowId);
- writableColumns[ordinal].putDecimal(rowId, value, precision);
+ columns[ordinal].putNotNull(rowId);
+ columns[ordinal].putDecimal(rowId, value, precision);
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 07546a5..9f917ea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -20,7 +20,10 @@ import java.util.*;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
/**
* This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
@@ -33,7 +36,7 @@ public final class ColumnarBatch {
private final ColumnVector[] columns;
// Staging row returned from `getRow`.
- private final MutableColumnarRow row;
+ private final ColumnarBatchRow row;
/**
* Called to close all the columns in this batch. It is not valid to access the data after
@@ -50,7 +53,7 @@ public final class ColumnarBatch {
*/
public Iterator<InternalRow> rowIterator() {
final int maxRows = numRows;
- final MutableColumnarRow row = new MutableColumnarRow(columns);
+ final ColumnarBatchRow row = new ColumnarBatchRow(columns);
return new Iterator<InternalRow>() {
int rowId = 0;
@@ -108,6 +111,170 @@ public final class ColumnarBatch {
public ColumnarBatch(ColumnVector[] columns) {
this.columns = columns;
- this.row = new MutableColumnarRow(columns);
+ this.row = new ColumnarBatchRow(columns);
}
}
+
+/**
+ * An internal class, which wraps an array of {@link ColumnVector} and provides a row view.
+ */
+class ColumnarBatchRow extends InternalRow {
+ public int rowId;
+ private final ColumnVector[] columns;
+
+ ColumnarBatchRow(ColumnVector[] columns) {
+ this.columns = columns;
+ }
+
+ @Override
+ public int numFields() { return columns.length; }
+
+ @Override
+ public InternalRow copy() {
+ GenericInternalRow row = new GenericInternalRow(columns.length);
+ for (int i = 0; i < numFields(); i++) {
+ if (isNullAt(i)) {
+ row.setNullAt(i);
+ } else {
+ DataType dt = columns[i].dataType();
+ if (dt instanceof BooleanType) {
+ row.setBoolean(i, getBoolean(i));
+ } else if (dt instanceof ByteType) {
+ row.setByte(i, getByte(i));
+ } else if (dt instanceof ShortType) {
+ row.setShort(i, getShort(i));
+ } else if (dt instanceof IntegerType) {
+ row.setInt(i, getInt(i));
+ } else if (dt instanceof LongType) {
+ row.setLong(i, getLong(i));
+ } else if (dt instanceof FloatType) {
+ row.setFloat(i, getFloat(i));
+ } else if (dt instanceof DoubleType) {
+ row.setDouble(i, getDouble(i));
+ } else if (dt instanceof StringType) {
+ row.update(i, getUTF8String(i).copy());
+ } else if (dt instanceof BinaryType) {
+ row.update(i, getBinary(i));
+ } else if (dt instanceof DecimalType) {
+ DecimalType t = (DecimalType)dt;
+ row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
+ } else if (dt instanceof DateType) {
+ row.setInt(i, getInt(i));
+ } else if (dt instanceof TimestampType) {
+ row.setLong(i, getLong(i));
+ } else {
+ throw new RuntimeException("Not implemented. " + dt);
+ }
+ }
+ }
+ return row;
+ }
+
+ @Override
+ public boolean anyNull() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }
+
+ @Override
+ public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
+
+ @Override
+ public byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); }
+
+ @Override
+ public short getShort(int ordinal) { return columns[ordinal].getShort(rowId); }
+
+ @Override
+ public int getInt(int ordinal) { return columns[ordinal].getInt(rowId); }
+
+ @Override
+ public long getLong(int ordinal) { return columns[ordinal].getLong(rowId); }
+
+ @Override
+ public float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); }
+
+ @Override
+ public double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); }
+
+ @Override
+ public Decimal getDecimal(int ordinal, int precision, int scale) {
+ return columns[ordinal].getDecimal(rowId, precision, scale);
+ }
+
+ @Override
+ public UTF8String getUTF8String(int ordinal) {
+ return columns[ordinal].getUTF8String(rowId);
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ return columns[ordinal].getBinary(rowId);
+ }
+
+ @Override
+ public CalendarInterval getInterval(int ordinal) {
+ return columns[ordinal].getInterval(rowId);
+ }
+
+ @Override
+ public ColumnarRow getStruct(int ordinal, int numFields) {
+ return columns[ordinal].getStruct(rowId);
+ }
+
+ @Override
+ public ColumnarArray getArray(int ordinal) {
+ return columns[ordinal].getArray(rowId);
+ }
+
+ @Override
+ public ColumnarMap getMap(int ordinal) {
+ return columns[ordinal].getMap(rowId);
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
+ if (dataType instanceof BooleanType) {
+ return getBoolean(ordinal);
+ } else if (dataType instanceof ByteType) {
+ return getByte(ordinal);
+ } else if (dataType instanceof ShortType) {
+ return getShort(ordinal);
+ } else if (dataType instanceof IntegerType) {
+ return getInt(ordinal);
+ } else if (dataType instanceof LongType) {
+ return getLong(ordinal);
+ } else if (dataType instanceof FloatType) {
+ return getFloat(ordinal);
+ } else if (dataType instanceof DoubleType) {
+ return getDouble(ordinal);
+ } else if (dataType instanceof StringType) {
+ return getUTF8String(ordinal);
+ } else if (dataType instanceof BinaryType) {
+ return getBinary(ordinal);
+ } else if (dataType instanceof DecimalType) {
+ DecimalType t = (DecimalType) dataType;
+ return getDecimal(ordinal, t.precision(), t.scale());
+ } else if (dataType instanceof DateType) {
+ return getInt(ordinal);
+ } else if (dataType instanceof TimestampType) {
+ return getLong(ordinal);
+ } else if (dataType instanceof ArrayType) {
+ return getArray(ordinal);
+ } else if (dataType instanceof StructType) {
+ return getStruct(ordinal, ((StructType)dataType).fields().length);
+ } else if (dataType instanceof MapType) {
+ return getMap(ordinal);
+ } else {
+ throw new UnsupportedOperationException("Datatype not supported " + dataType);
+ }
+ }
+
+ @Override
+ public void update(int ordinal, Object value) { throw new UnsupportedOperationException(); }
+
+ @Override
+ public void setNullAt(int ordinal) { throw new UnsupportedOperationException(); }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org