Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/12 11:00:25 UTC

[spark] branch master updated: [SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ff77b1  [SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch
9ff77b1 is described below

commit 9ff77b198ea6e19a7cbed949fe2be863266a29b5
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Sun May 12 19:59:56 2019 +0900

    [SPARK-27675][SQL] do not use MutableColumnarRow in ColumnarBatch
    
    ## What changes were proposed in this pull request?
    
    To move DS v2 API to the catalyst module, we can't refer to an internal class (`MutableColumnarRow`) in `ColumnarBatch`.
    
    This PR creates a read-only version of `MutableColumnarRow` and uses it in `ColumnarBatch`.
    
    Closes https://github.com/apache/spark/pull/24546
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24581 from cloud-fan/mutable-row.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../execution/vectorized/MutableColumnarRow.java   |  44 +++---
 .../apache/spark/sql/vectorized/ColumnarBatch.java | 175 ++++++++++++++++++++-
 2 files changed, 189 insertions(+), 30 deletions(-)
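
As a quick illustration of the new behavior (not part of this commit), rows handed out by `ColumnarBatch` are now read-only: any write through the `InternalRow` interface throws. A minimal, self-contained sketch in Java, using the internal `OnHeapColumnVector` purely for demonstration:

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public class ReadOnlyRowSketch {
      public static void main(String[] args) {
        // Fill a single int column with 4 values.
        OnHeapColumnVector col = new OnHeapColumnVector(4, DataTypes.IntegerType);
        for (int i = 0; i < 4; i++) col.putInt(i, i * 10);

        ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { col });
        batch.setNumRows(4);

        InternalRow row = batch.getRow(2);
        System.out.println(row.getInt(0)); // prints 20

        try {
          row.update(0, 99); // the new ColumnarBatchRow rejects all writes
        } catch (UnsupportedOperationException e) {
          System.out.println("rows from ColumnarBatch are read-only");
        }
        batch.close();
      }
    }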

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 4e4242f..fca7e36 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -26,7 +26,6 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.sql.vectorized.ColumnarRow;
-import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -39,17 +38,10 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public final class MutableColumnarRow extends InternalRow {
   public int rowId;
-  private final ColumnVector[] columns;
-  private final WritableColumnVector[] writableColumns;
-
-  public MutableColumnarRow(ColumnVector[] columns) {
-    this.columns = columns;
-    this.writableColumns = null;
-  }
+  private final WritableColumnVector[] columns;
 
   public MutableColumnarRow(WritableColumnVector[] writableColumns) {
     this.columns = writableColumns;
-    this.writableColumns = writableColumns;
   }
 
   @Override
@@ -228,54 +220,54 @@ public final class MutableColumnarRow extends InternalRow {
 
   @Override
   public void setNullAt(int ordinal) {
-    writableColumns[ordinal].putNull(rowId);
+    columns[ordinal].putNull(rowId);
   }
 
   @Override
   public void setBoolean(int ordinal, boolean value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putBoolean(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putBoolean(rowId, value);
   }
 
   @Override
   public void setByte(int ordinal, byte value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putByte(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putByte(rowId, value);
   }
 
   @Override
   public void setShort(int ordinal, short value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putShort(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putShort(rowId, value);
   }
 
   @Override
   public void setInt(int ordinal, int value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putInt(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putInt(rowId, value);
   }
 
   @Override
   public void setLong(int ordinal, long value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putLong(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putLong(rowId, value);
   }
 
   @Override
   public void setFloat(int ordinal, float value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putFloat(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putFloat(rowId, value);
   }
 
   @Override
   public void setDouble(int ordinal, double value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putDouble(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putDouble(rowId, value);
   }
 
   @Override
   public void setDecimal(int ordinal, Decimal value, int precision) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putDecimal(rowId, value, precision);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putDecimal(rowId, value, precision);
   }
 }
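
With the `ColumnVector[]` constructor gone, `MutableColumnarRow` is now purely a write-side staging row over `WritableColumnVector[]`. A minimal sketch of that write path (not from this commit; `OnHeapColumnVector` is used only for illustration):

    import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import org.apache.spark.sql.types.DataTypes;

    public class WritePathSketch {
      public static void main(String[] args) {
        WritableColumnVector[] buffers = {
          new OnHeapColumnVector(16, DataTypes.LongType)
        };
        MutableColumnarRow row = new MutableColumnarRow(buffers);

        // Point the staging row at slot 3; setters write through to the vector.
        row.rowId = 3;
        row.setLong(0, 42L);

        System.out.println(buffers[0].getLong(3)); // prints 42
        buffers[0].close();
      }
    }
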
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 07546a5..9f917ea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -20,7 +20,10 @@ import java.util.*;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
@@ -33,7 +36,7 @@ public final class ColumnarBatch {
   private final ColumnVector[] columns;
 
   // Staging row returned from `getRow`.
-  private final MutableColumnarRow row;
+  private final ColumnarBatchRow row;
 
   /**
    * Called to close all the columns in this batch. It is not valid to access the data after
@@ -50,7 +53,7 @@ public final class ColumnarBatch {
    */
   public Iterator<InternalRow> rowIterator() {
     final int maxRows = numRows;
-    final MutableColumnarRow row = new MutableColumnarRow(columns);
+    final ColumnarBatchRow row = new ColumnarBatchRow(columns);
     return new Iterator<InternalRow>() {
       int rowId = 0;
 
@@ -108,6 +111,170 @@ public final class ColumnarBatch {
 
   public ColumnarBatch(ColumnVector[] columns) {
     this.columns = columns;
-    this.row = new MutableColumnarRow(columns);
+    this.row = new ColumnarBatchRow(columns);
   }
 }
+
+/**
+ * An internal class, which wraps an array of {@link ColumnVector} and provides a row view.
+ */
+class ColumnarBatchRow extends InternalRow {
+  public int rowId;
+  private final ColumnVector[] columns;
+
+  ColumnarBatchRow(ColumnVector[] columns) {
+    this.columns = columns;
+  }
+
+  @Override
+  public int numFields() { return columns.length; }
+
+  @Override
+  public InternalRow copy() {
+    GenericInternalRow row = new GenericInternalRow(columns.length);
+    for (int i = 0; i < numFields(); i++) {
+      if (isNullAt(i)) {
+        row.setNullAt(i);
+      } else {
+        DataType dt = columns[i].dataType();
+        if (dt instanceof BooleanType) {
+          row.setBoolean(i, getBoolean(i));
+        } else if (dt instanceof ByteType) {
+          row.setByte(i, getByte(i));
+        } else if (dt instanceof ShortType) {
+          row.setShort(i, getShort(i));
+        } else if (dt instanceof IntegerType) {
+          row.setInt(i, getInt(i));
+        } else if (dt instanceof LongType) {
+          row.setLong(i, getLong(i));
+        } else if (dt instanceof FloatType) {
+          row.setFloat(i, getFloat(i));
+        } else if (dt instanceof DoubleType) {
+          row.setDouble(i, getDouble(i));
+        } else if (dt instanceof StringType) {
+          row.update(i, getUTF8String(i).copy());
+        } else if (dt instanceof BinaryType) {
+          row.update(i, getBinary(i));
+        } else if (dt instanceof DecimalType) {
+          DecimalType t = (DecimalType)dt;
+          row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
+        } else if (dt instanceof DateType) {
+          row.setInt(i, getInt(i));
+        } else if (dt instanceof TimestampType) {
+          row.setLong(i, getLong(i));
+        } else {
+          throw new RuntimeException("Not implemented. " + dt);
+        }
+      }
+    }
+    return row;
+  }
+
+  @Override
+  public boolean anyNull() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }
+
+  @Override
+  public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
+
+  @Override
+  public byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); }
+
+  @Override
+  public short getShort(int ordinal) { return columns[ordinal].getShort(rowId); }
+
+  @Override
+  public int getInt(int ordinal) { return columns[ordinal].getInt(rowId); }
+
+  @Override
+  public long getLong(int ordinal) { return columns[ordinal].getLong(rowId); }
+
+  @Override
+  public float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); }
+
+  @Override
+  public double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); }
+
+  @Override
+  public Decimal getDecimal(int ordinal, int precision, int scale) {
+    return columns[ordinal].getDecimal(rowId, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int ordinal) {
+    return columns[ordinal].getUTF8String(rowId);
+  }
+
+  @Override
+  public byte[] getBinary(int ordinal) {
+    return columns[ordinal].getBinary(rowId);
+  }
+
+  @Override
+  public CalendarInterval getInterval(int ordinal) {
+    return columns[ordinal].getInterval(rowId);
+  }
+
+  @Override
+  public ColumnarRow getStruct(int ordinal, int numFields) {
+    return columns[ordinal].getStruct(rowId);
+  }
+
+  @Override
+  public ColumnarArray getArray(int ordinal) {
+    return columns[ordinal].getArray(rowId);
+  }
+
+  @Override
+  public ColumnarMap getMap(int ordinal) {
+    return columns[ordinal].getMap(rowId);
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
+    if (dataType instanceof BooleanType) {
+      return getBoolean(ordinal);
+    } else if (dataType instanceof ByteType) {
+      return getByte(ordinal);
+    } else if (dataType instanceof ShortType) {
+      return getShort(ordinal);
+    } else if (dataType instanceof IntegerType) {
+      return getInt(ordinal);
+    } else if (dataType instanceof LongType) {
+      return getLong(ordinal);
+    } else if (dataType instanceof FloatType) {
+      return getFloat(ordinal);
+    } else if (dataType instanceof DoubleType) {
+      return getDouble(ordinal);
+    } else if (dataType instanceof StringType) {
+      return getUTF8String(ordinal);
+    } else if (dataType instanceof BinaryType) {
+      return getBinary(ordinal);
+    } else if (dataType instanceof DecimalType) {
+      DecimalType t = (DecimalType) dataType;
+      return getDecimal(ordinal, t.precision(), t.scale());
+    } else if (dataType instanceof DateType) {
+      return getInt(ordinal);
+    } else if (dataType instanceof TimestampType) {
+      return getLong(ordinal);
+    } else if (dataType instanceof ArrayType) {
+      return getArray(ordinal);
+    } else if (dataType instanceof StructType) {
+      return getStruct(ordinal, ((StructType)dataType).fields().length);
+    } else if (dataType instanceof MapType) {
+      return getMap(ordinal);
+    } else {
+      throw new UnsupportedOperationException("Datatype not supported " + dataType);
+    }
+  }
+
+  @Override
+  public void update(int ordinal, Object value) { throw new UnsupportedOperationException(); }
+
+  @Override
+  public void setNullAt(int ordinal) { throw new UnsupportedOperationException(); }
+}
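
Note that `rowIterator()` keeps returning the same staging `ColumnarBatchRow` with an advancing `rowId`, so callers that retain rows must `copy()` them; the `copy()` added above materializes a `GenericInternalRow` for atomic types (complex types still throw). A sketch of that pattern (again illustrative, not from this commit):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public class RowIteratorSketch {
      public static void main(String[] args) {
        OnHeapColumnVector col = new OnHeapColumnVector(3, DataTypes.LongType);
        for (int i = 0; i < 3; i++) col.putLong(i, 100L + i);

        ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { col });
        batch.setNumRows(3);

        List<InternalRow> retained = new ArrayList<>();
        Iterator<InternalRow> it = batch.rowIterator();
        while (it.hasNext()) {
          // next() mutates and returns the same staging row each time...
          InternalRow row = it.next();
          // ...so copy() is required to keep the current values.
          retained.add(row.copy());
        }
        batch.close();

        // The copies remain valid even after the batch is closed.
        System.out.println(retained.get(0).getLong(0)); // prints 100
      }
    }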

