Posted to commits@spark.apache.org by da...@apache.org on 2015/09/28 22:53:50 UTC

spark git commit: [SPARK-10395] [SQL] Simplifies CatalystReadSupport

Repository: spark
Updated Branches:
  refs/heads/master 353c30bd7 -> 14978b785


[SPARK-10395] [SQL] Simplifies CatalystReadSupport

Please refer to [SPARK-10395] [1] for details.

[1]: https://issues.apache.org/jira/browse/SPARK-10395

Author: Cheng Lian <li...@databricks.com>

Closes #8553 from liancheng/spark-10395/simplify-parquet-read-support.
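
In short, the patch stops round-tripping the requested schema through ReadContext metadata
between init() and prepareForRead(), and instead keeps it in a private var on the ReadSupport
instance, which is safe because parquet-mr 1.6.0+ only instantiates ReadSupport on the executor
side.  A minimal sketch of that pattern against the parquet-mr API (SketchReadSupport and its
String record type are made-up, heavily simplified stand-ins, not the actual Spark code in the
diff below):

    import java.util.{Map => JMap}

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
    import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
    import org.apache.parquet.io.api.RecordMaterializer
    import org.apache.parquet.schema.MessageType

    class SketchReadSupport extends ReadSupport[String] {
      // State shared between the two callbacks; fine since parquet-mr 1.6.0+ creates one
      // ReadSupport instance per task, entirely on the executor side.
      private var requestedSchema: MessageType = _

      override def init(context: InitContext): ReadContext = {
        // A real implementation would prune columns here; the sketch keeps the full schema.
        requestedSchema = context.getFileSchema
        new ReadContext(requestedSchema, Map.empty[String, String].asJava)
      }

      override def prepareForRead(
          conf: Configuration,
          keyValueMetaData: JMap[String, String],
          fileSchema: MessageType,
          readContext: ReadContext): RecordMaterializer[String] = {
        // `requestedSchema` is already available here, without consulting readContext metadata.
        ???  // a real implementation would return a RecordMaterializer for `requestedSchema`
      }
    }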


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14978b78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14978b78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14978b78

Branch: refs/heads/master
Commit: 14978b785a43e0c13c8bdfd52d20cc8984984ba3
Parents: 353c30b
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Sep 28 13:53:45 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Sep 28 13:53:45 2015 -0700

----------------------------------------------------------------------
 .../parquet/CatalystReadSupport.scala           | 92 ++++++++++----------
 1 file changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14978b78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 8c819f1..9502b83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -29,34 +29,62 @@ import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The API of [[ReadSupport]] is somewhat overcomplicated for historical reasons.  In older
+ * versions of parquet-mr (1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated and
+ * initialized twice, once on the driver side and once on the executor side.  The [[init()]]
+ * method handled driver-side initialization, while [[prepareForRead()]] handled the executor
+ * side.  Starting from parquet-mr 1.6.0 this is no longer the case: [[ReadSupport]] is only
+ * instantiated and initialized on the executor side.  In theory, the two methods could now be
+ * merged into a single initialization method.  The only reason to keep both is backwards
+ * compatibility with the parquet-mr API.
+ *
+ * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
+ * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
-  // Called after `init()` when initializing Parquet record reader.
+  private var catalystRequestedSchema: StructType = _
+
+  /**
+   * Called on the executor side before [[prepareForRead()]] and before instantiating the actual
+   * Parquet record readers.  Figures out the Parquet requested schema used for column pruning.
+   */
+  override def init(context: InitContext): ReadContext = {
+    catalystRequestedSchema = {
+      // scalastyle:off jobcontext
+      val conf = context.getConfiguration
+      // scalastyle:on jobcontext
+      val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      assert(schemaString != null, "Parquet requested schema not set.")
+      StructType.fromString(schemaString)
+    }
+
+    val parquetRequestedSchema =
+      CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on the executor side after [[init()]] and before instantiating the actual Parquet
+   * record readers.  Responsible for instantiating a [[RecordMaterializer]], which converts
+   * Parquet records into Catalyst [[InternalRow]]s.
+   */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
     val parquetRequestedSchema = readContext.getRequestedSchema
 
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
-        metadata
-          // First tries to read requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file metadata.  It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logInfo("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
-
     logInfo {
       s"""Going to read the following fields from the Parquet file:
          |
@@ -69,36 +97,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
 
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
-
-  // Called before `prepareForRead()` when initializing Parquet record reader.
-  override def init(context: InitContext): ReadContext = {
-    val conf = {
-      // scalastyle:off jobcontext
-      context.getConfiguration
-      // scalastyle:on jobcontext
-    }
-
-    // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its metadata.
-    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
-    // `StructType` containing all requested columns.
-    val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-    val parquetRequestedSchema =
-      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val catalystRequestedSchema = StructType.fromString(schemaString)
-        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
-      }
-
-    val metadata =
-      Map.empty[String, String] ++
-        maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
-        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
-    new ReadContext(parquetRequestedSchema, metadata.asJava)
-  }
 }
 
 private[parquet] object CatalystReadSupport {


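To make the new contract in init() concrete: the driver side serializes the pruned Catalyst
schema to JSON and stores it in the Hadoop Configuration under
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, and init() now asserts that the key is present
instead of falling back to the full file schema.  A small illustrative sketch (the schema and
field names are made up, and CatalystReadSupport is private[parquet], so this only compiles
from within that package):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.types._

    // Illustrative only: a pruned Catalyst schema containing just the requested columns.
    val requestedSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // The driver serializes the schema as JSON into the job Configuration ...
    val hadoopConf = new Configuration()
    hadoopConf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requestedSchema.json)

    // ... and the executor-side init() recovers it with StructType.fromString, failing
    // fast if the key was never set.
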
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org