You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/07 03:25:02 UTC

[spark] branch master updated: [SPARK-27049][SQL] Create util class to support handling partition values in file source V2

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a543f91  [SPARK-27049][SQL] Create util class to support handling partition values in file source V2
a543f91 is described below

commit a543f917e086baa1151475dcb2d122d83b18a619
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Mar 7 11:24:15 2019 +0800

    [SPARK-27049][SQL] Create util class to support handling partition values in file source V2
    
    ## What changes were proposed in this pull request?
    
    While I am migrating other data sources, I find that we should abstract the logic that:
    1. converting safe `InternalRow`s into `UnsafeRow`s
    2. appending partition values to the end of the result row if existed
    
    This PR proposes to support handling partition values in file source v2 abstraction by adding a util class `PartitionReaderWithPartitionValues`.
    
    ## How was this patch tested?
    
    Existing unit tests
    
    Closes #23987 from gengliangwang/SPARK-27049.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../v2/FilePartitionReaderFactory.scala            | 16 ++++++-
 .../v2/PartitionReaderWithPartitionValues.scala    | 53 ++++++++++++++++++++++
 .../datasources/v2/PartitionRecordReader.scala     | 10 ----
 .../v2/orc/OrcPartitionReaderFactory.scala         | 33 ++++----------
 4 files changed, 77 insertions(+), 35 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
index 101a70e..1daf8ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
@@ -45,6 +46,19 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
   def buildColumnarReader(partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
     throw new UnsupportedOperationException("Cannot create columnar reader.")
   }
+
+  protected def getReadDataSchema(
+      readSchema: StructType,
+      partitionSchema: StructType,
+      isCaseSensitive: Boolean): StructType = {
+    val partitionNameSet =
+      partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
+    val fields = readSchema.fields.filterNot { field =>
+      partitionNameSet.contains(PartitioningUtils.getColName(field, isCaseSensitive))
+    }
+
+    StructType(fields)
+  }
 }
 
 // A compound class for combining file and its corresponding reader.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionReaderWithPartitionValues.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionReaderWithPartitionValues.scala
new file mode 100644
index 0000000..072465b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionReaderWithPartitionValues.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A wrapper reader that always appends partition values to [[InternalRow]]s produced by the input
+ * reader [[fileReader]].
+ */
+class PartitionReaderWithPartitionValues(
+    fileReader: PartitionReader[InternalRow],
+    readDataSchema: StructType,
+    partitionSchema: StructType,
+    partitionValues: InternalRow) extends PartitionReader[InternalRow] {
+  private val fullSchema = readDataSchema.toAttributes ++ partitionSchema.toAttributes
+  private val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+  // Note that we have to apply the converter even though `file.partitionValues` is empty.
+  // This is because the converter is also responsible for converting safe `InternalRow`s into
+  // `UnsafeRow`s
+  private val rowConverter = {
+    if (partitionSchema.isEmpty) {
+      () => unsafeProjection(fileReader.get())}
+    else {
+      val joinedRow = new JoinedRow()
+      () => unsafeProjection(joinedRow(fileReader.get(), partitionValues))
+    }
+  }
+
+  override def next(): Boolean = fileReader.next()
+
+  override def get(): InternalRow = rowConverter()
+
+  override def close(): Unit = fileReader.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala
index ff78ef3..baa8cb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala
@@ -29,13 +29,3 @@ class PartitionRecordReader[T](
 
   override def close(): Unit = rowReader.close()
 }
-
-class PartitionRecordReaderWithProject[X, T](
-    private[this] var rowReader: RecordReader[_, X],
-    project: X => T) extends PartitionReader[T] {
-  override def next(): Boolean = rowReader.nextKeyValue()
-
-  override def get(): T = project(rowReader.getCurrentValue)
-
-  override def close(): Unit = rowReader.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index f6fc0ca..4ae10a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -90,22 +90,20 @@ case class OrcPartitionReaderFactory(
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
 
-      val requiredDataSchema = subtractSchema(readSchema, partitionSchema)
+      val readDataSchema = getReadDataSchema(readSchema, partitionSchema, isCaseSensitive)
       val orcRecordReader = new OrcInputFormat[OrcStruct]
         .createRecordReader(fileSplit, taskAttemptContext)
+      val deserializer = new OrcDeserializer(dataSchema, readDataSchema, requestedColIds)
+      val fileReader = new PartitionReader[InternalRow] {
+        override def next(): Boolean = orcRecordReader.nextKeyValue()
 
-      val fullSchema = requiredDataSchema.toAttributes ++ partitionSchema.toAttributes
-      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-      val deserializer = new OrcDeserializer(dataSchema, requiredDataSchema, requestedColIds)
+        override def get(): InternalRow = deserializer.deserialize(orcRecordReader.getCurrentValue)
 
-      val projection = if (partitionSchema.length == 0) {
-        (value: OrcStruct) => unsafeProjection(deserializer.deserialize(value))
-      } else {
-        val joinedRow = new JoinedRow()
-        (value: OrcStruct) =>
-          unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues))
+        override def close(): Unit = orcRecordReader.close()
       }
-      new PartitionRecordReaderWithProject(orcRecordReader, projection)
+
+      new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+        partitionSchema, file.partitionValues)
     }
   }
 
@@ -153,17 +151,4 @@ case class OrcPartitionReaderFactory(
     }
   }
 
-  /**
-   * Returns a new StructType that is a copy of the original StructType, removing any items that
-   * also appear in other StructType. The order is preserved from the original StructType.
-   */
-  private def subtractSchema(original: StructType, other: StructType): StructType = {
-    val otherNameSet = other.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
-    val fields = original.fields.filterNot { field =>
-      otherNameSet.contains(PartitioningUtils.getColName(field, isCaseSensitive))
-    }
-
-    StructType(fields)
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org