You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/01/03 16:37:28 UTC

[spark] branch master updated: [SPARK-26447][SQL] Allow OrcColumnarBatchReader to return less partition columns

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e2dbafd  [SPARK-26447][SQL] Allow OrcColumnarBatchReader to return less partition columns
e2dbafd is described below

commit e2dbafdbc5e50fcf2554bf51939ce0cd363d8806
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Fri Jan 4 00:37:03 2019 +0800

    [SPARK-26447][SQL] Allow OrcColumnarBatchReader to return less partition columns
    
    ## What changes were proposed in this pull request?
    
    Currently OrcColumnarBatchReader returns all the partition column values in the batch read.
    In data source V2, we can improve it by returning the required partition column values only.
    
    This PR is part of https://github.com/apache/spark/pull/23383 . As cloud-fan suggested, create a new PR to make review easier.
    
    Also, this PR doesn't improve `OrcFileFormat`, since in the method `buildReaderWithPartitionValues`, the `requiredSchema` filter out all the partition columns, so we can't know which partition column is required.
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #23387 from gengliangwang/refactorOrcColumnarBatch.
    
    Lead-authored-by: Gengliang Wang <ge...@databricks.com>
    Co-authored-by: Gengliang Wang <lt...@gmail.com>
    Co-authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/orc/OrcColumnarBatchReader.java    | 93 ++++++++++++----------
 .../execution/datasources/orc/OrcFileFormat.scala  | 10 ++-
 .../orc/OrcColumnarBatchReaderSuite.scala          | 80 +++++++++++++++++++
 3 files changed, 136 insertions(+), 47 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index a0d9578..7dc90df 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc;
 import java.io.IOException;
 import java.util.stream.IntStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -58,9 +59,14 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
-   * -1 means this required column doesn't exist in the ORC file.
+   * -1 means this required column is partition column, or it doesn't exist in the ORC file.
+   * Ideally partition column should never appear in the physical file, and should only appear
+   * in the directory name. However, Spark allows partition columns inside physical file,
+   * but Spark will discard the values from the file, and use the partition value got from
+   * directory name. The column order will be reserved though.
    */
-  private int[] requestedColIds;
+  @VisibleForTesting
+  public int[] requestedDataColIds;
 
   // Record reader from ORC row batch.
   private org.apache.orc.RecordReader recordReader;
@@ -68,7 +74,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   private StructField[] requiredFields;
 
   // The result columnar batch for vectorized execution by whole-stage codegen.
-  private ColumnarBatch columnarBatch;
+  @VisibleForTesting
+  public ColumnarBatch columnarBatch;
 
   // Writable column vectors of the result columnar batch.
   private WritableColumnVector[] columnVectors;
@@ -143,25 +150,33 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   /**
    * Initialize columnar batch by setting required schema and partition information.
    * With this information, this creates ColumnarBatch with the full schema.
+   *
+   * @param orcSchema Schema from ORC file reader.
+   * @param requiredFields All the fields that are required to return, including partition fields.
+   * @param requestedDataColIds Requested column ids from orcSchema. -1 if not existed.
+   * @param requestedPartitionColIds Requested column ids from partition schema. -1 if not existed.
+   * @param partitionValues Values of partition columns.
    */
   public void initBatch(
       TypeDescription orcSchema,
-      int[] requestedColIds,
       StructField[] requiredFields,
-      StructType partitionSchema,
+      int[] requestedDataColIds,
+      int[] requestedPartitionColIds,
       InternalRow partitionValues) {
     batch = orcSchema.createRowBatch(capacity);
     assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
-
+    assert(requiredFields.length == requestedDataColIds.length);
+    assert(requiredFields.length == requestedPartitionColIds.length);
+    // If a required column is also partition column, use partition value and don't read from file.
+    for (int i = 0; i < requiredFields.length; i++) {
+      if (requestedPartitionColIds[i] != -1) {
+        requestedDataColIds[i] = -1;
+      }
+    }
     this.requiredFields = requiredFields;
-    this.requestedColIds = requestedColIds;
-    assert(requiredFields.length == requestedColIds.length);
+    this.requestedDataColIds = requestedDataColIds;
 
     StructType resultSchema = new StructType(requiredFields);
-    for (StructField f : partitionSchema.fields()) {
-      resultSchema = resultSchema.add(f);
-    }
-
     if (copyToSpark) {
       if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
         columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
@@ -169,22 +184,18 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
         columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
       }
 
-      // Initialize the missing columns once.
+      // Initialize the partition columns and missing columns once.
       for (int i = 0; i < requiredFields.length; i++) {
-        if (requestedColIds[i] == -1) {
+        if (requestedPartitionColIds[i] != -1) {
+          ColumnVectorUtils.populate(columnVectors[i],
+            partitionValues, requestedPartitionColIds[i]);
+          columnVectors[i].setIsConstant();
+        } else if (requestedDataColIds[i] == -1) {
           columnVectors[i].putNulls(0, capacity);
           columnVectors[i].setIsConstant();
         }
       }
 
-      if (partitionValues.numFields() > 0) {
-        int partitionIdx = requiredFields.length;
-        for (int i = 0; i < partitionValues.numFields(); i++) {
-          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
-          columnVectors[i + partitionIdx].setIsConstant();
-        }
-      }
-
       columnarBatch = new ColumnarBatch(columnVectors);
     } else {
       // Just wrap the ORC column vector instead of copying it to Spark column vector.
@@ -192,26 +203,22 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
       for (int i = 0; i < requiredFields.length; i++) {
         DataType dt = requiredFields[i].dataType();
-        int colId = requestedColIds[i];
-        // Initialize the missing columns once.
-        if (colId == -1) {
-          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
-          missingCol.putNulls(0, capacity);
-          missingCol.setIsConstant();
-          orcVectorWrappers[i] = missingCol;
-        } else {
-          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
-        }
-      }
-
-      if (partitionValues.numFields() > 0) {
-        int partitionIdx = requiredFields.length;
-        for (int i = 0; i < partitionValues.numFields(); i++) {
-          DataType dt = partitionSchema.fields()[i].dataType();
+        if (requestedPartitionColIds[i] != -1) {
           OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
-          ColumnVectorUtils.populate(partitionCol, partitionValues, i);
+          ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
           partitionCol.setIsConstant();
-          orcVectorWrappers[partitionIdx + i] = partitionCol;
+          orcVectorWrappers[i] = partitionCol;
+        } else {
+          int colId = requestedDataColIds[i];
+          // Initialize the missing columns once.
+          if (colId == -1) {
+            OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+            missingCol.putNulls(0, capacity);
+            missingCol.setIsConstant();
+            orcVectorWrappers[i] = missingCol;
+          } else {
+            orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+          }
         }
       }
 
@@ -233,7 +240,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
     if (!copyToSpark) {
       for (int i = 0; i < requiredFields.length; i++) {
-        if (requestedColIds[i] != -1) {
+        if (requestedDataColIds[i] != -1) {
           ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
         }
       }
@@ -248,8 +255,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
       StructField field = requiredFields[i];
       WritableColumnVector toColumn = columnVectors[i];
 
-      if (requestedColIds[i] >= 0) {
-        ColumnVector fromColumn = batch.cols[requestedColIds[i]];
+      if (requestedDataColIds[i] >= 0) {
+        ColumnVector fromColumn = batch.cols[requestedDataColIds[i]];
 
         if (fromColumn.isRepeating) {
           putRepeatingValues(batchSize, field, fromColumn, toColumn);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 4574f82..cd10ad2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -206,13 +206,15 @@ class OrcFileFormat
           // after opening a file.
           val iter = new RecordReaderIterator(batchReader)
           Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-
+          val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
+          val requestedPartitionColIds =
+            Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)
           batchReader.initialize(fileSplit, taskAttemptContext)
           batchReader.initBatch(
             reader.getSchema,
-            requestedColIds,
-            requiredSchema.fields,
-            partitionSchema,
+            resultSchema.fields,
+            requestedDataColIds,
+            requestedPartitionColIds,
             file.partitionValues)
 
           iter.asInstanceOf[Iterator[InternalRow]]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
new file mode 100644
index 0000000..52abeb2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String.fromString
+
+class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
+  private val dataSchema = StructType.fromDDL("col1 int, col2 int")
+  private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
+  private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
+  private val orcFileSchemaList = Seq(
+    "struct<col1:int,col2:int>", "struct<col1:int,col2:int,p1:string,p2:string>",
+    "struct<col1:int,col2:int,p1:string>", "struct<col1:int,col2:int,p2:string>")
+  orcFileSchemaList.foreach { case schema =>
+    val orcFileSchema = TypeDescription.fromString(schema)
+
+    val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant")
+    isConstant.setAccessible(true)
+
+    def getReader(
+        requestedDataColIds: Array[Int],
+        requestedPartitionColIds: Array[Int],
+        resultFields: Array[StructField]): OrcColumnarBatchReader = {
+      val reader = new OrcColumnarBatchReader(false, false, 4096)
+      reader.initBatch(
+        orcFileSchema,
+        resultFields,
+        requestedDataColIds,
+        requestedPartitionColIds,
+        partitionValues)
+      reader
+    }
+
+    test(s"all partitions are requested: $schema") {
+      val requestedDataColIds = Array(0, 1, 0, 0)
+      val requestedPartitionColIds = Array(-1, -1, 0, 1)
+      val reader = getReader(requestedDataColIds, requestedPartitionColIds,
+        dataSchema.fields ++ partitionSchema.fields)
+      assert(reader.requestedDataColIds === Array(0, 1, -1, -1))
+    }
+
+    test(s"initBatch should initialize requested partition columns only: $schema") {
+      val requestedDataColIds = Array(0, -1) // only `col1` is requested, `col2` doesn't exist
+      val requestedPartitionColIds = Array(-1, 0) // only `p1` is requested
+      val reader = getReader(requestedDataColIds, requestedPartitionColIds,
+        Array(dataSchema.fields(0), partitionSchema.fields(0)))
+      val batch = reader.columnarBatch
+      assert(batch.numCols() === 2)
+
+      assert(batch.column(0).isInstanceOf[OrcColumnVector])
+      assert(batch.column(1).isInstanceOf[OnHeapColumnVector])
+
+      val p1 = batch.column(1).asInstanceOf[OnHeapColumnVector]
+      assert(isConstant.get(p1).asInstanceOf[Boolean]) // Partition column is constant.
+      assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org