You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/21 23:38:12 UTC

spark git commit: [SQL] [Minor] Remove deprecated parquet tests

Repository: spark
Updated Branches:
  refs/heads/master b328ac6c8 -> ba19689fe


[SQL] [Minor] Remove deprecated parquet tests

This PR removes the deprecated `ParquetQuerySuite`, renames `ParquetQuerySuite2` to `ParquetQuerySuite`, and refactors the changes introduced in #4115 to `ParquetFilterSuite`. It is a follow-up to #3644.

Note that the test cases in the old `ParquetQuerySuite` are already well covered by the other test suites introduced in #3644.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4116)
<!-- Reviewable:end -->

Author: Cheng Lian <li...@databricks.com>

Closes #4116 from liancheng/remove-deprecated-parquet-tests and squashes the following commits:

f73b8f9 [Cheng Lian] Removes deprecated Parquet test suite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba19689f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba19689f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba19689f

Branch: refs/heads/master
Commit: ba19689fe77b90052b587640c9ff325c5a892c20
Parents: b328ac6
Author: Cheng Lian <li...@databricks.com>
Authored: Wed Jan 21 14:38:10 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 21 14:38:10 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetFilterSuite.scala  |  373 +++----
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 1040 +-----------------
 .../spark/sql/parquet/ParquetQuerySuite2.scala  |   88 --
 3 files changed, 212 insertions(+), 1289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba19689f/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 4ad8c47..1e7d3e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -21,7 +21,7 @@ import parquet.filter2.predicate.Operators._
 import parquet.filter2.predicate.{FilterPredicate, Operators}
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, Predicate, Row}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
 
@@ -40,15 +40,16 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
 class ParquetFilterSuite extends QueryTest with ParquetTest {
   val sqlContext = TestSQLContext
 
-  private def checkFilterPushdown(
+  private def checkFilterPredicate(
       rdd: SchemaRDD,
-      output: Seq[Symbol],
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
-      checker: (SchemaRDD, Any) => Unit,
-      expectedResult: Any): Unit = {
+      checker: (SchemaRDD, Seq[Row]) => Unit,
+      expected: Seq[Row]): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
-      val query = rdd.select(output.map(_.attr): _*).where(predicate)
+      val query = rdd.select(output: _*).where(predicate)
 
       val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
         case plan: ParquetTableScan => plan.columnPruningPred
@@ -58,209 +59,180 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
       maybeAnalyzedPredicate.foreach { pred =>
         val maybeFilter = ParquetFilters.createFilter(pred)
         assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-        maybeFilter.foreach(f => assert(f.getClass === filterClass))
+        maybeFilter.foreach { f =>
+          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+          assert(f.getClass === filterClass)
+        }
       }
 
-      checker(query, expectedResult)
+      checker(query, expected)
     }
   }
 
-  private def checkFilterPushdown1
-      (rdd: SchemaRDD, output: Symbol*)
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
-      (expectedResult: => Seq[Row]): Unit = {
-    checkFilterPushdown(rdd, output, predicate, filterClass,
-      (query, expected) => checkAnswer(query, expected.asInstanceOf[Seq[Row]]), expectedResult)
-  }
-
-  private def checkFilterPushdown
-    (rdd: SchemaRDD, output: Symbol*)
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
-      (expectedResult: Int): Unit = {
-    checkFilterPushdown(rdd, output, predicate, filterClass,
-      (query, expected) => checkAnswer(query, expected.asInstanceOf[Seq[Row]]), Seq(Row(expectedResult)))
+  private def checkFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+      (implicit rdd: SchemaRDD): Unit = {
+    checkFilterPredicate(rdd, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected)
   }
 
-  def checkBinaryFilterPushdown
-      (rdd: SchemaRDD, output: Symbol*)
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
-      (expectedResult: => Any): Unit = {
-    def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
-      val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
-      val expected = result match {
-        case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
-        case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
-      }
-      assert(actual.sorted === expected.sorted)
-    }
-    checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
+  private def checkFilterPredicate[T]
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T)
+      (implicit rdd: SchemaRDD): Unit = {
+    checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
   }
 
   test("filter pushdown - boolean") {
-    withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { rdd =>
-      checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Boolean]])(Seq.empty[Row])
-      checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Boolean]]) {
-        Seq(Row(true), Row(false))
-      }
+    withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd =>
+      checkFilterPredicate('_1.isNull,    classOf[Eq   [_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
 
-      checkFilterPushdown1(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(Seq(Row(true)))
-      checkFilterPushdown1(rdd, '_1)('_1 !== true, classOf[Operators.NotEq[java.lang.Boolean]])(Seq(Row(false)))
+      checkFilterPredicate('_1 === true, classOf[Eq   [_]], true)
+      checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
     }
   }
 
   test("filter pushdown - integer") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { rdd =>
-      checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[Integer]])(Seq.empty[Row])
-      checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[Integer]]) {
-        (1 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
-      checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[Integer]]) {
-        (2 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 < 2,  classOf[Lt  [Integer]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 > 3,  classOf[Gt  [Integer]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)
-
-      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [Integer]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [Integer]])(4)
-      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[Integer]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[Integer]])(4)
-
-      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
-      checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
-        Seq(Row(1), Row(4))
-      }
+    withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
+      checkFilterPredicate('_1.isNull,    classOf[Eq   [_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq   [_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2,  classOf[Lt  [_]], 1)
+      checkFilterPredicate('_1 > 3,  classOf[Gt  [_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq  [_]], 1)
+      checkFilterPredicate(Literal(2) >   '_1, classOf[Lt  [_]], 1)
+      checkFilterPredicate(Literal(3) <   '_1, classOf[Gt  [_]], 4)
+      checkFilterPredicate(Literal(1) >=  '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <=  '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4),         classOf[GtEq[_]],       4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  Seq(Row(1), Row(4)))
     }
   }
 
   test("filter pushdown - long") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { rdd =>
-      checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Long]])(Seq.empty[Row])
-      checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Long]]) {
-        (1 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
-      checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Long]]) {
-        (2 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 <  2, classOf[Lt  [java.lang.Long]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >  3, classOf[Gt  [java.lang.Long]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)
-
-      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [java.lang.Long]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [java.lang.Long]])(4)
-      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[java.lang.Long]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[java.lang.Long]])(4)
-
-      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
-      checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
-        Seq(Row(1), Row(4))
-      }
+    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd =>
+      checkFilterPredicate('_1.isNull,    classOf[Eq   [_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq[_]],    1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 <  2, classOf[Lt  [_]], 1)
+      checkFilterPredicate('_1 >  3, classOf[Gt  [_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq  [_]], 1)
+      checkFilterPredicate(Literal(2) >   '_1, classOf[Lt  [_]], 1)
+      checkFilterPredicate(Literal(3) <   '_1, classOf[Gt  [_]], 4)
+      checkFilterPredicate(Literal(1) >=  '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <=  '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4),         classOf[GtEq[_]],       4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  Seq(Row(1), Row(4)))
     }
   }
 
   test("filter pushdown - float") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { rdd =>
-      checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Float]])(Seq.empty[Row])
-      checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Float]]) {
-        (1 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
-      checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Float]]) {
-        (2 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 <  2, classOf[Lt  [java.lang.Float]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >  3, classOf[Gt  [java.lang.Float]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)
-
-      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [java.lang.Float]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [java.lang.Float]])(4)
-      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[java.lang.Float]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[java.lang.Float]])(4)
-
-      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
-      checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
-        Seq(Row(1), Row(4))
-      }
+    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd =>
+      checkFilterPredicate('_1.isNull,    classOf[Eq   [_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq   [_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 <  2, classOf[Lt  [_]], 1)
+      checkFilterPredicate('_1 >  3, classOf[Gt  [_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq  [_]], 1)
+      checkFilterPredicate(Literal(2) >   '_1, classOf[Lt  [_]], 1)
+      checkFilterPredicate(Literal(3) <   '_1, classOf[Gt  [_]], 4)
+      checkFilterPredicate(Literal(1) >=  '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <=  '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4),         classOf[GtEq[_]],       4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  Seq(Row(1), Row(4)))
     }
   }
 
   test("filter pushdown - double") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { rdd =>
-      checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Double]])(Seq.empty[Row])
-      checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Double]]) {
-        (1 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
-      checkFilterPushdown1(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Double]]) {
-        (2 to 4).map(Row.apply(_))
-      }
-
-      checkFilterPushdown(rdd, '_1)('_1 <  2, classOf[Lt  [java.lang.Double]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >  3, classOf[Gt  [java.lang.Double]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
-      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)
-
-      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(2) >   '_1, classOf[Lt  [java.lang.Double]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(3) <   '_1, classOf[Gt  [java.lang.Double]])(4)
-      checkFilterPushdown(rdd, '_1)(Literal(1) >=  '_1, classOf[LtEq[java.lang.Double]])(1)
-      checkFilterPushdown(rdd, '_1)(Literal(4) <=  '_1, classOf[GtEq[java.lang.Double]])(4)
-
-      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
-      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
-      checkFilterPushdown1(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
-        Seq(Row(1), Row(4))
-      }
+    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd =>
+      checkFilterPredicate('_1.isNull,    classOf[Eq[_]],    Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq   [_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 <  2, classOf[Lt  [_]], 1)
+      checkFilterPredicate('_1 >  3, classOf[Gt  [_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq  [_]], 1)
+      checkFilterPredicate(Literal(2) >   '_1, classOf[Lt  [_]], 1)
+      checkFilterPredicate(Literal(3) <   '_1, classOf[Gt  [_]], 4)
+      checkFilterPredicate(Literal(1) >=  '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <=  '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4),         classOf[GtEq[_]],       4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  Seq(Row(1), Row(4)))
     }
   }
 
   test("filter pushdown - string") {
-    withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { rdd =>
-      checkFilterPushdown1(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
-      checkFilterPushdown1(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
-        (1 to 4).map(i => Row.apply(i.toString))
-      }
-
-      checkFilterPushdown1(rdd, '_1)('_1 === "1", classOf[Eq[String]])(Seq(Row("1")))
-      checkFilterPushdown1(rdd, '_1)('_1 !== "1", classOf[Operators.NotEq[String]]) {
-        (2 to 4).map(i => Row.apply(i.toString))
-      }
+    withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(
+        '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
+
+      checkFilterPredicate('_1 === "1", classOf[Eq   [_]], "1")
+      checkFilterPredicate('_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
+
+      checkFilterPredicate('_1 <  "2", classOf[Lt  [_]], "1")
+      checkFilterPredicate('_1 >  "3", classOf[Gt  [_]], "4")
+      checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
+      checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
+
+      checkFilterPredicate(Literal("1") === '_1, classOf[Eq  [_]], "1")
+      checkFilterPredicate(Literal("2") >   '_1, classOf[Lt  [_]], "1")
+      checkFilterPredicate(Literal("3") <   '_1, classOf[Gt  [_]], "4")
+      checkFilterPredicate(Literal("1") >=  '_1, classOf[LtEq[_]], "1")
+      checkFilterPredicate(Literal("4") <=  '_1, classOf[GtEq[_]], "4")
+
+      checkFilterPredicate(!('_1 < "4"),           classOf[GtEq[_]],       "4")
+      checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
+      checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or],  Seq(Row("1"), Row("4")))
+    }
+  }
 
-      checkFilterPushdown1(rdd, '_1)('_1 <  "2", classOf[Lt  [java.lang.String]])(Seq(Row("1")))
-      checkFilterPushdown1(rdd, '_1)('_1 >  "3", classOf[Gt  [java.lang.String]])(Seq(Row("4")))
-      checkFilterPushdown1(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])(Seq(Row("1")))
-      checkFilterPushdown1(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])(Seq(Row("4")))
-
-      checkFilterPushdown1(rdd, '_1)(Literal("1") === '_1, classOf[Eq  [java.lang.String]])(Seq(Row("1")))
-      checkFilterPushdown1(rdd, '_1)(Literal("2") >   '_1, classOf[Lt  [java.lang.String]])(Seq(Row("1")))
-      checkFilterPushdown1(rdd, '_1)(Literal("3") <   '_1, classOf[Gt  [java.lang.String]])(Seq(Row("4")))
-      checkFilterPushdown1(rdd, '_1)(Literal("1") >=  '_1, classOf[LtEq[java.lang.String]])(Seq(Row("1")))
-      checkFilterPushdown1(rdd, '_1)(Literal("4") <=  '_1, classOf[GtEq[java.lang.String]])(Seq(Row("4")))
-
-      checkFilterPushdown1(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])(Seq(Row("4")))
-      checkFilterPushdown1(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])(Seq(Row("3")))
-      checkFilterPushdown1(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
-        Seq(Row("1"), Row("4"))
+  def checkBinaryFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+      (implicit rdd: SchemaRDD): Unit = {
+    def checkBinaryAnswer(rdd: SchemaRDD, expected: Seq[Row]) = {
+      assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
+        rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
       }
     }
+
+    checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, expected)
+  }
+
+  def checkBinaryFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
+      (implicit rdd: SchemaRDD): Unit = {
+    checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
   }
 
   test("filter pushdown - binary") {
@@ -268,33 +240,30 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
       def b: Array[Byte] = int.toString.getBytes("UTF-8")
     }
 
-    withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { rdd =>
-      checkBinaryFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
-      checkBinaryFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
-        (1 to 4).map(i => Row.apply(i.b)).toSeq
-      }
-
-      checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
-      checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.NotEq[Array[Byte]]]) {
-        (2 to 4).map(i => Row.apply(i.b)).toSeq
-      }
-
-      checkBinaryFilterPushdown(rdd, '_1)('_1 <  2.b, classOf[Lt  [Array[Byte]]])(1.b)
-      checkBinaryFilterPushdown(rdd, '_1)('_1 >  3.b, classOf[Gt  [Array[Byte]]])(4.b)
-      checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
-      checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)
-
-      checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq  [Array[Byte]]])(1.b)
-      checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) >   '_1, classOf[Lt  [Array[Byte]]])(1.b)
-      checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) <   '_1, classOf[Gt  [Array[Byte]]])(4.b)
-      checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >=  '_1, classOf[LtEq[Array[Byte]]])(1.b)
-      checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <=  '_1, classOf[GtEq[Array[Byte]]])(4.b)
-
-      checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
-      checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
-      checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
-        Seq(Row(1.b), Row(4.b))
-      }
+    withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
+      checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkBinaryFilterPredicate(
+        '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.b)).toSeq)
+
+      checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq   [_]], 1.b)
+      checkBinaryFilterPredicate(
+        '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.b)).toSeq)
+
+      checkBinaryFilterPredicate('_1 <  2.b, classOf[Lt  [_]], 1.b)
+      checkBinaryFilterPredicate('_1 >  3.b, classOf[Gt  [_]], 4.b)
+      checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
+      checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
+
+      checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq  [_]], 1.b)
+      checkBinaryFilterPredicate(Literal(2.b) >   '_1, classOf[Lt  [_]], 1.b)
+      checkBinaryFilterPredicate(Literal(3.b) <   '_1, classOf[Gt  [_]], 4.b)
+      checkBinaryFilterPredicate(Literal(1.b) >=  '_1, classOf[LtEq[_]], 1.b)
+      checkBinaryFilterPredicate(Literal(4.b) <=  '_1, classOf[GtEq[_]], 4.b)
+
+      checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
+      checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
+      checkBinaryFilterPredicate(
+        '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba19689f/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 2c5345b..1263ff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,1030 +17,72 @@
 
 package org.apache.spark.sql.parquet
 
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
-import parquet.filter2.predicate.{FilterPredicate, Operators}
-import parquet.hadoop.ParquetFileWriter
-import parquet.hadoop.util.ContextUtil
-import parquet.io.api.Binary
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Row => _, _}
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-case class TestRDDEntry(key: Int, value: String)
-
-case class NullReflectData(
-    intField: java.lang.Integer,
-    longField: java.lang.Long,
-    floatField: java.lang.Float,
-    doubleField: java.lang.Double,
-    booleanField: java.lang.Boolean)
-
-case class OptionalReflectData(
-    intField: Option[Int],
-    longField: Option[Long],
-    floatField: Option[Float],
-    doubleField: Option[Double],
-    booleanField: Option[Boolean])
-
-case class Nested(i: Int, s: String)
-
-case class Data(array: Seq[Int], nested: Nested)
-
-case class AllDataTypes(
-    stringField: String,
-    intField: Int,
-    longField: Long,
-    floatField: Float,
-    doubleField: Double,
-    shortField: Short,
-    byteField: Byte,
-    booleanField: Boolean)
-
-case class AllDataTypesWithNonPrimitiveType(
-    stringField: String,
-    intField: Int,
-    longField: Long,
-    floatField: Float,
-    doubleField: Double,
-    shortField: Short,
-    byteField: Byte,
-    booleanField: Boolean,
-    array: Seq[Int],
-    arrayContainsNull: Seq[Option[Int]],
-    map: Map[Int, Long],
-    mapValueContainsNull: Map[Int, Option[Long]],
-    data: Data)
-
-case class BinaryData(binaryData: Array[Byte])
-
-case class NumericData(i: Int, d: Double)
 
-class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
-  TestData // Load test data tables.
-
-  private var testRDD: SchemaRDD = null
-  private val originalParquetFilterPushdownEnabled = TestSQLContext.conf.parquetFilterPushDown
-
-  override def beforeAll() {
-    ParquetTestData.writeFile()
-    ParquetTestData.writeFilterFile()
-    ParquetTestData.writeNestedFile1()
-    ParquetTestData.writeNestedFile2()
-    ParquetTestData.writeNestedFile3()
-    ParquetTestData.writeNestedFile4()
-    ParquetTestData.writeGlobFiles()
-    testRDD = parquetFile(ParquetTestData.testDir.toString)
-    testRDD.registerTempTable("testsource")
-    parquetFile(ParquetTestData.testFilterDir.toString)
-      .registerTempTable("testfiltersource")
-
-    setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
-  }
-
-  override def afterAll() {
-    Utils.deleteRecursively(ParquetTestData.testDir)
-    Utils.deleteRecursively(ParquetTestData.testFilterDir)
-    Utils.deleteRecursively(ParquetTestData.testNestedDir1)
-    Utils.deleteRecursively(ParquetTestData.testNestedDir2)
-    Utils.deleteRecursively(ParquetTestData.testNestedDir3)
-    Utils.deleteRecursively(ParquetTestData.testNestedDir4)
-    Utils.deleteRecursively(ParquetTestData.testGlobDir)
-    // here we should also unregister the table??
-
-    setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, originalParquetFilterPushdownEnabled.toString)
-  }
+/**
+ * A test suite that tests various Parquet queries.
+ */
+class ParquetQuerySuite extends QueryTest with ParquetTest {
+  val sqlContext = TestSQLContext
 
-  test("Read/Write All Types") {
-    val tempDir = getTempFilePath("parquetTest").getCanonicalPath
-    val range = (0 to 255)
-    val data = sparkContext.parallelize(range).map { x =>
-      parquet.AllDataTypes(
-        s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)
+  test("simple projection") {
+    withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
     }
-
-    data.saveAsParquetFile(tempDir)
-
-    checkAnswer(
-      parquetFile(tempDir),
-      data.toSchemaRDD.collect().toSeq)
   }
 
-  test("read/write binary data") {
-    // Since equality for Array[Byte] is broken we test this separately.
-    val tempDir = getTempFilePath("parquetTest").getCanonicalPath
-    sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).saveAsParquetFile(tempDir)
-    parquetFile(tempDir)
-      .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
-      .collect().toSeq == Seq("test")
-  }
-
-  ignore("Treat binary as string") {
-    val oldIsParquetBinaryAsString = TestSQLContext.conf.isParquetBinaryAsString
-
-    // Create the test file.
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    val range = (0 to 255)
-    val rowRDD = TestSQLContext.sparkContext.parallelize(range)
-      .map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes))
-    // We need to ask Parquet to store the String column as a Binary column.
-    val schema = StructType(
-      StructField("c1", IntegerType, false) ::
-      StructField("c2", BinaryType, false) :: Nil)
-    val schemaRDD1 = applySchema(rowRDD, schema)
-    schemaRDD1.saveAsParquetFile(path)
-    checkAnswer(
-      parquetFile(path).select('c1, 'c2.cast(StringType)),
-      schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)
-
-    setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
-    parquetFile(path).printSchema()
-    checkAnswer(
-      parquetFile(path),
-      schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)
-
-
-    // Set it back.
-    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
-  }
-
-  test("Compression options for writing to a Parquetfile") {
-    val defaultParquetCompressionCodec = TestSQLContext.conf.parquetCompressionCodec
-    import scala.collection.JavaConversions._
-
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-
-    // test default compression codec
-    rdd.saveAsParquetFile(path)
-    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
-      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
-    parquetFile(path).registerTempTable("tmp")
-    checkAnswer(
-      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
-      Row(5, "val_5") ::
-      Row(7, "val_7") :: Nil)
-
-    Utils.deleteRecursively(file)
-
-    // test uncompressed parquet file with property value "UNCOMPRESSED"
-    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
-
-    rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
-      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
-    parquetFile(path).registerTempTable("tmp")
-    checkAnswer(
-      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
-      Row(5, "val_5") ::
-        Row(7, "val_7") :: Nil)
-
-    Utils.deleteRecursively(file)
-
-    // test uncompressed parquet file with property value "none"
-    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
-
-    rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
-      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === "UNCOMPRESSED" :: Nil)
-
-    parquetFile(path).registerTempTable("tmp")
-    checkAnswer(
-      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
-      Row(5, "val_5") ::
-        Row(7, "val_7") :: Nil)
-
-    Utils.deleteRecursively(file)
-
-    // test gzip compression codec
-    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
-
-    rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
-      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
-    parquetFile(path).registerTempTable("tmp")
-    checkAnswer(
-      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
-      Row(5, "val_5") ::
-        Row(7, "val_7") :: Nil)
-
-    Utils.deleteRecursively(file)
-
-    // test snappy compression codec
-    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
-
-    rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
-      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
-
-    parquetFile(path).registerTempTable("tmp")
-    checkAnswer(
-      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
-      Row(5, "val_5") ::
-        Row(7, "val_7") :: Nil)
-
-    Utils.deleteRecursively(file)
-
-    // TODO: Lzo requires additional external setup steps so leave it out for now
-    // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
-
-    // Set it back.
-    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
-  }
-
-  test("Read/Write All Types with non-primitive type") {
-    val tempDir = getTempFilePath("parquetTest").getCanonicalPath
-    val range = (0 to 255)
-    val data = sparkContext.parallelize(range).map { x =>
-      parquet.AllDataTypesWithNonPrimitiveType(
-        s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        (0 until x),
-        (0 until x).map(Option(_).filter(_ % 3 == 0)),
-        (0 until x).map(i => i -> i.toLong).toMap,
-        (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
-        parquet.Data((0 until x), parquet.Nested(x, s"$x")))
+  test("appending") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    withParquetTable(data, "t") {
+      sql("INSERT INTO t SELECT * FROM t")
+      checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    data.saveAsParquetFile(tempDir)
-
-    checkAnswer(
-      parquetFile(tempDir),
-      data.toSchemaRDD.collect().toSeq)
   }
 
-  test("self-join parquet files") {
-    val x = ParquetTestData.testData.as('x)
-    val y = ParquetTestData.testData.as('y)
-    val query = x.join(y).where("x.myint".attr === "y.myint".attr)
-
-    // Check to make sure that the attributes from either side of the join have unique expression
-    // ids.
-    query.queryExecution.analyzed.output.filter(_.name == "myint") match {
-      case Seq(i1, i2) if(i1.exprId == i2.exprId) =>
-        fail(s"Duplicate expression IDs found in query plan: $query")
-      case Seq(_, _) => // All good
+  test("self-join") {
+    // 4 rows; column `_1` is null in rows 2 and 4 (the even-numbered rows)
+    val data = (1 to 4).map { i =>
+      val maybeInt = if (i % 2 == 0) None else Some(i)
+      (maybeInt, i.toString)
     }
 
-    val result = query.collect()
-    assert(result.size === 9, "self-join result has incorrect size")
-    assert(result(0).size === 12, "result row has incorrect size")
-    result.zipWithIndex.foreach {
-      case (row, index) => row.toSeq.zipWithIndex.foreach {
-        case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column")
-      }
-    }
-  }
+    withParquetTable(data, "t") {
+      val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+      val queryOutput = selfJoin.queryExecution.analyzed.output
 
-  test("Import of simple Parquet file") {
-    val result = parquetFile(ParquetTestData.testDir.toString).collect()
-    assert(result.size === 15)
-    result.zipWithIndex.foreach {
-      case (row, index) => {
-        val checkBoolean =
-          if (index % 3 == 0)
-            row(0) == true
-          else
-            row(0) == false
-        assert(checkBoolean === true, s"boolean field value in line $index did not match")
-        if (index % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
-        assert(row(2) === "abc", s"string field value in line $index did not match")
-        assert(row(3) === (index.toLong << 33), s"long value in line $index did not match")
-        assert(row(4) === 2.5F, s"float field value in line $index did not match")
-        assert(row(5) === 4.5D, s"double field value in line $index did not match")
+      assertResult(4, s"Field count mismatches")(queryOutput.size)
+      // Count distinct IDs: a plain Seq size stays 2 even when both IDs collide.
+      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin")(
+        queryOutput.filter(_.name == "_1").map(_.exprId).distinct.size)
-    }
-  }
 
-  test("Projection of simple Parquet file") {
-    val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()
-    result.zipWithIndex.foreach {
-      case (row, index) => {
-          if (index % 3 == 0)
-            assert(row(0) === true, s"boolean field value in line $index did not match (every third row)")
-          else
-            assert(row(0) === false, s"boolean field value in line $index did not match")
-        assert(row(1) === (index.toLong << 33), s"long field value in line $index did not match")
-        assert(row.size === 2, s"number of columns in projection in line $index is incorrect")
-      }
+      checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
     }
   }
 
-  test("Writing metadata from scratch for table CREATE") {
-    val job = new Job()
-    val path = new Path(getTempFilePath("testtable").getCanonicalFile.toURI.toString)
-    val fs: FileSystem = FileSystem.getLocal(ContextUtil.getConfiguration(job))
-    ParquetTypesConverter.writeMetaData(
-      ParquetTestData.testData.output,
-      path,
-      TestSQLContext.sparkContext.hadoopConfiguration)
-    assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
-    val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
-    assert(metaData != null)
-    ParquetTestData
-      .testData
-      .parquetSchema
-      .checkContains(metaData.getFileMetaData.getSchema) // throws exception if incompatible
-    metaData
-      .getFileMetaData
-      .getSchema
-      .checkContains(ParquetTestData.testData.parquetSchema) // throws exception if incompatible
-    fs.delete(path, true)
-  }
-
-  test("Creating case class RDD table") {
-    TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-      .registerTempTable("tmp")
-    val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0))
-    var counter = 1
-    rdd.foreach {
-      // '===' does not like string comparison?
-      row: Row => {
-        assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter")
-        counter = counter + 1
-      }
+  test("nested data - struct with array field") {
+    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+        case Tuple1((_, Seq(string))) => Row(string)
+      })
     }
   }
 
-  test("Read a parquet file instead of a directory") {
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    val fsPath = new Path(path)
-    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
-    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    rdd.coalesce(1).saveAsParquetFile(path)
-
-    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
-    assert(children.length > 0)
-    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
-    readFile.registerTempTable("tmpx")
-    val rdd_copy = sql("SELECT * FROM tmpx").collect()
-    val rdd_orig = rdd.collect()
-    for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key,   s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+  test("nested data - array of struct") {
+    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+        case Tuple1(Seq((_, string))) => Row(string)
+      })
     }
-    Utils.deleteRecursively(file)
-  }
-
-  test("Insert (appending) to same table via Scala API") {
-    sql("INSERT INTO testsource SELECT * FROM testsource")
-    val double_rdd = sql("SELECT * FROM testsource").collect()
-    assert(double_rdd != null)
-    assert(double_rdd.size === 30)
-
-    // let's restore the original test data
-    Utils.deleteRecursively(ParquetTestData.testDir)
-    ParquetTestData.writeFile()
-  }
-
-  test("save and load case class RDD with nulls as parquet") {
-    val data = parquet.NullReflectData(null, null, null, null, null)
-    val rdd = sparkContext.parallelize(data :: Nil)
-
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    rdd.saveAsParquetFile(path)
-    val readFile = parquetFile(path)
-
-    val rdd_saved = readFile.collect()
-    assert(rdd_saved(0) === Row(null, null, null, null, null))
-    Utils.deleteRecursively(file)
-    assert(true)
-  }
-
-  test("save and load case class RDD with Nones as parquet") {
-    val data = parquet.OptionalReflectData(None, None, None, None, None)
-    val rdd = sparkContext.parallelize(data :: Nil)
-
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    rdd.saveAsParquetFile(path)
-    val readFile = parquetFile(path)
-
-    val rdd_saved = readFile.collect()
-    assert(rdd_saved(0) === Row(null, null, null, null, null))
-    Utils.deleteRecursively(file)
-    assert(true)
-  }
-
-  test("make RecordFilter for simple predicates") {
-    def checkFilter[T <: FilterPredicate : ClassTag](
-        predicate: Expression,
-        defined: Boolean = true): Unit = {
-      val filter = ParquetFilters.createFilter(predicate)
-      if (defined) {
-        assert(filter.isDefined)
-        val tClass = implicitly[ClassTag[T]].runtimeClass
-        val filterGet = filter.get
-        assert(
-          tClass.isInstance(filterGet),
-          s"$filterGet of type ${filterGet.getClass} is not an instance of $tClass")
-      } else {
-        assert(filter.isEmpty)
-      }
-    }
-
-    checkFilter[Operators.Eq[Integer]]('a.int === 1)
-    checkFilter[Operators.Eq[Integer]](Literal(1) === 'a.int)
-
-    checkFilter[Operators.Lt[Integer]]('a.int < 4)
-    checkFilter[Operators.Lt[Integer]](Literal(4) > 'a.int)
-    checkFilter[Operators.LtEq[Integer]]('a.int <= 4)
-    checkFilter[Operators.LtEq[Integer]](Literal(4) >= 'a.int)
-
-    checkFilter[Operators.Gt[Integer]]('a.int > 4)
-    checkFilter[Operators.Gt[Integer]](Literal(4) < 'a.int)
-    checkFilter[Operators.GtEq[Integer]]('a.int >= 4)
-    checkFilter[Operators.GtEq[Integer]](Literal(4) <= 'a.int)
-
-    checkFilter[Operators.And]('a.int === 1 && 'a.int < 4)
-    checkFilter[Operators.Or]('a.int === 1 || 'a.int < 4)
-    checkFilter[Operators.NotEq[Integer]](!('a.int === 1))
-
-    checkFilter('a.int > 'b.int, defined = false)
-    checkFilter(('a.int > 'b.int) && ('a.int > 'b.int), defined = false)
-  }
-
-  test("test filter by predicate pushdown") {
-    for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
-      val query1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100")
-      assert(
-        query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-        "Top operator should be ParquetTableScan after pushdown")
-      val result1 = query1.collect()
-      assert(result1.size === 50)
-      assert(result1(0)(1) === 100)
-      assert(result1(49)(1) === 149)
-      val query2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200")
-      assert(
-        query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-        "Top operator should be ParquetTableScan after pushdown")
-      val result2 = query2.collect()
-      assert(result2.size === 50)
-      if (myval == "myint" || myval == "mylong") {
-        assert(result2(0)(1) === 151)
-        assert(result2(49)(1) === 200)
-      } else {
-        assert(result2(0)(1) === 150)
-        assert(result2(49)(1) === 199)
-      }
-    }
-    for(myval <- Seq("myint", "mylong")) {
-      val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10")
-      assert(
-        query3.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-        "Top operator should be ParquetTableScan after pushdown")
-      val result3 = query3.collect()
-      assert(result3.size === 20)
-      assert(result3(0)(1) === 0)
-      assert(result3(9)(1) === 9)
-      assert(result3(10)(1) === 191)
-      assert(result3(19)(1) === 200)
-    }
-    for(myval <- Seq("mydouble", "myfloat")) {
-      val result4 =
-        if (myval == "mydouble") {
-          val query4 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10.0")
-          assert(
-            query4.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-            "Top operator should be ParquetTableScan after pushdown")
-          query4.collect()
-        } else {
-          // CASTs are problematic. Here myfloat will be casted to a double and it seems there is
-          // currently no way to specify float constants in SqlParser?
-          sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect()
-        }
-      assert(result4.size === 20)
-      assert(result4(0)(1) === 0)
-      assert(result4(9)(1) === 9)
-      assert(result4(10)(1) === 191)
-      assert(result4(19)(1) === 200)
-    }
-    val query5 = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40")
-    assert(
-      query5.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val booleanResult = query5.collect()
-    assert(booleanResult.size === 10)
-    for(i <- 0 until 10) {
-      if (!booleanResult(i).getBoolean(0)) {
-        fail(s"Boolean value in result row $i not true")
-      }
-      if (booleanResult(i).getInt(1) != i * 4) {
-        fail(s"Int value in result row $i should be ${4*i}")
-      }
-    }
-    val query6 = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"")
-    assert(
-      query6.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val stringResult = query6.collect()
-    assert(stringResult.size === 1)
-    assert(stringResult(0).getString(2) == "100", "stringvalue incorrect")
-    assert(stringResult(0).getInt(1) === 100)
-
-    val query7 = sql(s"SELECT * FROM testfiltersource WHERE myoptint < 40")
-    assert(
-      query7.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val optResult = query7.collect()
-    assert(optResult.size === 20)
-    for(i <- 0 until 20) {
-      if (optResult(i)(7) != i * 2) {
-        fail(s"optional Int value in result row $i should be ${2*4*i}")
-      }
-    }
-    for(myval <- Seq("myoptint", "myoptlong", "myoptdouble", "myoptfloat")) {
-      val query8 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100")
-      assert(
-        query8.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-        "Top operator should be ParquetTableScan after pushdown")
-      val result8 = query8.collect()
-      assert(result8.size === 25)
-      assert(result8(0)(7) === 100)
-      assert(result8(24)(7) === 148)
-      val query9 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200")
-      assert(
-        query9.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-        "Top operator should be ParquetTableScan after pushdown")
-      val result9 = query9.collect()
-      assert(result9.size === 25)
-      if (myval == "myoptint" || myval == "myoptlong") {
-        assert(result9(0)(7) === 152)
-        assert(result9(24)(7) === 200)
-      } else {
-        assert(result9(0)(7) === 150)
-        assert(result9(24)(7) === 198)
-      }
-    }
-    val query10 = sql("SELECT * FROM testfiltersource WHERE myoptstring = \"100\"")
-    assert(
-      query10.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val result10 = query10.collect()
-    assert(result10.size === 1)
-    assert(result10(0).getString(8) == "100", "stringvalue incorrect")
-    assert(result10(0).getInt(7) === 100)
-    val query11 = sql(s"SELECT * FROM testfiltersource WHERE myoptboolean = true AND myoptint < 40")
-    assert(
-      query11.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val result11 = query11.collect()
-    assert(result11.size === 7)
-    for(i <- 0 until 6) {
-      if (!result11(i).getBoolean(6)) {
-        fail(s"optional Boolean value in result row $i not true")
-      }
-      if (result11(i).getInt(7) != i * 6) {
-        fail(s"optional Int value in result row $i should be ${6*i}")
-      }
-    }
-
-    val query12 = sql("SELECT * FROM testfiltersource WHERE mystring >= \"50\"")
-    assert(
-      query12.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val result12 = query12.collect()
-    assert(result12.size === 54)
-    assert(result12(0).getString(2) == "6")
-    assert(result12(4).getString(2) == "50")
-    assert(result12(53).getString(2) == "99")
-
-    val query13 = sql("SELECT * FROM testfiltersource WHERE mystring > \"50\"")
-    assert(
-      query13.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val result13 = query13.collect()
-    assert(result13.size === 53)
-    assert(result13(0).getString(2) == "6")
-    assert(result13(4).getString(2) == "51")
-    assert(result13(52).getString(2) == "99")
-
-    val query14 = sql("SELECT * FROM testfiltersource WHERE mystring <= \"50\"")
-    assert(
-      query14.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val result14 = query14.collect()
-    assert(result14.size === 148)
-    assert(result14(0).getString(2) == "0")
-    assert(result14(46).getString(2) == "50")
-    assert(result14(147).getString(2) == "200")
-
-    val query15 = sql("SELECT * FROM testfiltersource WHERE mystring < \"50\"")
-    assert(
-      query15.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
-      "Top operator should be ParquetTableScan after pushdown")
-    val result15 = query15.collect()
-    assert(result15.size === 147)
-    assert(result15(0).getString(2) == "0")
-    assert(result15(46).getString(2) == "100")
-    assert(result15(146).getString(2) == "200")
   }
 
   test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
-    val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
-    assert(query.collect().size === 10)
-  }
-
-  test("Importing nested Parquet file (Addressbook)") {
-    val result = TestSQLContext
-      .parquetFile(ParquetTestData.testNestedDir1.toString)
-      .toSchemaRDD
-      .collect()
-    assert(result != null)
-    assert(result.size === 2)
-    val first_record = result(0)
-    val second_record = result(1)
-    assert(first_record != null)
-    assert(second_record != null)
-    assert(first_record.size === 3)
-    assert(second_record(1) === null)
-    assert(second_record(2) === null)
-    assert(second_record(0) === "A. Nonymous")
-    assert(first_record(0) === "Julien Le Dem")
-    val first_owner_numbers = first_record(1)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
-    val first_contacts = first_record(2)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
-    assert(first_owner_numbers != null)
-    assert(first_owner_numbers(0) === "555 123 4567")
-    assert(first_owner_numbers(2) === "XXX XXX XXXX")
-    assert(first_contacts(0)
-      .asInstanceOf[CatalystConverter.StructScalaType[_]].size === 2)
-    val first_contacts_entry_one = first_contacts(0)
-      .asInstanceOf[CatalystConverter.StructScalaType[_]]
-    assert(first_contacts_entry_one(0) === "Dmitriy Ryaboy")
-    assert(first_contacts_entry_one(1) === "555 987 6543")
-    val first_contacts_entry_two = first_contacts(1)
-      .asInstanceOf[CatalystConverter.StructScalaType[_]]
-    assert(first_contacts_entry_two(0) === "Chris Aniszczyk")
-  }
-
-  test("Importing nested Parquet file (nested numbers)") {
-    val result = TestSQLContext
-      .parquetFile(ParquetTestData.testNestedDir2.toString)
-      .toSchemaRDD
-      .collect()
-    assert(result.size === 1, "number of top-level rows incorrect")
-    assert(result(0).size === 5, "number of fields in row incorrect")
-    assert(result(0)(0) === 1)
-    assert(result(0)(1) === 7)
-    val subresult1 = result(0)(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
-    assert(subresult1.size === 3)
-    assert(subresult1(0) === (1.toLong << 32))
-    assert(subresult1(1) === (1.toLong << 33))
-    assert(subresult1(2) === (1.toLong << 34))
-    val subresult2 = result(0)(3)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
-      .asInstanceOf[CatalystConverter.StructScalaType[_]]
-    assert(subresult2.size === 2)
-    assert(subresult2(0) === 2.5)
-    assert(subresult2(1) === false)
-    val subresult3 = result(0)(4)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
-    assert(subresult3.size === 2)
-    assert(subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 2)
-    val subresult4 = subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]]
-    assert(subresult4(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
-    assert(subresult4(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
-    assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 1)
-    assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
-  }
-
-  test("Simple query on addressbook") {
-    val data = TestSQLContext
-      .parquetFile(ParquetTestData.testNestedDir1.toString)
-      .toSchemaRDD
-    val tmp = data.where('owner === "Julien Le Dem").select('owner as 'a, 'contacts as 'c).collect()
-    assert(tmp.size === 1)
-    assert(tmp(0)(0) === "Julien Le Dem")
-  }
-
-  test("Projection in addressbook") {
-    val data = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
-    data.registerTempTable("data")
-    val query = sql("SELECT owner, contacts[1].name FROM data")
-    val tmp = query.collect()
-    assert(tmp.size === 2)
-    assert(tmp(0).size === 2)
-    assert(tmp(0)(0) === "Julien Le Dem")
-    assert(tmp(0)(1) === "Chris Aniszczyk")
-    assert(tmp(1)(0) === "A. Nonymous")
-    assert(tmp(1)(1) === null)
-  }
-
-  test("Simple query on nested int data") {
-    val data = parquetFile(ParquetTestData.testNestedDir2.toString).toSchemaRDD
-    data.registerTempTable("data")
-    val result1 = sql("SELECT entries[0].value FROM data").collect()
-    assert(result1.size === 1)
-    assert(result1(0).size === 1)
-    assert(result1(0)(0) === 2.5)
-    val result2 = sql("SELECT entries[0] FROM data").collect()
-    assert(result2.size === 1)
-    val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
-    assert(subresult1.size === 2)
-    assert(subresult1(0) === 2.5)
-    assert(subresult1(1) === false)
-    val result3 = sql("SELECT outerouter FROM data").collect()
-    val subresult2 = result3(0)(0)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
-    assert(subresult2(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7)
-    assert(subresult2(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8)
-    assert(result3(0)(0)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](1)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
-      .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9)
-  }
-
-  test("nested structs") {
-    val data = parquetFile(ParquetTestData.testNestedDir3.toString)
-      .toSchemaRDD
-    data.registerTempTable("data")
-    val result1 = sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
-    assert(result1.size === 1)
-    assert(result1(0).size === 1)
-    assert(result1(0)(0) === false)
-    val result2 = sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
-    assert(result2.size === 1)
-    assert(result2(0).size === 1)
-    assert(result2(0)(0) === true)
-    val result3 = sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
-    assert(result3.size === 1)
-    assert(result3(0).size === 1)
-    assert(result3(0)(0) === false)
-  }
-
-  test("simple map") {
-    val data = TestSQLContext
-      .parquetFile(ParquetTestData.testNestedDir4.toString)
-      .toSchemaRDD
-    data.registerTempTable("mapTable")
-    val result1 = sql("SELECT data1 FROM mapTable").collect()
-    assert(result1.size === 1)
-    assert(result1(0)(0)
-      .asInstanceOf[CatalystConverter.MapScalaType[String, _]]
-      .getOrElse("key1", 0) === 1)
-    assert(result1(0)(0)
-      .asInstanceOf[CatalystConverter.MapScalaType[String, _]]
-      .getOrElse("key2", 0) === 2)
-    val result2 = sql("""SELECT data1["key1"] FROM mapTable""").collect()
-    assert(result2(0)(0) === 1)
-  }
-
-  test("map with struct values") {
-    val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
-    data.registerTempTable("mapTable")
-    val result1 = sql("SELECT data2 FROM mapTable").collect()
-    assert(result1.size === 1)
-    val entry1 = result1(0)(0)
-      .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
-      .getOrElse("seven", null)
-    assert(entry1 != null)
-    assert(entry1(0) === 42)
-    assert(entry1(1) === "the answer")
-    val entry2 = result1(0)(0)
-      .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
-      .getOrElse("eight", null)
-    assert(entry2 != null)
-    assert(entry2(0) === 49)
-    assert(entry2(1) === null)
-    val result2 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
-    assert(result2.size === 1)
-    assert(result2(0)(0) === 42.toLong)
-    assert(result2(0)(1) === "the answer")
-  }
-
-  test("Writing out Addressbook and reading it back in") {
-    // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
-    // has no effect in this test case
-    val tmpdir = Utils.createTempDir()
-    Utils.deleteRecursively(tmpdir)
-    val result = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
-    result.saveAsParquetFile(tmpdir.toString)
-    parquetFile(tmpdir.toString)
-      .toSchemaRDD
-      .registerTempTable("tmpcopy")
-    val tmpdata = sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
-    assert(tmpdata.size === 2)
-    assert(tmpdata(0).size === 2)
-    assert(tmpdata(0)(0) === "Julien Le Dem")
-    assert(tmpdata(0)(1) === "Chris Aniszczyk")
-    assert(tmpdata(1)(0) === "A. Nonymous")
-    assert(tmpdata(1)(1) === null)
-    Utils.deleteRecursively(tmpdir)
-  }
-
-  test("Writing out Map and reading it back in") {
-    val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
-    val tmpdir = Utils.createTempDir()
-    Utils.deleteRecursively(tmpdir)
-    data.saveAsParquetFile(tmpdir.toString)
-    parquetFile(tmpdir.toString)
-      .toSchemaRDD
-      .registerTempTable("tmpmapcopy")
-    val result1 = sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
-    assert(result1.size === 1)
-    assert(result1(0)(0) === 2)
-    val result2 = sql("SELECT data2 FROM tmpmapcopy").collect()
-    assert(result2.size === 1)
-    val entry1 = result2(0)(0)
-      .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
-      .getOrElse("seven", null)
-    assert(entry1 != null)
-    assert(entry1(0) === 42)
-    assert(entry1(1) === "the answer")
-    val entry2 = result2(0)(0)
-      .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
-      .getOrElse("eight", null)
-    assert(entry2 != null)
-    assert(entry2(0) === 49)
-    assert(entry2(1) === null)
-    val result3 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
-    assert(result3.size === 1)
-    assert(result3(0)(0) === 42.toLong)
-    assert(result3(0)(1) === "the answer")
-    Utils.deleteRecursively(tmpdir)
-  }
-
-  test("read/write fixed-length decimals") {
-    for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
-      val tempDir = getTempFilePath("parquetTest").getCanonicalPath
-      val data = sparkContext.parallelize(0 to 1000)
-        .map(i => NumericData(i, i / 100.0))
-        .select('i, 'd cast DecimalType(precision, scale))
-      data.saveAsParquetFile(tempDir)
-      checkAnswer(parquetFile(tempDir), data.toSchemaRDD.collect().toSeq)
-    }
-
-    // Decimals with precision above 18 are not yet supported
-    intercept[RuntimeException] {
-      val tempDir = getTempFilePath("parquetTest").getCanonicalPath
-      val data = sparkContext.parallelize(0 to 1000)
-        .map(i => NumericData(i, i / 100.0))
-        .select('i, 'd cast DecimalType(19, 10))
-      data.saveAsParquetFile(tempDir)
-      checkAnswer(parquetFile(tempDir), data.toSchemaRDD.collect().toSeq)
-    }
-
-    // Unlimited-length decimals are not yet supported
-    intercept[RuntimeException] {
-      val tempDir = getTempFilePath("parquetTest").getCanonicalPath
-      val data = sparkContext.parallelize(0 to 1000)
-        .map(i => NumericData(i, i / 100.0))
-        .select('i, 'd cast DecimalType.Unlimited)
-      data.saveAsParquetFile(tempDir)
-      checkAnswer(parquetFile(tempDir), data.toSchemaRDD.collect().toSeq)
-    }
-  }
-
-  def checkFilter(predicate: Predicate, filterClass: Class[_ <: FilterPredicate]): Unit = {
-    val filter = ParquetFilters.createFilter(predicate)
-    assert(filter.isDefined)
-    assert(filter.get.getClass == filterClass)
-  }
-
-  test("Pushdown IsNull predicate") {
-    checkFilter('a.int.isNull,    classOf[Operators.Eq[Integer]])
-    checkFilter('a.long.isNull,   classOf[Operators.Eq[java.lang.Long]])
-    checkFilter('a.float.isNull,  classOf[Operators.Eq[java.lang.Float]])
-    checkFilter('a.double.isNull, classOf[Operators.Eq[java.lang.Double]])
-    checkFilter('a.string.isNull, classOf[Operators.Eq[Binary]])
-    checkFilter('a.binary.isNull, classOf[Operators.Eq[Binary]])
-  }
-
-  test("Pushdown IsNotNull predicate") {
-    checkFilter('a.int.isNotNull,    classOf[Operators.NotEq[Integer]])
-    checkFilter('a.long.isNotNull,   classOf[Operators.NotEq[java.lang.Long]])
-    checkFilter('a.float.isNotNull,  classOf[Operators.NotEq[java.lang.Float]])
-    checkFilter('a.double.isNotNull, classOf[Operators.NotEq[java.lang.Double]])
-    checkFilter('a.string.isNotNull, classOf[Operators.NotEq[Binary]])
-    checkFilter('a.binary.isNotNull, classOf[Operators.NotEq[Binary]])
-  }
-
-  test("Pushdown EqualTo predicate") {
-    checkFilter('a.int === 0,                 classOf[Operators.Eq[Integer]])
-    checkFilter('a.long === 0.toLong,         classOf[Operators.Eq[java.lang.Long]])
-    checkFilter('a.float === 0.toFloat,       classOf[Operators.Eq[java.lang.Float]])
-    checkFilter('a.double === 0.toDouble,     classOf[Operators.Eq[java.lang.Double]])
-    checkFilter('a.string === "foo",          classOf[Operators.Eq[Binary]])
-    checkFilter('a.binary === "foo".getBytes, classOf[Operators.Eq[Binary]])
-  }
-
-  test("Pushdown Not(EqualTo) predicate") {
-    checkFilter(!('a.int === 0),                 classOf[Operators.NotEq[Integer]])
-    checkFilter(!('a.long === 0.toLong),         classOf[Operators.NotEq[java.lang.Long]])
-    checkFilter(!('a.float === 0.toFloat),       classOf[Operators.NotEq[java.lang.Float]])
-    checkFilter(!('a.double === 0.toDouble),     classOf[Operators.NotEq[java.lang.Double]])
-    checkFilter(!('a.string === "foo"),          classOf[Operators.NotEq[Binary]])
-    checkFilter(!('a.binary === "foo".getBytes), classOf[Operators.NotEq[Binary]])
-  }
-
-  test("Pushdown LessThan predicate") {
-    checkFilter('a.int < 0,                 classOf[Operators.Lt[Integer]])
-    checkFilter('a.long < 0.toLong,         classOf[Operators.Lt[java.lang.Long]])
-    checkFilter('a.float < 0.toFloat,       classOf[Operators.Lt[java.lang.Float]])
-    checkFilter('a.double < 0.toDouble,     classOf[Operators.Lt[java.lang.Double]])
-    checkFilter('a.string < "foo",          classOf[Operators.Lt[Binary]])
-    checkFilter('a.binary < "foo".getBytes, classOf[Operators.Lt[Binary]])
-  }
-
-  test("Pushdown LessThanOrEqual predicate") {
-    checkFilter('a.int <= 0,                 classOf[Operators.LtEq[Integer]])
-    checkFilter('a.long <= 0.toLong,         classOf[Operators.LtEq[java.lang.Long]])
-    checkFilter('a.float <= 0.toFloat,       classOf[Operators.LtEq[java.lang.Float]])
-    checkFilter('a.double <= 0.toDouble,     classOf[Operators.LtEq[java.lang.Double]])
-    checkFilter('a.string <= "foo",          classOf[Operators.LtEq[Binary]])
-    checkFilter('a.binary <= "foo".getBytes, classOf[Operators.LtEq[Binary]])
-  }
-
-  test("Pushdown GreaterThan predicate") {
-    checkFilter('a.int > 0,                 classOf[Operators.Gt[Integer]])
-    checkFilter('a.long > 0.toLong,         classOf[Operators.Gt[java.lang.Long]])
-    checkFilter('a.float > 0.toFloat,       classOf[Operators.Gt[java.lang.Float]])
-    checkFilter('a.double > 0.toDouble,     classOf[Operators.Gt[java.lang.Double]])
-    checkFilter('a.string > "foo",          classOf[Operators.Gt[Binary]])
-    checkFilter('a.binary > "foo".getBytes, classOf[Operators.Gt[Binary]])
-  }
-
-  test("Pushdown GreaterThanOrEqual predicate") {
-    checkFilter('a.int >= 0,                 classOf[Operators.GtEq[Integer]])
-    checkFilter('a.long >= 0.toLong,         classOf[Operators.GtEq[java.lang.Long]])
-    checkFilter('a.float >= 0.toFloat,       classOf[Operators.GtEq[java.lang.Float]])
-    checkFilter('a.double >= 0.toDouble,     classOf[Operators.GtEq[java.lang.Double]])
-    checkFilter('a.string >= "foo",          classOf[Operators.GtEq[Binary]])
-    checkFilter('a.binary >= "foo".getBytes, classOf[Operators.GtEq[Binary]])
-  }
-
-  test("Comparison with null should not be pushed down") {
-    val predicates = Seq(
-      'a.int === null,
-      !('a.int === null),
-
-      Literal(null) === 'a.int,
-      !(Literal(null) === 'a.int),
-
-      'a.int < null,
-      'a.int <= null,
-      'a.int > null,
-      'a.int >= null,
-
-      Literal(null) < 'a.int,
-      Literal(null) <= 'a.int,
-      Literal(null) > 'a.int,
-      Literal(null) >= 'a.int
-    )
-
-    predicates.foreach { p =>
-      assert(
-        ParquetFilters.createFilter(p).isEmpty,
-        "Comparison predicate with null shouldn't be pushed down")
-    }
-  }
-
-  test("Import of simple Parquet files using glob wildcard pattern") {
-    val testGlobDir = ParquetTestData.testGlobDir.toString
-    val globPatterns = Array(testGlobDir + "/*/*", testGlobDir + "/spark-*/*", testGlobDir + "/?pa?k-*/*")
-    globPatterns.foreach { path =>
-      val result = parquetFile(path).collect()
-      assert(result.size === 45)
-      result.zipWithIndex.foreach {
-        case (row, index) => {
-          val checkBoolean =
-            if ((index % 15) % 3 == 0)
-              row(0) == true
-            else
-              row(0) == false
-          assert(checkBoolean === true, s"boolean field value in line $index did not match")
-          if ((index % 15) % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
-          assert(row(2) === "abc", s"string field value in line $index did not match")
-          assert(row(3) === ((index.toLong % 15) << 33), s"long value in line $index did not match")
-          assert(row(4) === 2.5F, s"float field value in line $index did not match")
-          assert(row(5) === 4.5D, s"double field value in line $index did not match")
-        }
-      }
+    withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+      checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba19689f/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
deleted file mode 100644
index 7b3f8c2..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.test.TestSQLContext._
-
-/**
- * A test suite that tests various Parquet queries.
- */
-class ParquetQuerySuite2 extends QueryTest with ParquetTest {
-  val sqlContext = TestSQLContext
-
-  test("simple projection") {
-    withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
-      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
-    }
-  }
-
-  test("appending") {
-    val data = (0 until 10).map(i => (i, i.toString))
-    withParquetTable(data, "t") {
-      sql("INSERT INTO t SELECT * FROM t")
-      checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
-    }
-  }
-
-  test("self-join") {
-    // 4 rows, cells of column 1 of row 2 and row 4 are null
-    val data = (1 to 4).map { i =>
-      val maybeInt = if (i % 2 == 0) None else Some(i)
-      (maybeInt, i.toString)
-    }
-
-    withParquetTable(data, "t") {
-      val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
-      val queryOutput = selfJoin.queryExecution.analyzed.output
-
-      assertResult(4, s"Field count mismatches")(queryOutput.size)
-      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
-        queryOutput.filter(_.name == "_1").map(_.exprId).size
-      }
-
-      checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
-    }
-  }
-
-  test("nested data - struct with array field") {
-    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
-    withParquetTable(data, "t") {
-      checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
-        case Tuple1((_, Seq(string))) => Row(string)
-      })
-    }
-  }
-
-  test("nested data - array of struct") {
-    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
-    withParquetTable(data, "t") {
-      checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
-        case Tuple1(Seq((_, string))) => Row(string)
-      })
-    }
-  }
-
-  test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
-    withParquetTable((1 to 10).map(Tuple1.apply), "t") {
-      checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org