You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/17 06:16:11 UTC

spark git commit: [SPARK-4798][SQL] A new set of Parquet testing API and test suites

Repository: spark
Updated Branches:
  refs/heads/master b85044ecf -> 3b395e105


[SPARK-4798][SQL] A new set of Parquet testing API and test suites

This PR provides a set Parquet testing API (see trait `ParquetTest`) that enables developers to write more concise test cases. A new set of Parquet test suites built upon this API  are added and aim to replace the old `ParquetQuerySuite`. To avoid potential merge conflicts, old testing code are not removed yet. The following classes can be safely removed after most Parquet related PRs are handled:

- `ParquetQuerySuite`
- `ParquetTestData`

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3644)
<!-- Reviewable:end -->

Author: Cheng Lian <li...@databricks.com>

Closes #3644 from liancheng/parquet-tests and squashes the following commits:

800e745 [Cheng Lian] Enforces ordering of test output
3bb8731 [Cheng Lian] Refactors HiveParquetSuite
aa2cb2e [Cheng Lian] Decouples ParquetTest and TestSQLContext
7b43a68 [Cheng Lian] Updates ParquetTest Scaladoc
7f07af0 [Cheng Lian] Adds a new set of Parquet test suites


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b395e10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b395e10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b395e10

Branch: refs/heads/master
Commit: 3b395e10510782474789c9098084503f98ca4830
Parents: b85044e
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Dec 16 21:16:03 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Dec 16 21:16:03 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |   4 +
 .../apache/spark/sql/parquet/ParquetTest.scala  | 127 ++++++++
 .../spark/sql/parquet/ParquetFilterSuite.scala  | 253 ++++++++++++++++
 .../spark/sql/parquet/ParquetIOSuite.scala      | 287 +++++++++++++++++++
 .../spark/sql/parquet/ParquetQuerySuite2.scala  | 110 +++++++
 .../spark/sql/parquet/ParquetSchemaSuite.scala  | 164 +++++++++++
 .../spark/sql/parquet/HiveParquetSuite.scala    | 119 +++-----
 .../spark/sql/parquet/parquetSuites.scala       |   6 +-
 8 files changed, 989 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5abf71..f5bf935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -188,6 +188,10 @@ private[sql] trait SQLConf {
    */
   def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap }
 
+  private[spark] def unsetConf(key: String) {
+    settings -= key
+  }
+
   private[spark] def clear() {
     settings.clear()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
new file mode 100644
index 0000000..b4d4890
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+import scala.util.Try
+
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+/**
+ * A helper trait that provides convenient facilities for Parquet testing.
+ *
+ * NOTE: Considering classes `Tuple1` ... `Tuple22` all extend `Product`, it would be more
+ * convenient to use tuples rather than special case classes when writing test cases/suites.
+ * Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
+ */
+trait ParquetTest {
+  val sqlContext: SQLContext
+
+  import sqlContext._
+
+  protected def configuration = sparkContext.hadoopConfiguration
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
+   * configurations.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(getConf(key)).toOption)
+    (keys, values).zipped.foreach(setConf)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => setConf(key, value)
+        case (key, None) => unsetConf(key)
+      }
+    }
+  }
+
+  /**
+   * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
+   * a file/directory is created there by `f`, it will be delete after `f` returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempPath(f: File => Unit): Unit = {
+    val file = util.getTempFilePath("parquetTest").getCanonicalFile
+    try f(file) finally if (file.exists()) Utils.deleteRecursively(file)
+  }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
+  /**
+   * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected def withParquetFile[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: String => Unit): Unit = {
+    withTempPath { file =>
+      sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath)
+      f(file.getCanonicalPath)
+    }
+  }
+
+  /**
+   * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`.
+   * The Parquet file will be deleted after `f` returns.
+   */
+  protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: SchemaRDD => Unit): Unit = {
+    withParquetFile(data)(path => f(parquetFile(path)))
+  }
+
+  /**
+   * Drops temporary table `tableName` after calling `f`.
+   */
+  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+    try f finally dropTempTable(tableName)
+  }
+
+  /**
+   * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary
+   * table named `tableName`, then call `f`. The temporary table together with the Parquet file will
+   * be dropped/deleted after `f` returns.
+   */
+  protected def withParquetTable[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T], tableName: String)
+      (f: => Unit): Unit = {
+    withParquetRDD(data) { rdd =>
+      rdd.registerTempTable(tableName)
+      withTempTable(tableName)(f)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
new file mode 100644
index 0000000..111a459
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import parquet.filter2.predicate.Operators._
+import parquet.filter2.predicate.{FilterPredicate, Operators}
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
+
+/**
+ * A test suite that tests Parquet filter2 API based filter pushdown optimization.
+ *
+ * Notice that `!(a cmp b)` are always transformed to its negated form `a cmp' b` by the
+ * `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)`
+ * results a `GtEq` filter predicate rather than a `Not`.
+ *
+ * @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367
+ */
+class ParquetFilterSuite extends QueryTest with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  private def checkFilterPushdown(
+      rdd: SchemaRDD,
+      output: Seq[Symbol],
+      predicate: Predicate,
+      filterClass: Class[_ <: FilterPredicate],
+      checker: (SchemaRDD, Any) => Unit,
+      expectedResult: => Any): Unit = {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      val query = rdd.select(output.map(_.attr): _*).where(predicate)
+
+      val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
+        case plan: ParquetTableScan => plan.columnPruningPred
+      }.flatten.reduceOption(_ && _)
+
+      assert(maybeAnalyzedPredicate.isDefined)
+      maybeAnalyzedPredicate.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(pred)
+        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+        maybeFilter.foreach(f => assert(f.getClass === filterClass))
+      }
+
+      checker(query, expectedResult)
+    }
+  }
+
+  private def checkFilterPushdown
+      (rdd: SchemaRDD, output: Symbol*)
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
+      (expectedResult: => Any): Unit = {
+    checkFilterPushdown(rdd, output, predicate, filterClass, checkAnswer _, expectedResult)
+  }
+
+  def checkBinaryFilterPushdown
+      (rdd: SchemaRDD, output: Symbol*)
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
+      (expectedResult: => Any): Unit = {
+    def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
+      val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
+      val expected = result match {
+        case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
+        case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
+      }
+      assert(actual.sorted === expected.sorted)
+    }
+    checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
+  }
+
+  test("filter pushdown - boolean") {
+    withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true)
+      checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.Not])(false)
+    }
+  }
+
+  test("filter pushdown - integer") {
+    withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 < 2,  classOf[Lt  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 > 3,  classOf[Gt  [Integer]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [Integer]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[Integer]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - long") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toLong))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 <  2, classOf[Lt  [java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >  3, classOf[Gt  [java.lang.Long]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [java.lang.Long]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[java.lang.Long]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - float") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toFloat))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 <  2, classOf[Lt  [java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >  3, classOf[Gt  [java.lang.Float]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [java.lang.Float]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[java.lang.Float]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - double") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toDouble))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 <  2, classOf[Lt  [java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >  3, classOf[Gt  [java.lang.Double]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [java.lang.Double]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[java.lang.Double]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - string") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toString))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1")
+      checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.Not]) {
+        (2 to 4).map(i => Row.apply(i.toString))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 <  "2", classOf[Lt  [java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)('_1 >  "3", classOf[Gt  [java.lang.String]])("4")
+      checkFilterPushdown(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])("4")
+
+      checkFilterPushdown(rdd, '_1)(Literal("1") === '_1, classOf[Eq  [java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)(Literal("2") >   '_1, classOf[Lt  [java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)(Literal("3") <   '_1, classOf[Gt  [java.lang.String]])("4")
+      checkFilterPushdown(rdd, '_1)(Literal("1") >=  '_1, classOf[LtEq[java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)(Literal("4") <=  '_1, classOf[GtEq[java.lang.String]])("4")
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])("4")
+      checkFilterPushdown(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])("3")
+      checkFilterPushdown(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
+        Seq(Row("1"), Row("4"))
+      }
+    }
+  }
+
+  test("filter pushdown - binary") {
+    implicit class IntToBinary(int: Int) {
+      def b: Array[Byte] = int.toString.getBytes("UTF-8")
+    }
+
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.b))) { rdd =>
+      checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.Not]) {
+        (2 to 4).map(i => Row.apply(i.b)).toSeq
+      }
+
+      checkBinaryFilterPushdown(rdd, '_1)('_1 <  2.b, classOf[Lt  [Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 >  3.b, classOf[Gt  [Array[Byte]]])(4.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)
+
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq  [Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) >   '_1, classOf[Lt  [Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) <   '_1, classOf[Gt  [Array[Byte]]])(4.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >=  '_1, classOf[LtEq[Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <=  '_1, classOf[GtEq[Array[Byte]]])(4.b)
+
+      checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
+        Seq(Row(1.b), Row(4.b))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
new file mode 100644
index 0000000..10a0147
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import parquet.example.data.simple.SimpleGroup
+import parquet.example.data.{Group, GroupWriter}
+import parquet.hadoop.api.WriteSupport
+import parquet.hadoop.api.WriteSupport.WriteContext
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.io.api.RecordConsumer
+import parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types.DecimalType
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
+
+// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
+// with an empty configuration (it is after all not intended to be used in this way?)
+// and members are private so we need to make our own in order to pass the schema
+// to the writer.
+private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
+  var groupWriter: GroupWriter = null
+
+  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+    groupWriter = new GroupWriter(recordConsumer, schema)
+  }
+
+  override def init(configuration: Configuration): WriteContext = {
+    new WriteContext(schema, new java.util.HashMap[String, String]())
+  }
+
+  override def write(record: Group) {
+    groupWriter.write(record)
+  }
+}
+
+/**
+ * A test suite that tests basic Parquet I/O.
+ */
+class ParquetIOSuite extends QueryTest with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  /**
+   * Writes `data` to a Parquet file, reads it back and check file contents.
+   */
+  protected def checkParquetFile[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
+    withParquetRDD(data)(checkAnswer(_, data))
+  }
+
+  test("basic data types (without binary)") {
+    val data = (1 to 4).map { i =>
+      (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+    }
+    checkParquetFile(data)
+  }
+
+  test("raw binary") {
+    val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
+    withParquetRDD(data) { rdd =>
+      assertResult(data.map(_._1.mkString(",")).sorted) {
+        rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
+      }
+    }
+  }
+
+  test("string") {
+    val data = (1 to 4).map(i => Tuple1(i.toString))
+    // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
+    // as we store Spark SQL schema in the extra metadata.
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+  }
+
+  test("fixed-length decimals") {
+    def makeDecimalRDD(decimal: DecimalType): SchemaRDD =
+      sparkContext
+        .parallelize(0 to 1000)
+        .map(i => Tuple1(i / 100.0))
+        .select('_1 cast decimal)
+
+    for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
+      withTempPath { dir =>
+        val data = makeDecimalRDD(DecimalType(precision, scale))
+        data.saveAsParquetFile(dir.getCanonicalPath)
+        checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+      }
+    }
+
+    // Decimals with precision above 18 are not yet supported
+    intercept[RuntimeException] {
+      withTempPath { dir =>
+        makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
+        parquetFile(dir.getCanonicalPath).collect()
+      }
+    }
+
+    // Unlimited-length decimals are not yet supported
+    intercept[RuntimeException] {
+      withTempPath { dir =>
+        makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
+        parquetFile(dir.getCanonicalPath).collect()
+      }
+    }
+  }
+
+  test("map") {
+    val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
+    checkParquetFile(data)
+  }
+
+  test("array") {
+    val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
+    checkParquetFile(data)
+  }
+
+  test("struct") {
+    val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
+    withParquetRDD(data) { rdd =>
+      // Structs are converted to `Row`s
+      checkAnswer(rdd, data.map { case Tuple1(struct) =>
+        Tuple1(Row(struct.productIterator.toSeq: _*))
+      })
+    }
+  }
+
+  test("nested struct with array of array as field") {
+    val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
+    withParquetRDD(data) { rdd =>
+      // Structs are converted to `Row`s
+      checkAnswer(rdd, data.map { case Tuple1(struct) =>
+        Tuple1(Row(struct.productIterator.toSeq: _*))
+      })
+    }
+  }
+
+  test("nested map with struct as value type") {
+    val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
+    withParquetRDD(data) { rdd =>
+      checkAnswer(rdd, data.map { case Tuple1(m) =>
+        Tuple1(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+      })
+    }
+  }
+
+  test("nulls") {
+    val allNulls = (
+      null.asInstanceOf[java.lang.Boolean],
+      null.asInstanceOf[Integer],
+      null.asInstanceOf[java.lang.Long],
+      null.asInstanceOf[java.lang.Float],
+      null.asInstanceOf[java.lang.Double])
+
+    withParquetRDD(allNulls :: Nil) { rdd =>
+      val rows = rdd.collect()
+      assert(rows.size === 1)
+      assert(rows.head === Row(Seq.fill(5)(null): _*))
+    }
+  }
+
+  test("nones") {
+    val allNones = (
+      None.asInstanceOf[Option[Int]],
+      None.asInstanceOf[Option[Long]],
+      None.asInstanceOf[Option[String]])
+
+    withParquetRDD(allNones :: Nil) { rdd =>
+      val rows = rdd.collect()
+      assert(rows.size === 1)
+      assert(rows.head === Row(Seq.fill(3)(null): _*))
+    }
+  }
+
+  test("compression codec") {
+    def compressionCodecFor(path: String) = {
+      val codecs = ParquetTypesConverter
+        .readMetaData(new Path(path), Some(configuration))
+        .getBlocks
+        .flatMap(_.getColumns)
+        .map(_.getCodec.name())
+        .distinct
+
+      assert(codecs.size === 1)
+      codecs.head
+    }
+
+    val data = (0 until 10).map(i => (i, i.toString))
+
+    def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+      withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+        withParquetFile(data) { path =>
+          assertResult(parquetCompressionCodec.toUpperCase) {
+            compressionCodecFor(path)
+          }
+        }
+      }
+    }
+
+    // Checks default compression codec
+    checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec))
+
+    checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+    checkCompressionCodec(CompressionCodecName.GZIP)
+    checkCompressionCodec(CompressionCodecName.SNAPPY)
+  }
+
+  test("read raw Parquet file") {
+    def makeRawParquetFile(path: Path): Unit = {
+      val schema = MessageTypeParser.parseMessageType(
+        """
+          |message root {
+          |  required boolean _1;
+          |  required int32   _2;
+          |  required int64   _3;
+          |  required float   _4;
+          |  required double  _5;
+          |}
+        """.stripMargin)
+
+      val writeSupport = new TestGroupWriteSupport(schema)
+      val writer = new ParquetWriter[Group](path, writeSupport)
+
+      (0 until 10).foreach { i =>
+        val record = new SimpleGroup(schema)
+        record.add(0, i % 2 == 0)
+        record.add(1, i)
+        record.add(2, i.toLong)
+        record.add(3, i.toFloat)
+        record.add(4, i.toDouble)
+        writer.write(record)
+      }
+
+      writer.close()
+    }
+
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+      makeRawParquetFile(path)
+      checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
+        (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+      })
+    }
+  }
+
+  test("write metadata") {
+    withTempPath { file =>
+      val path = new Path(file.toURI.toString)
+      val fs = FileSystem.getLocal(configuration)
+      val attributes = ScalaReflection.attributesFor[(Int, String)]
+      ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+
+      assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
+      assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
+
+      val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration))
+      val actualSchema = metaData.getFileMetaData.getSchema
+      val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+
+      actualSchema.checkContains(expectedSchema)
+      expectedSchema.checkContains(actualSchema)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
new file mode 100644
index 0000000..daa7ca6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
+
+/**
+ * A test suite that tests various Parquet queries.
+ */
+class ParquetQuerySuite2 extends QueryTest with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  test("simple projection") {
+    withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+    }
+  }
+
+  test("insertion") {
+    withTempDir { dir =>
+      val data = (0 until 10).map(i => (i, i.toString))
+      withParquetTable(data, "t") {
+        createParquetFile[(Int, String)](dir.toString).registerTempTable("dest")
+        withTempTable("dest") {
+          sql("INSERT OVERWRITE INTO dest SELECT * FROM t")
+          checkAnswer(table("dest"), data)
+        }
+      }
+    }
+  }
+
+  test("appending") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    withParquetTable(data, "t") {
+      sql("INSERT INTO t SELECT * FROM t")
+      checkAnswer(table("t"), data ++ data)
+    }
+  }
+
+  test("self-join") {
+    // 4 rows, cells of column 1 of row 2 and row 4 are null
+    val data = (1 to 4).map { i =>
+      val maybeInt = if (i % 2 == 0) None else Some(i)
+      (maybeInt, i.toString)
+    }
+
+    withParquetTable(data, "t") {
+      val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+      val queryOutput = selfJoin.queryExecution.analyzed.output
+
+      assertResult(4, s"Field count mismatches")(queryOutput.size)
+      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
+        queryOutput.filter(_.name == "_1").map(_.exprId).size
+      }
+
+      checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+    }
+  }
+
+  test("nested data - struct with array field") {
+    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+        case Tuple1((_, Seq(string))) => Row(string)
+      })
+    }
+  }
+
+  test("nested data - array of struct") {
+    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+        case Tuple1(Seq((_, string))) => Row(string)
+      })
+    }
+  }
+
+  test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+    withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+      checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
+    }
+  }
+
+  test("SPARK-3536 regression: query empty Parquet file shouldn't throw") {
+    withTempDir { dir =>
+      createParquetFile[(Int, String)](dir.toString).registerTempTable("t")
+      withTempTable("t") {
+        checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
new file mode 100644
index 0000000..34d61bf
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.scalatest.FunSuite
+import parquet.schema.MessageTypeParser
+
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.types.{BinaryType, IntegerType, StructField, StructType}
+import org.apache.spark.sql.test.TestSQLContext
+
+class ParquetSchemaSuite extends FunSuite with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  /**
+   * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
+   */
+  private def testSchema[T <: Product: ClassTag: TypeTag](
+      testName: String, messageType: String): Unit = {
+    test(testName) {
+      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T])
+      val expected = MessageTypeParser.parseMessageType(messageType)
+      actual.checkContains(expected)
+      expected.checkContains(actual)
+    }
+  }
+
+  testSchema[(Boolean, Int, Long, Float, Double, Array[Byte])](
+    "basic types",
+    """
+      |message root {
+      |  required boolean _1;
+      |  required int32   _2;
+      |  required int64   _3;
+      |  required float   _4;
+      |  required double  _5;
+      |  optional binary  _6;
+      |}
+    """.stripMargin)
+
+  testSchema[(Byte, Short, Int, Long)](
+    "logical integral types",
+    """
+      |message root {
+      |  required int32 _1 (INT_8);
+      |  required int32 _2 (INT_16);
+      |  required int32 _3 (INT_32);
+      |  required int64 _4 (INT_64);
+      |}
+    """.stripMargin)
+
+  // Currently String is the only supported logical binary type.
+  testSchema[Tuple1[String]](
+    "binary logical types",
+    """
+      |message root {
+      |  optional binary _1 (UTF8);
+      |}
+    """.stripMargin)
+
+  testSchema[Tuple1[Seq[Int]]](
+    "array",
+    """
+      |message root {
+      |  optional group _1 (LIST) {
+      |    repeated int32 array;
+      |  }
+      |}
+    """.stripMargin)
+
+  testSchema[Tuple1[Map[Int, String]]](
+    "map",
+    """
+      |message root {
+      |  optional group _1 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      optional binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testSchema[Tuple1[Pair[Int, String]]](
+    "struct",
+    """
+      |message root {
+      |  optional group _1 {
+      |    required int32 _1;
+      |    optional binary _2 (UTF8);
+      |  }
+      |}
+    """.stripMargin)
+
+  testSchema[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
+    "deeply nested type",
+    """
+      |message root {
+      |  optional group _1 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      optional group value {
+      |        optional binary _1 (UTF8);
+      |        optional group _2 (LIST) {
+      |          repeated group bag {
+      |            optional group array {
+      |              required int32 _1;
+      |              required double _2;
+      |            }
+      |          }
+      |        }
+      |      }
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testSchema[(Option[Int], Map[Int, Option[Double]])](
+    "optional types",
+    """
+      |message root {
+      |  optional int32 _1;
+      |  optional group _2 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      optional double value;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  test("DataType string parser compatibility") {
+    val schema = StructType(List(
+      StructField("c1", IntegerType, false),
+      StructField("c2", BinaryType, true)))
+
+    val fromCaseClassString = ParquetTypesConverter.convertFromString(schema.toString)
+    val fromJson = ParquetTypesConverter.convertFromString(schema.json)
+
+    (fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
+      assert(a.name == b.name)
+      assert(a.dataType === b.dataType)
+      assert(a.nullable === b.nullable)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 6f57fe8..4bc14ba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,103 +17,66 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.File
-
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
-import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
-import org.apache.spark.sql.{parquet, SchemaRDD}
-import org.apache.spark.util.Utils
-
-// Implicits
-import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive
 
 case class Cases(lower: String, UPPER: String)
 
-class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
-
-  val dirname = Utils.createTempDir()
-
-  var testRDD: SchemaRDD = null
-
-  override def beforeAll() {
-    // write test data
-    ParquetTestData.writeFile()
-    testRDD = parquetFile(ParquetTestData.testDir.toString)
-    testRDD.registerTempTable("testsource")
-  }
-
-  override def afterAll() {
-    Utils.deleteRecursively(ParquetTestData.testDir)
-    Utils.deleteRecursively(dirname)
-    reset() // drop all tables that were registered as part of the tests
-  }
-
-  // in case tests are failing we delete before and after each test
-  override def beforeEach() {
-    Utils.deleteRecursively(dirname)
-  }
+class HiveParquetSuite extends QueryTest with ParquetTest {
+  val sqlContext = TestHive
 
-  override def afterEach() {
-    Utils.deleteRecursively(dirname)
-  }
+  import sqlContext._
 
   test("Case insensitive attribute names") {
-    val tempFile = File.createTempFile("parquet", "")
-    tempFile.delete()
-    sparkContext.parallelize(1 to 10)
-      .map(_.toString)
-      .map(i => Cases(i, i))
-      .saveAsParquetFile(tempFile.getCanonicalPath)
-
-    parquetFile(tempFile.getCanonicalPath).registerTempTable("cases")
-    sql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
-    sql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+    withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+      val expected = (1 to 4).map(i => Row(i.toString))
+      checkAnswer(sql("SELECT upper FROM cases"), expected)
+      checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+    }
   }
 
   test("SELECT on Parquet table") {
-    val rdd = sql("SELECT * FROM testsource").collect()
-    assert(rdd != null)
-    assert(rdd.forall(_.size == 6))
+    val data = (1 to 4).map(i => (i, s"val_$i"))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT * FROM t"), data)
+    }
   }
 
   test("Simple column projection + filter on Parquet table") {
-    val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
-    assert(rdd.size === 5, "Filter returned incorrect number of rows")
-    assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
+    withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+      checkAnswer(
+        sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+        Seq(Row(true, "val_2"), Row(true, "val_4")))
+    }
   }
 
   test("Converting Hive to Parquet Table via saveAsParquetFile") {
-    sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
-    parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
-    val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
-    val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
-
-    compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
+    withTempPath { dir =>
+      sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
+      parquetFile(dir.getCanonicalPath).registerTempTable("p")
+      withTempTable("p") {
+        checkAnswer(
+          sql("SELECT * FROM src ORDER BY key"),
+          sql("SELECT * from p ORDER BY key").collect().toSeq)
+      }
+    }
   }
 
-  test("INSERT OVERWRITE TABLE Parquet table") {
-    sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
-    parquetFile(dirname.getAbsolutePath).registerTempTable("ptable")
-    // let's do three overwrites for good measure
-    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
-    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
-    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
-    val rddCopy = sql("SELECT * FROM ptable").collect()
-    val rddOrig = sql("SELECT * FROM testsource").collect()
-    assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
-    compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
-  }
 
-  private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
-    var counter = 0
-    (rddOne, rddTwo).zipped.foreach {
-      (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
-        case ((value_1, value_2), index) =>
-          assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
+  test("INSERT OVERWRITE TABLE Parquet table") {
+    withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") {
+      withTempPath { file =>
+        sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath)
+        parquetFile(file.getCanonicalPath).registerTempTable("p")
+        withTempTable("p") {
+          // let's do three overwrites for good measure
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
+        }
       }
-    counter = counter + 1
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b395e10/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 488ebba..fc0e42c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -37,7 +37,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
  * built in parquet support.
  */
-class ParquetMetastoreSuite extends ParquetTest {
+class ParquetMetastoreSuite extends ParquetPartitioningTest {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -112,7 +112,7 @@ class ParquetMetastoreSuite extends ParquetTest {
 /**
  * A suite of tests for the Parquet support through the data sources API.
  */
-class ParquetSourceSuite extends ParquetTest {
+class ParquetSourceSuite extends ParquetPartitioningTest {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -145,7 +145,7 @@ class ParquetSourceSuite extends ParquetTest {
 /**
  * A collection of tests for parquet data with various forms of partitioning.
  */
-abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
   var partitionedTableDirWithKey: File = null
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org