Posted to commits@spark.apache.org by ma...@apache.org on 2016/03/22 04:16:06 UTC

spark git commit: [SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader

Repository: spark
Updated Branches:
  refs/heads/master 729996165 -> 8014a516d


[SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader

This PR adds an implementation of the new `buildReader` interface for the Parquet `FileFormat`.  A simple implementation of `FileScanRDD` is also included.

This code is exercised by the many existing Parquet tests.
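
For orientation, the hook being implemented has the following shape (the signature is taken from the `buildReader` override in the ParquetRelation.scala hunk below; these are private[sql] interfaces inside the sql/core module, and the stub body here is only a placeholder):

// Sketch only: the per-file read contract. FileScanRDD applies the returned
// closure to each PartitionedFile in its FilePartitions and chains the iterators.
def buildReader(
    sqlContext: SQLContext,
    partitionSchema: StructType,
    dataSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
  (file: PartitionedFile) => Iterator.empty  // placeholder; see the real implementation below
}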

Author: Michael Armbrust <mi...@databricks.com>
Author: Sameer Agarwal <sa...@databricks.com>
Author: Nong Li <no...@databricks.com>

Closes #11709 from marmbrus/parquetReader.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8014a516
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8014a516
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8014a516

Branch: refs/heads/master
Commit: 8014a516d1cbb0f0c7035e2149161aa32fb506f8
Parents: 7299961
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Mar 21 20:16:01 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Mar 21 20:16:01 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedParquetRecordReader.java  |  82 ++++++++--
 .../sql/execution/vectorized/ColumnVector.java  |  13 ++
 .../sql/execution/vectorized/ColumnarBatch.java |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   1 -
 .../sql/execution/datasources/FileScanRDD.scala |  44 +++++-
 .../datasources/FileSourceStrategy.scala        |  36 +++--
 .../datasources/RecordReaderIterator.scala      |  55 +++++++
 .../datasources/parquet/ParquetRelation.scala   | 148 ++++++++++++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../apache/spark/sql/sources/interfaces.scala   |   9 +-
 .../datasources/parquet/ParquetIOSuite.scala    |   4 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |  10 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   1 +
 13 files changed, 366 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9db5c41..9ac2513 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -29,8 +29,10 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
 import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.*;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -52,7 +54,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private int numBatched = 0;
 
   /**
-   * For each request column, the reader to read this column.
+   * For each requested column, the reader to read this column. This is NULL if this column
+   * is missing from the file, in which case we populate the attribute with NULL.
    */
   private VectorizedColumnReader[] columnReaders;
 
@@ -67,6 +70,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private long totalCountLoadedSoFar = 0;
 
   /**
+   * For each column, true if the column is missing in the file and we'll instead return NULLs.
+   */
+  private boolean[] missingColumns;
+
+  /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
    * batched decoding. It is not valid to interleave calls to the batched interface with the row
    * by row RecordReader APIs.
@@ -163,14 +171,53 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    * This object is reused. Calling this enables the vectorized reader. This should be called
    * before any calls to nextKeyValue/nextBatch.
    */
-  public ColumnarBatch resultBatch() {
-    return resultBatch(DEFAULT_MEMORY_MODE);
-  }
 
-  public ColumnarBatch resultBatch(MemoryMode memMode) {
-    if (columnarBatch == null) {
-      columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
+  // Creates a columnar batch that includes the schema from the data files and the additional
+  // partition columns appended to the end of the batch.
+  // For example, if the data contains two columns, with 2 partition columns:
+  // Columns 0,1: data columns
+  // Column 2: partitionValues[0]
+  // Column 3: partitionValues[1]
+  public void initBatch(MemoryMode memMode, StructType partitionColumns,
+                        InternalRow partitionValues) {
+    StructType batchSchema = new StructType();
+    for (StructField f: sparkSchema.fields()) {
+      batchSchema = batchSchema.add(f);
+    }
+    if (partitionColumns != null) {
+      for (StructField f : partitionColumns.fields()) {
+        batchSchema = batchSchema.add(f);
+      }
+    }
+
+    columnarBatch = ColumnarBatch.allocate(batchSchema, memMode);
+    if (partitionColumns != null) {
+      int partitionIdx = sparkSchema.fields().length;
+      for (int i = 0; i < partitionColumns.fields().length; i++) {
+        ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
+        columnarBatch.column(i + partitionIdx).setIsConstant();
+      }
+    }
+
+    // Initialize missing columns with nulls.
+    for (int i = 0; i < missingColumns.length; i++) {
+      if (missingColumns[i]) {
+        columnarBatch.column(i).putNulls(0, columnarBatch.capacity());
+        columnarBatch.column(i).setIsConstant();
+      }
     }
+  }
+
+  public void initBatch() {
+    initBatch(DEFAULT_MEMORY_MODE, null, null);
+  }
+
+  public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
+    initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
+  }
+
+  public ColumnarBatch resultBatch() {
+    if (columnarBatch == null) initBatch();
     return columnarBatch;
   }
 
@@ -191,6 +238,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
 
     int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
     for (int i = 0; i < columnReaders.length; ++i) {
+      if (columnReaders[i] == null) continue;
       columnReaders[i].readBatch(num, columnarBatch.column(i));
     }
     rowsReturned += num;
@@ -205,6 +253,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
      * Check that the requested schema is supported.
      */
     OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
+    missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
@@ -223,9 +272,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
       if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
         throw new IOException("Int96 not supported.");
       }
-      ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i));
-      if (!fd.equals(requestedSchema.getColumns().get(i))) {
-        throw new IOException("Schema evolution not supported.");
+      String[] colPath = requestedSchema.getPaths().get(i);
+      if (fileSchema.containsPath(colPath)) {
+        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
+        if (!fd.equals(requestedSchema.getColumns().get(i))) {
+          throw new IOException("Schema evolution not supported.");
+        }
+        missingColumns[i] = false;
+      } else {
+        if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+          // Column is missing in data but the required data is non-nullable. This file is invalid.
+          throw new IOException("Required column is missing in data file. Col: " + colPath);
+        }
+        missingColumns[i] = true;
       }
     }
   }
@@ -240,6 +299,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     columnReaders = new VectorizedColumnReader[columns.size()];
     for (int i = 0; i < columns.size(); ++i) {
+      if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i),
           pages.getPageReader(columns.get(i)));
     }
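
Caller-side, the new initBatch API is used roughly as follows (a sketch mirroring the ParquetRelation.scala hunk further down; `split`, `hadoopAttemptContext`, `partitionSchema`, and `file` are assumed to come from the enclosing read function):

// Partition values are appended as constant columns after the data columns:
// columns [0, dataSchema.size) are read from the file, the rest are filled once
// from file.partitionValues and survive reset() because they are marked constant.
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
val batch = vectorizedReader.resultBatch()  // reused across nextBatch() calls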

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 13bf4c5..74fa632 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -256,6 +256,8 @@ public abstract class ColumnVector {
    * Resets this column for writing. The currently stored values are no longer accessible.
    */
   public void reset() {
+    if (isConstant) return;
+
     if (childColumns != null) {
       for (ColumnVector c: childColumns) {
         c.reset();
@@ -823,6 +825,11 @@ public abstract class ColumnVector {
   public final boolean isArray() { return resultArray != null; }
 
   /**
+   * Marks this column as being constant.
+   */
+  public final void setIsConstant() { isConstant = true; }
+
+  /**
    * Maximum number of rows that can be stored in this column.
    */
   protected int capacity;
@@ -844,6 +851,12 @@ public abstract class ColumnVector {
   protected boolean anyNullsSet;
 
   /**
+   * True if this column's values are fixed. This means the column values never change, even
+   * across resets.
+   */
+  protected boolean isConstant;
+
+  /**
    * Default size of each array length value. This grows as necessary.
    */
   protected static final int DEFAULT_ARRAY_LENGTH = 4;

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 7ab4cda..792e179 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -33,7 +33,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * This class is the in memory representation of rows as they are streamed through operators. It
  * is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
- * each operator allocates one of thee objects, the storage footprint on the task is negligible.
+ * each operator allocates one of these objects, the storage footprint on the task is negligible.
  *
  * The layout is a columnar with values encoded in their native format. Each RowBatch contains
  * a horizontal partitioning of the data, split into columns.

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6116cce..e2d5f42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -233,7 +233,6 @@ case class DataSource(
             "It must be specified manually")
         }
 
-
         HadoopFsRelation(
           sqlContext,
           fileCatalog,

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index e2cbbc3..bbe7f4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 
@@ -31,13 +31,17 @@ case class PartitionedFile(
     partitionValues: InternalRow,
     filePath: String,
     start: Long,
-    length: Long)
+    length: Long) {
+  override def toString(): String = {
+    s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
+  }
+}
+
 
 /**
  * A collection of files that should be read as a single task possibly from multiple partitioned
  * directories.
  *
- * IMPLEMENT ME: This is just a placeholder for a future implementation.
  * TODO: This currently does not take locality information about the files into account.
  */
 case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
@@ -48,10 +52,38 @@ class FileScanRDD(
     @transient val filePartitions: Seq[FilePartition])
     extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
 
-
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    throw new NotImplementedError("Not Implemented Yet")
+    val iterator = new Iterator[Object] with AutoCloseable {
+      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      private[this] var currentIterator: Iterator[Object] = null
+
+      def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      def next() = currentIterator.next()
+
+      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+      private def nextIterator(): Boolean = {
+        if (files.hasNext) {
+          val nextFile = files.next()
+          logInfo(s"Reading File $nextFile")
+          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+          currentIterator = readFunction(nextFile)
+          hasNext
+        } else {
+          SqlNewHadoopRDDState.unsetInputFileName()
+          false
+        }
+      }
+
+      override def close() = {
+        SqlNewHadoopRDDState.unsetInputFileName()
+      }
+    }
+
+    // Register an on-task-completion callback to close the input stream.
+    context.addTaskCompletionListener(context => iterator.close())
+
+    iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
   }
 
-  override protected def getPartitions: Array[Partition] = Array.empty
+  override protected def getPartitions: Array[Partition] = filePartitions.toArray
 }
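
To make the pieces concrete, here is a minimal sketch of how a FileScanRDD is assembled (the file path and sizes are made up, and `readFile` stands in for the closure returned by FileFormat.buildReader; FileSourceStrategy below does the real wiring):

// One task reads the files of one FilePartition, chaining their iterators.
val readFile: PartitionedFile => Iterator[InternalRow] = ???  // from buildReader
val partition = FilePartition(
  index = 0,
  files = Seq(PartitionedFile(
    partitionValues = InternalRow.empty,
    filePath = "hdfs://namenode/warehouse/t/part-00000.parquet",  // hypothetical path
    start = 0L,
    length = 128L * 1024 * 1024)))
val rdd = new FileScanRDD(sqlContext, readFile, Seq(partition))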

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 62576ea..de89d5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -57,7 +57,9 @@ import org.apache.spark.sql.types._
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
-      if files.fileFormat.toString == "TestFileFormat" =>
+      if (files.fileFormat.toString == "TestFileFormat" ||
+         files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+         files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -67,12 +69,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val filterSet = ExpressionSet(filters)
 
       val partitionColumns =
-        AttributeSet(
-          l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
+        l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
+      val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
+        ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
+      val dataColumns =
+        l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
+
       val bucketColumns =
         AttributeSet(
           files.bucketSpec
@@ -82,7 +87,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
                   .getOrElse(sys.error(""))))
 
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
+      val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters
@@ -92,11 +97,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
       val filterAttributes = AttributeSet(afterScanFilters)
       val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
-      val requiredAttributes = AttributeSet(requiredExpressions).map(_.name).toSet
+      val requiredAttributes = AttributeSet(requiredExpressions)
 
-      val prunedDataSchema =
-        StructType(
-          files.dataSchema.filter(f => requiredAttributes.contains(f.name)))
+      val readDataColumns =
+        dataColumns
+            .filter(requiredAttributes.contains)
+            .filterNot(partitionColumns.contains)
+      val prunedDataSchema = readDataColumns.toStructType
       logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")
 
       val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -132,7 +139,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
           val splitFiles = selectedPartitions.flatMap { partition =>
             partition.files.flatMap { file =>
-              assert(file.getLen != 0)
+              assert(file.getLen != 0, file.toString)
               (0L to file.getLen by maxSplitBytes).map { offset =>
                 val remaining = file.getLen - offset
                 val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
@@ -180,17 +187,20 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
       val scan =
         DataSourceScan(
-          l.output,
+          readDataColumns ++ partitionColumns,
           new FileScanRDD(
             files.sqlContext,
             readFile,
             plannedPartitions),
           files,
-          Map("format" -> files.fileFormat.toString))
+          Map(
+            "Format" -> files.fileFormat.toString,
+            "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
+            "ReadSchema" -> prunedDataSchema.simpleString))
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan)
-      val withProjections = if (projects.forall(_.isInstanceOf[AttributeReference])) {
+      val withProjections = if (projects == withFilter.output) {
         withFilter
       } else {
         execution.Project(projects, withFilter)
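
The split loop above carves each selected file into ranges of at most maxSplitBytes; as a standalone illustration of the arithmetic (the sizes are hypothetical):

// A 300 MB file with maxSplitBytes = 128 MB yields offsets 0, 128 MB and 256 MB.
val maxSplitBytes = 128L * 1024 * 1024
val fileLen = 300L * 1024 * 1024
val splits = (0L to fileLen by maxSplitBytes).map { offset =>
  val remaining = fileLen - offset
  val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
  (offset, size)  // each pair later becomes a PartitionedFile(values, path, offset, size)
}
// splits: (0, 128 MB), (128 MB, 128 MB), (256 MB, 44 MB)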

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
new file mode 100644
index 0000000..f03ae94
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.mapreduce.RecordReader
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
+ *
+ * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
+ * column batches by pretending they are rows.
+ */
+class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
+  private[this] var havePair = false
+  private[this] var finished = false
+
+  override def hasNext: Boolean = {
+    if (!finished && !havePair) {
+      finished = !rowReader.nextKeyValue
+      if (finished) {
+        // Close and release the reader here; close() will also be called when the task
+        // completes, but for tasks that read from many files, it helps to release the
+        // resources early.
+        rowReader.close()
+      }
+      havePair = !finished
+    }
+    !finished
+  }
+
+  override def next(): T = {
+    if (!hasNext) {
+      throw new java.util.NoSuchElementException("End of stream")
+    }
+    havePair = false
+    rowReader.getCurrentValue
+  }
+}
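
Usage is a one-liner: wrap an initialized Hadoop RecordReader and iterate. The parquet-mr fallback in ParquetRelation.buildReader below does exactly this (`split` and `hadoopAttemptContext` are assumed to be in scope):

// Adapting a row-based ParquetRecordReader to a Scala Iterator.
val reader = new ParquetRecordReader[InternalRow](new CatalystReadSupport)
reader.initialize(split, hadoopAttemptContext)
val rows: Iterator[InternalRow] = new RecordReaderIterator(reader)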

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 3f0389b..2f2d438 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -24,14 +24,16 @@ import java.util.logging.{Logger => JLogger}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.hadoop.mapreduce.task.JobContextImpl
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.parquet.{Log => ApacheParquetLog}
+import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -45,16 +47,21 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.BitSet
 
-
-private[sql] class DefaultSource extends FileFormat with DataSourceRegister with Logging {
+private[sql] class DefaultSource
+  extends FileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
 
   override def shortName(): String = "parquet"
 
@@ -269,6 +276,137 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister with
         file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
+   *
+   * @param partitionSchema The schema of the partition column row that will be present in each
+   *                        PartitionedFile.  These columns should be prepended to the rows that
+   *                        are produced by the iterator.
+   * @param dataSchema The schema of the data that should be output for each row.  This may be a
+   *                   subset of the columns that are present in the file if column pruning has
+   *                   occurred.
+   * @param filters A set of filters that can optionally be used to reduce the number of rows output
+   * @param options A set of string -> string configuration options.
+   * @return
+   */
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    parquetConf.set(
+      CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+    parquetConf.set(
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+    // We want to clear this temporary metadata so that it is not saved into the Parquet file.
+    // This metadata is only useful for detecting optional columns when pushing down filters.
+    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+
+    // Sets flags for `CatalystSchemaConverter`
+    parquetConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlContext.conf.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
+    parquetConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+      filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(ParquetFilters.createFilter(dataSchema, _))
+          .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val enableVectorizedParquetReader: Boolean =
+      sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
+    val enableWholestageCodegen: Boolean =
+      sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
+
+    (file: PartitionedFile) => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      val fileSplit =
+        new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          fileSplit.getPath,
+          fileSplit.getStart,
+          fileSplit.getStart + fileSplit.getLength,
+          fileSplit.getLength,
+          fileSplit.getLocations,
+          null)
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+
+      val parquetReader = try {
+        if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.")
+        val vectorizedReader = new VectorizedParquetRecordReader()
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+        // TODO: fix column appending
+        if (enableWholestageCodegen) {
+          logDebug(s"Enabling batch returning")
+          vectorizedReader.enableReturningBatches()
+        }
+        vectorizedReader
+      } catch {
+        case NonFatal(e) =>
+          logDebug(s"Falling back to parquet-mr: $e", e)
+          val reader = pushed match {
+            case Some(filter) =>
+              new ParquetRecordReader[InternalRow](
+                new CatalystReadSupport,
+                FilterCompat.get(filter, null))
+            case _ =>
+              new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+          }
+          reader.initialize(split, hadoopAttemptContext)
+          reader
+      }
+
+      val iter = new RecordReaderIterator(parquetReader)
+
+      // VectorizedParquetRecordReader appends the partition columns internally to avoid another copy.
+      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
+          enableVectorizedParquetReader) {
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val joinedRow = new JoinedRow()
+        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        // This is a horrible erasure hack...  if we type the iterator above, then it actually checks
+        // the type in next() and we get a class cast exception.  If we make that function return
+        // Object, then we can defer the cast until later!
+        iter.asInstanceOf[Iterator[InternalRow]]
+            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+      }
+    }
+  }
+
   override def buildInternalScan(
       sqlContext: SQLContext,
       dataSchema: StructType,
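
End to end, the closure returned by buildReader is what FileScanRDD invokes once per file. A hypothetical driver sketch (the path is made up; `sqlContext`, the schemas, `partitionValues`, and `fileLen` are assumed to be in scope; DefaultSource is private[sql], so this only illustrates the data flow):

// buildReader hands back a per-file read function; FileScanRDD chains its results.
val readFile: PartitionedFile => Iterator[InternalRow] =
  new DefaultSource().buildReader(
    sqlContext, partitionSchema, dataSchema, filters = Nil, options = Map.empty)
val rows = readFile(
  PartitionedFile(partitionValues, "file:///tmp/t/part-00000.parquet", 0L, fileLen))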

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3d1d5b1..61058ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -288,6 +288,11 @@ object SQLConf {
     defaultValue = Some(true),
     doc = "Whether the query analyzer should be case sensitive or not.")
 
+  val PARQUET_FILE_SCAN = booleanConf("spark.sql.parquet.fileScan",
+    defaultValue = Some(true),
+    doc = "Use the new FileScanRDD path for reading parquet data.",
+    isPublic = false)
+
   val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
     defaultValue = Some(false),
     doc = "When true, the Parquet data source merges schemas collected from all data files, " +
@@ -555,6 +560,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
 
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
+  def parquetFileScan: Boolean = getConf(PARQUET_FILE_SCAN)
+
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
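
The new flag is internal (isPublic = false), but while debugging it can be flipped per SQLContext to fall back to the old buildInternalScan path; a sketch:

// Disable the new FileScanRDD-based parquet scan for this session.
sqlContext.setConf("spark.sql.parquet.fileScan", "false")
// ... run or explain the query against the old code path ...
sqlContext.setConf("spark.sql.parquet.fileScan", "true")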

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6101b08..1e02354 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -415,7 +415,7 @@ case class HadoopFsRelation(
   def refresh(): Unit = location.refresh()
 
   override def toString: String =
-    s"$fileFormat part: ${partitionSchema.simpleString}, data: ${dataSchema.simpleString}"
+    s"HadoopFiles"
 
   /** Returns the list of files that will be read when scanning this relation. */
   override def inputFiles: Array[String] =
@@ -551,10 +551,13 @@ class HDFSFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles()) :: Nil
+      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
-        case PartitionDirectory(values, path) => Partition(values, getStatus(path))
+        case PartitionDirectory(values, path) =>
+          Partition(
+            values,
+            getStatus(path).filterNot(_.getPath.getName startsWith "_"))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ebdb105..9746187 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -445,7 +445,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-6352 DirectParquetOutputCommitter") {
+  testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
     val clonedConf = new Configuration(hadoopConfiguration)
 
     // Write to a parquet file and let it fail.
@@ -469,7 +469,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
+  testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatiblity") {
     val clonedConf = new Configuration(hadoopConfiguration)
 
     // Write to a parquet file and let it fail.

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 2bce745..926fabe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.Filter
 import org.apache.spark.util.Utils
 
 /**
@@ -204,10 +205,11 @@ private[sql] trait SQLTestUtils
    */
   protected def stripSparkFilter(df: DataFrame): DataFrame = {
     val schema = df.schema
-    val childRDD = df
-      .queryExecution
-      .sparkPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-      .child
+    val withoutFilters = df.queryExecution.sparkPlan transform {
+      case Filter(_, child) => child
+    }
+
+    val childRDD = withoutFilters
       .execute()
       .map(row => Row.fromSeq(row.copy().toSeq(schema)))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8014a516/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2806b87..bc8896d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1724,6 +1724,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withTable("tbl10562") {
       val df = Seq(2012 -> "a").toDF("Year", "val")
       df.write.partitionBy("Year").saveAsTable("tbl10562")
+      checkAnswer(sql("SELECT year FROM tbl10562"), Row(2012))
       checkAnswer(sql("SELECT Year FROM tbl10562"), Row(2012))
       checkAnswer(sql("SELECT yEAr FROM tbl10562"), Row(2012))
       checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year > 2015"), Nil)

