You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/04 00:32:18 UTC

git commit: Spark parquet improvements

Repository: spark
Updated Branches:
  refs/heads/master 92a86b285 -> fbebaedf2


Spark parquet improvements

A few improvements to the Parquet support for SQL queries:
- Instead of a file, a ParquetRelation is now backed by a directory, which simplifies importing data from other
  sources
- The InsertIntoParquetTable operation now supports switching between overwriting and appending (at least in
  HiveQL)
- tests now use the new API
- Parquet logging can be set to WARNING level (Default)
- Default compression for Parquet files (GZIP, as in parquet-mr)

Author: Andre Schumacher <an...@iki.fi>

Closes #195 from AndreSchumacher/spark_parquet_improvements and squashes the following commits:

54df314 [Andre Schumacher] SPARK-1383 [SQL] Improvements to ParquetRelation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbebaedf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbebaedf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbebaedf

Branch: refs/heads/master
Commit: fbebaedf26286ee8a75065822a3af1148351f828
Parents: 92a86b2
Author: Andre Schumacher <an...@iki.fi>
Authored: Thu Apr 3 15:31:47 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Apr 3 15:31:47 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |  14 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   |  26 ++-
 .../scala/org/apache/spark/sql/SQLContext.scala |   4 +-
 .../spark/sql/execution/SparkStrategies.scala   |   6 +-
 .../spark/sql/parquet/ParquetRelation.scala     | 129 +++++++++-----
 .../sql/parquet/ParquetTableOperations.scala    | 139 ++++++++++++---
 .../spark/sql/parquet/ParquetTableSupport.scala |  35 ++--
 .../spark/sql/parquet/ParquetTestData.scala     |  10 +-
 sql/core/src/test/resources/log4j.properties    |   8 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 118 +++++++++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +
 .../org/apache/spark/sql/hive/TestHive.scala    |   2 +
 .../spark/sql/hive/CachedTableSuite.scala       |   4 +-
 .../sql/hive/execution/HiveComparisonTest.scala |   6 +-
 .../spark/sql/parquet/HiveParquetSuite.scala    | 169 +++++++++----------
 15 files changed, 460 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 8de8759..4ea80fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -106,6 +106,8 @@ class SqlParser extends StandardTokenParsers {
   protected val IF = Keyword("IF")
   protected val IN = Keyword("IN")
   protected val INNER = Keyword("INNER")
+  protected val INSERT = Keyword("INSERT")
+  protected val INTO = Keyword("INTO")
   protected val IS = Keyword("IS")
   protected val JOIN = Keyword("JOIN")
   protected val LEFT = Keyword("LEFT")
@@ -114,6 +116,7 @@ class SqlParser extends StandardTokenParsers {
   protected val NULL = Keyword("NULL")
   protected val ON = Keyword("ON")
   protected val OR = Keyword("OR")
+  protected val OVERWRITE = Keyword("OVERWRITE")
   protected val LIKE = Keyword("LIKE")
   protected val RLIKE = Keyword("RLIKE")
   protected val REGEXP = Keyword("REGEXP")
@@ -162,7 +165,7 @@ class SqlParser extends StandardTokenParsers {
     select * (
       UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
       UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
-    )
+    ) | insert
 
   protected lazy val select: Parser[LogicalPlan] =
     SELECT ~> opt(DISTINCT) ~ projections ~
@@ -185,6 +188,13 @@ class SqlParser extends StandardTokenParsers {
         withLimit
   }
 
+  // Parses `INSERT [OVERWRITE] INTO <relation> <select> [;]` into an InsertIntoTable plan.
+  protected lazy val insert: Parser[LogicalPlan] =
+    INSERT ~> opt(OVERWRITE) ~ inTo ~ select <~ opt(";") ^^ {
+      // overwrite iff the optional OVERWRITE keyword was present (no comparison of token text)
+      case o ~ r ~ s => InsertIntoTable(r, Map[String, Option[String]](), s, o.isDefined)
+    }
+
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
 
   protected lazy val projection: Parser[Expression] =
@@ -195,6 +205,8 @@ class SqlParser extends StandardTokenParsers {
 
   protected lazy val from: Parser[LogicalPlan] = FROM ~> relations
 
+  protected lazy val inTo: Parser[LogicalPlan] = INTO ~> relation
+
   // Based very loosely on the MySQL Grammar.
   // http://dev.mysql.com/doc/refman/5.0/en/join.html
   protected lazy val relations: Parser[LogicalPlan] =

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 6b58b93..f30b5d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -31,19 +31,33 @@ trait Catalog {
     alias: Option[String] = None): LogicalPlan
 
   def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+
   def unregisterTable(databaseName: Option[String], tableName: String): Unit
+
+  def unregisterAllTables(): Unit
 }
 
 class SimpleCatalog extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
-  def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = {
+  override def registerTable(
+      databaseName: Option[String],
+      tableName: String,
+      plan: LogicalPlan): Unit = {
     tables += ((tableName, plan))
   }
 
-  def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }
+  override def unregisterTable(
+      databaseName: Option[String],
+      tableName: String) = {
+    tables -= tableName
+  }
+
+  override def unregisterAllTables() = {
+    tables.clear()
+  }
 
-  def lookupRelation(
+  override def lookupRelation(
       databaseName: Option[String],
       tableName: String,
       alias: Option[String] = None): LogicalPlan = {
@@ -92,6 +106,10 @@ trait OverrideCatalog extends Catalog {
   override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
     overrides.remove((databaseName, tableName))
   }
+
+  override def unregisterAllTables(): Unit = {
+    overrides.clear()
+  }
 }
 
 /**
@@ -113,4 +131,6 @@ object EmptyCatalog extends Catalog {
   def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
     throw new UnsupportedOperationException
   }
+
+  override def unregisterAllTables(): Unit = {}
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f4bf00f..36059c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -80,12 +80,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
 
   /**
-   * Loads a parequet file, returning the result as a [[SchemaRDD]].
+   * Loads a Parquet file, returning the result as a [[SchemaRDD]].
    *
    * @group userf
    */
   def parquetFile(path: String): SchemaRDD =
-    new SchemaRDD(this, parquet.ParquetRelation("ParquetFile", path))
+    new SchemaRDD(this, parquet.ParquetRelation(path))
 
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b3e51fd..fe8bd5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -171,10 +171,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // TODO: need to support writing to other types of files.  Unify the below code paths.
       case logical.WriteToFile(path, child) =>
         val relation =
-          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
-        InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
+          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
+        InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
-        InsertIntoParquetTable(table, planLater(child))(sparkContext) :: Nil
+        InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
       case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
         // TODO: Should be pushing down filters as well.
         pruneFilterProject(

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 4ab755c..114bfbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -17,30 +17,29 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.{IOException, FileNotFoundException}
-
-import scala.collection.JavaConversions._
+import java.io.IOException
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapreduce.Job
 
-import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
 import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader}
+import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
 import parquet.io.api.{Binary, RecordConsumer}
+import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser}
 import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
 import parquet.schema.Type.Repetition
-import parquet.schema.{MessageType, MessageTypeParser}
-import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
-import parquet.schema.{Type => ParquetType}
 
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
 import org.apache.spark.sql.catalyst.types._
 
+// Implicits
+import scala.collection.JavaConversions._
+
 /**
  * Relation that consists of data stored in a Parquet columnar format.
  *
@@ -48,14 +47,14 @@ import org.apache.spark.sql.catalyst.types._
  * of using this class directly.
  *
  * {{{
- *   val parquetRDD = sqlContext.parquetFile("path/to/parequet.file")
+ *   val parquetRDD = sqlContext.parquetFile("path/to/parquet.file")
  * }}}
  *
- * @param tableName The name of the relation that can be used in queries.
  * @param path The path to the Parquet file.
  */
-case class ParquetRelation(tableName: String, path: String)
-  extends BaseRelation with MultiInstanceRelation {
+private[sql] case class ParquetRelation(val path: String)
+    extends LeafNode with MultiInstanceRelation {
+  self: Product =>
 
   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
@@ -65,33 +64,59 @@ case class ParquetRelation(tableName: String, path: String)
       .getSchema
 
   /** Attributes */
-  val attributes =
+  override val output =
     ParquetTypesConverter
-    .convertToAttributes(parquetSchema)
+      .convertToAttributes(parquetSchema)
 
-  /** Output */
-  override val output = attributes
-
-  // Parquet files have no concepts of keys, therefore no Partitioner
-  // Note: we could allow Block level access; needs to be thought through
-  override def isPartitioned = false
-
-  override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
+  override def newInstance = ParquetRelation(path).asInstanceOf[this.type]
 
   // Equals must also take into account the output attributes so that we can distinguish between
   // different instances of the same relation,
   override def equals(other: Any) = other match {
     case p: ParquetRelation =>
-      p.tableName == tableName && p.path == path && p.output == output
+      p.path == path && p.output == output
     case _ => false
   }
 }
 
-object ParquetRelation {
+private[sql] object ParquetRelation {
+
+  def enableLogForwarding() {
+    // Routes the java.util.logging output of the Parquet loggers listed below through
+    // SLF4J. Parquet does not forward to parent loggers, which the JUL-SLF4J bridge needs,
+    // and there is a default logger appending to the console; hence the LogManager reset,
+    // the bridge install and the re-parenting of each logger to the global logger.
+    import org.slf4j.bridge.SLF4JBridgeHandler
+    import java.util.logging.Logger
+    import java.util.logging.LogManager
+
+    val loggerNames = Seq(
+      "parquet.hadoop.ColumnChunkPageWriteStore",
+      "parquet.hadoop.InternalParquetRecordWriter",
+      "parquet.hadoop.ParquetRecordReader",
+      "parquet.hadoop.ParquetInputFormat",
+      "parquet.hadoop.ParquetOutputFormat",
+      "parquet.hadoop.ParquetFileReader",
+      "parquet.hadoop.InternalParquetRecordReader",
+      "parquet.hadoop.codec.CodecConfig")
+    LogManager.getLogManager.reset()
+    SLF4JBridgeHandler.install()
+    for(name <- loggerNames) {
+      val logger = Logger.getLogger(name)
+      logger.setParent(Logger.getGlobal)
+      logger.setUseParentHandlers(true)
+    }
+  }
 
   // The element type for the RDDs that this relation maps to.
   type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 
+  // The compression type
+  type CompressionType = parquet.hadoop.metadata.CompressionCodecName
+
+  // The default compression
+  val defaultCompression = CompressionCodecName.GZIP
+
   /**
    * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
    * this is used inside [[org.apache.spark.sql.execution.SparkStrategies SparkStrategies]] to
@@ -100,24 +125,39 @@ object ParquetRelation {
    *
    * @param pathString The directory the Parquetfile will be stored in.
    * @param child The child node that will be used for extracting the schema.
-   * @param conf A configuration configuration to be used.
-   * @param tableName The name of the resulting relation.
-   * @return An empty ParquetRelation inferred metadata.
+   * @param conf A configuration to be used.
+   * @return An empty ParquetRelation with inferred metadata.
    */
   def create(pathString: String,
              child: LogicalPlan,
-             conf: Configuration,
-             tableName: Option[String]): ParquetRelation = {
+             conf: Configuration): ParquetRelation = {
     if (!child.resolved) {
       throw new UnresolvedException[LogicalPlan](
         child,
         "Attempt to create Parquet table from unresolved child (when schema is not available)")
     }
+    createEmpty(pathString, child.output, conf)
+  }
 
-    val name = s"${tableName.getOrElse(child.nodeName)}_parquet"
+  /**
+   * Creates an empty ParquetRelation and underlying Parquetfile that only
+   * consists of the Metadata for the given schema.
+   *
+   * @param pathString The directory the Parquetfile will be stored in.
+   * @param attributes The schema of the relation.
+   * @param conf A configuration to be used.
+   * @return An empty ParquetRelation.
+   */
+  def createEmpty(pathString: String,
+                  attributes: Seq[Attribute],
+                  conf: Configuration): ParquetRelation = {
     val path = checkPath(pathString, conf)
-    ParquetTypesConverter.writeMetaData(child.output, path, conf)
-    new ParquetRelation(name, path.toString)
+    if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
+      conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
+    }
+    ParquetRelation.enableLogForwarding()
+    ParquetTypesConverter.writeMetaData(attributes, path, conf)
+    new ParquetRelation(path.toString)
   }
 
   private def checkPath(pathStr: String, conf: Configuration): Path = {
@@ -143,7 +183,7 @@ object ParquetRelation {
   }
 }
 
-object ParquetTypesConverter {
+private[parquet] object ParquetTypesConverter {
   def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
     // for now map binary to string type
     // TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
@@ -242,6 +282,7 @@ object ParquetTypesConverter {
       extraMetadata,
       "Spark")
 
+    ParquetRelation.enableLogForwarding()
     ParquetFileWriter.writeMetadataFile(
       conf,
       path,
@@ -268,16 +309,24 @@ object ParquetTypesConverter {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
+    if (!fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(
+        s"Expected $path for be a directory with Parquet files/metadata")
+    }
+    ParquetRelation.enableLogForwarding()
     val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+    // if this is a new table that was just created we will find only the metadata file
     if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
-      // TODO: improve exception handling, etc.
       ParquetFileReader.readFooter(conf, metadataPath)
     } else {
-      if (!fs.exists(path) || !fs.isFile(path)) {
-        throw new FileNotFoundException(
-          s"Could not find file ${path.toString} when trying to read metadata")
+      // there may be one or more Parquet files in the given directory
+      val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
+      // TODO: for now we assume that all footers (if there is more than one) have identical
+      // metadata; we may want to add a check here at some point
+      if (footers.size() == 0) {
+        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
       }
-      ParquetFileReader.readFooter(conf, path)
+      footers(0).getParquetMetadata
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 7285f5b..d5846ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -24,26 +24,29 @@ import java.util.Date
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
 
-import parquet.hadoop.util.ContextUtil
 import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}
+import parquet.hadoop.util.ContextUtil
 import parquet.io.InvalidRecordException
 import parquet.schema.MessageType
 
+import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
-import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}
 
 /**
  * Parquet table scan operator. Imports the file that backs the given
  * [[ParquetRelation]] as a RDD[Row].
  */
 case class ParquetTableScan(
-    @transient output: Seq[Attribute],
-    @transient relation: ParquetRelation,
-    @transient columnPruningPred: Option[Expression])(
+    // note: output cannot be transient, see
+    // https://issues.apache.org/jira/browse/SPARK-1367
+    output: Seq[Attribute],
+    relation: ParquetRelation,
+    columnPruningPred: Option[Expression])(
     @transient val sc: SparkContext)
   extends LeafNode {
 
@@ -53,6 +56,12 @@ case class ParquetTableScan(
       job,
       classOf[org.apache.spark.sql.parquet.RowReadSupport])
     val conf: Configuration = ContextUtil.getConfiguration(job)
+    val fileList = FileSystemHelper.listFiles(relation.path, conf)
+    // add all paths in the directory but skip "hidden" ones such
+    // as "_SUCCESS" and "_metadata"
+    for (path <- fileList if !path.getName.startsWith("_")) {
+      NewFileInputFormat.addInputPath(job, path)
+    }
     conf.set(
         RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertFromAttributes(output).toString)
@@ -63,14 +72,12 @@ case class ParquetTableScan(
       ``FilteredRecordReader`` (via Configuration, for example). Simple
       filter-rows-by-column-values however should be supported.
     */
-    sc.newAPIHadoopFile(
-      relation.path,
-      classOf[ParquetInputFormat[Row]],
-      classOf[Void], classOf[Row],
-      conf)
+    sc.newAPIHadoopRDD(conf, classOf[ParquetInputFormat[Row]], classOf[Void], classOf[Row])
     .map(_._2)
   }
 
+  override def otherCopyArgs = sc :: Nil
+
   /**
    * Applies a (candidate) projection.
    *
@@ -108,15 +115,31 @@ case class ParquetTableScan(
   }
 }
 
+/**
+ * Operator that acts as a sink for queries on RDDs and can be used to
+ * store the output inside a directory of Parquet files. This operator
+ * is similar to Hive's INSERT INTO TABLE operation in the sense that
+ * one can choose to either overwrite or append to a directory. Note
+ * that consecutive insertions to the same table must have compatible
+ * (source) schemas.
+ *
+ * WARNING: EXPERIMENTAL! InsertIntoParquetTable with overwrite=false may
+ * cause data corruption in the case that multiple users try to append to
+ * the same table simultaneously. Inserting into a table that was
+ * previously generated by other means (e.g., by creating an HDFS
+ * directory and importing Parquet files generated by other tools) may
+ * cause unpredicted behaviour and therefore results in a RuntimeException
+ * (only detected via filename pattern so will not catch all cases).
+ */
 case class InsertIntoParquetTable(
-    @transient relation: ParquetRelation,
-    @transient child: SparkPlan)(
+    relation: ParquetRelation,
+    child: SparkPlan,
+    overwrite: Boolean = false)(
     @transient val sc: SparkContext)
   extends UnaryNode with SparkHadoopMapReduceUtil {
 
   /**
-   * Inserts all the rows in the Parquet file. Note that OVERWRITE is implicit, since
-   * Parquet files are write-once.
+   * Inserts all rows into the Parquet file.
    */
   override def execute() = {
     // TODO: currently we do not check whether the "schema"s are compatible
@@ -135,19 +158,21 @@ case class InsertIntoParquetTable(
       classOf[org.apache.spark.sql.parquet.RowWriteSupport])
 
     // TODO: move that to function in object
-    val conf = job.getConfiguration
+    val conf = ContextUtil.getConfiguration(job)
     conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, relation.parquetSchema.toString)
 
     val fspath = new Path(relation.path)
     val fs = fspath.getFileSystem(conf)
 
-    try {
-      fs.delete(fspath, true)
-    } catch {
-      case e: IOException =>
-        throw new IOException(
-          s"Unable to clear output directory ${fspath.toString} prior"
-          + s" to InsertIntoParquetTable:\n${e.toString}")
+    if (overwrite) {
+      try {
+        fs.delete(fspath, true)
+      } catch {
+        case e: IOException =>
+          throw new IOException(
+            s"Unable to clear output directory ${fspath.toString} prior"
+              + s" to InsertIntoParquetTable:\n${e.toString}")
+      }
     }
     saveAsHadoopFile(childRdd, relation.path.toString, conf)
 
@@ -157,6 +182,8 @@ case class InsertIntoParquetTable(
 
   override def output = child.output
 
+  override def otherCopyArgs = sc :: Nil
+
   // based on ``saveAsNewAPIHadoopFile`` in [[PairRDDFunctions]]
   // TODO: Maybe PairRDDFunctions should use Product2 instead of Tuple2?
   // .. then we could use the default one and could use [[MutablePair]]
@@ -167,15 +194,21 @@ case class InsertIntoParquetTable(
       conf: Configuration) {
     val job = new Job(conf)
     val keyType = classOf[Void]
-    val outputFormatType = classOf[parquet.hadoop.ParquetOutputFormat[Row]]
     job.setOutputKeyClass(keyType)
     job.setOutputValueClass(classOf[Row])
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
     NewFileOutputFormat.setOutputPath(job, new Path(path))
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = sc.newRddId()
 
+    val taskIdOffset =
+      if (overwrite) 1
+      else {
+        FileSystemHelper
+          .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
+      }
+
     def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
@@ -184,7 +217,7 @@ case class InsertIntoParquetTable(
       val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
         attemptNumber)
       val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatType.newInstance
+      val format = new AppendingParquetOutputFormat(taskIdOffset)
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
       val writer = format.getRecordWriter(hadoopContext)
@@ -196,7 +229,7 @@ case class InsertIntoParquetTable(
       committer.commitTask(hadoopContext)
       return 1
     }
-    val jobFormat = outputFormatType.newInstance
+    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
     /* apparently we need a TaskAttemptID to construct an OutputCommitter;
      * however we're only going to use this local OutputCommitter for
      * setupJob/commitJob, so we just use a dummy "map" task.
@@ -210,3 +243,55 @@ case class InsertIntoParquetTable(
   }
 }
 
+// ParquetOutputFormat whose part files are numbered starting after `offset`. TODO: this can
+// append to directories it created itself, not necessarily to imported ones.
+private[parquet] class AppendingParquetOutputFormat(offset: Int)
+  extends parquet.hadoop.ParquetOutputFormat[Row] {
+  // overridden as a no-op so that an existing directory is accepted as the output directory
+  override def checkOutputSpecs(job: JobContext): Unit = {}
+
+  // overridden to name the work file part-r-<task id + offset>.parquet; `extension` is unused
+  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+    val taskId: TaskID = context.getTaskAttemptID.getTaskID
+    val partition: Int = taskId.getId
+    val filename = s"part-r-${partition + offset}.parquet"
+    val committer: FileOutputCommitter =
+      getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
+    new Path(committer.getWorkPath, filename)
+  }
+}
+
+private[parquet] object FileSystemHelper {
+  // Lists the paths of all entries directly inside the directory `pathStr`. Throws an
+  // IllegalArgumentException if no file system is found or the path is not a directory.
+  def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
+    val origPath = new Path(pathStr)
+    val fs = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(
+        s"ParquetTableOperations: Path $origPath is incorrectly formatted")
+    }
+    val path = origPath.makeQualified(fs)
+    if (!fs.exists(path) || !fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(
+        s"ParquetTableOperations: path $path does not exist or is not a directory")
+    }
+    fs.listStatus(path).map(_.getPath)
+  }
+
+  // Finds the maximum task id in the output file names (part-r-<int>.parquet) at the given
+  // path. Hidden files ("_...") count as 0; an empty directory yields 0 instead of throwing.
+  def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
+    val files = FileSystemHelper.listFiles(pathStr, conf)
+    val nameP = new scala.util.matching.Regex("""part-r-(\d{1,})\.parquet""", "taskid")
+    val hiddenFileP = new scala.util.matching.Regex("_.*")
+    files.map(_.getName).map {
+      case nameP(taskid) => taskid.toInt
+      case hiddenFileP() => 0
+      case other: String =>
+        sys.error("ERROR: attempting to append to set of Parquet files and found file " +
+          s"that does not match name pattern: $other")
+      case _ => 0
+    }.foldLeft(0)((a, b) => if (a < b) b else a)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index c21e400..84b1b46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.types._
  *
  *@param root The root group converter for the record.
  */
-class RowRecordMaterializer(root: CatalystGroupConverter) extends RecordMaterializer[Row] {
+private[parquet] class RowRecordMaterializer(root: CatalystGroupConverter)
+  extends RecordMaterializer[Row] {
 
   def this(parquetSchema: MessageType) =
     this(new CatalystGroupConverter(ParquetTypesConverter.convertToAttributes(parquetSchema)))
@@ -48,14 +49,14 @@ class RowRecordMaterializer(root: CatalystGroupConverter) extends RecordMaterial
 /**
  * A `parquet.hadoop.api.ReadSupport` for Row objects.
  */
-class RowReadSupport extends ReadSupport[Row] with Logging {
+private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
 
   override def prepareForRead(
       conf: Configuration,
       stringMap: java.util.Map[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[Row] = {
-    log.debug(s"preparing for read with schema ${fileSchema.toString}")
+    log.debug(s"preparing for read with file schema $fileSchema")
     new RowRecordMaterializer(readContext.getRequestedSchema)
   }
 
@@ -67,20 +68,20 @@ class RowReadSupport extends ReadSupport[Row] with Logging {
       configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString)
     val requested_schema =
       MessageTypeParser.parseMessageType(requested_schema_string)
-
-    log.debug(s"read support initialized for original schema ${requested_schema.toString}")
+    log.debug(s"read support initialized for requested schema $requested_schema")
+    ParquetRelation.enableLogForwarding()
     new ReadContext(requested_schema, keyValueMetaData)
   }
 }
 
-object RowReadSupport {
+private[parquet] object RowReadSupport {
   val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
 }
 
 /**
  * A `parquet.hadoop.api.WriteSupport` for Row ojects.
  */
-class RowWriteSupport extends WriteSupport[Row] with Logging {
+private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   def setSchema(schema: MessageType, configuration: Configuration) {
     // for testing
     this.schema = schema
@@ -104,6 +105,8 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
   override def init(configuration: Configuration): WriteSupport.WriteContext = {
     schema = if (schema == null) getSchema(configuration) else schema
     attributes = ParquetTypesConverter.convertToAttributes(schema)
+    log.debug(s"write support initialized for requested schema $schema")
+    ParquetRelation.enableLogForwarding()
     new WriteSupport.WriteContext(
       schema,
       new java.util.HashMap[java.lang.String, java.lang.String]())
@@ -111,10 +114,16 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
 
   override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
     writer = recordConsumer
+    log.debug(s"preparing for write with schema $schema")
   }
 
   // TODO: add groups (nested fields)
   override def write(record: Row): Unit = {
+    if (attributes.size > record.size) {
+      throw new IndexOutOfBoundsException(
+        s"Trying to write more fields than contained in row (${attributes.size}>${record.size})")
+    }
+
     var index = 0
     writer.startMessage()
     while(index < attributes.size) {
@@ -130,7 +139,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 }
 
-object RowWriteSupport {
+private[parquet] object RowWriteSupport {
   val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema"
 }
 
@@ -139,7 +148,7 @@ object RowWriteSupport {
  *
  * @param schema The corresponding Catalyst schema in the form of a list of attributes.
  */
-class CatalystGroupConverter(
+private[parquet] class CatalystGroupConverter(
     schema: Seq[Attribute],
     protected[parquet] val current: ParquetRelation.RowType) extends GroupConverter {
 
@@ -177,13 +186,12 @@ class CatalystGroupConverter(
  * @param parent The parent group converter.
  * @param fieldIndex The index inside the record.
  */
-class CatalystPrimitiveConverter(
+private[parquet] class CatalystPrimitiveConverter(
     parent: CatalystGroupConverter,
     fieldIndex: Int) extends PrimitiveConverter {
   // TODO: consider refactoring these together with ParquetTypesConverter
   override def addBinary(value: Binary): Unit =
-    // TODO: fix this once a setBinary will become available in MutableRow
-    parent.getCurrentRecord.setByte(fieldIndex, value.getBytes.apply(0))
+    parent.getCurrentRecord.update(fieldIndex, value.getBytes)
 
   override def addBoolean(value: Boolean): Unit =
     parent.getCurrentRecord.setBoolean(fieldIndex, value)
@@ -208,10 +216,9 @@ class CatalystPrimitiveConverter(
  * @param parent The parent group converter.
  * @param fieldIndex The index inside the record.
  */
-class CatalystPrimitiveStringConverter(
+private[parquet] class CatalystPrimitiveStringConverter(
     parent: CatalystGroupConverter,
     fieldIndex: Int) extends CatalystPrimitiveConverter(parent, fieldIndex) {
   override def addBinary(value: Binary): Unit =
     parent.getCurrentRecord.setString(fieldIndex, value.toStringUsingUTF8)
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index 3340c3f..728e3dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -26,7 +26,7 @@ import parquet.hadoop.util.ContextUtil
 import parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.util.Utils
 
 object ParquetTestData {
 
@@ -64,13 +64,13 @@ object ParquetTestData {
     "mylong:Long"
   )
 
-  val testFile = getTempFilePath("testParquetFile").getCanonicalFile
+  val testDir = Utils.createTempDir()
 
-  lazy val testData = new ParquetRelation("testData", testFile.toURI.toString)
+  lazy val testData = new ParquetRelation(testDir.toURI.toString)
 
   def writeFile() = {
-    testFile.delete
-    val path: Path = new Path(testFile.toURI)
+    testDir.delete
+    val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet"))
     val job = new Job()
     val configuration: Configuration = ContextUtil.getConfiguration(job)
     val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 7bb6789..dffd15a 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -45,8 +45,6 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
 log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
-# Parquet logging
-parquet.hadoop.InternalParquetRecordReader=WARN
-log4j.logger.parquet.hadoop.InternalParquetRecordReader=WARN
-parquet.hadoop.ParquetInputFormat=WARN
-log4j.logger.parquet.hadoop.ParquetInputFormat=WARN
+# Parquet related logging
+log4j.logger.parquet.hadoop=WARN
+log4j.logger.org.apache.spark.sql.parquet=INFO

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index ea1733b..a62a3c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -19,27 +19,40 @@ package org.apache.spark.sql.parquet
 
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.mapreduce.Job
+
 import parquet.hadoop.ParquetFileWriter
-import parquet.hadoop.util.ContextUtil
 import parquet.schema.MessageTypeParser
+import parquet.hadoop.util.ContextUtil
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
 import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
+import org.apache.spark.sql.{parquet, SchemaRDD}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import scala.Tuple2
 
 // Implicits
 import org.apache.spark.sql.test.TestSQLContext._
 
+case class TestRDDEntry(key: Int, value: String)
+
 class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
+
+  var testRDD: SchemaRDD = null
+
   override def beforeAll() {
     ParquetTestData.writeFile()
+    testRDD = parquetFile(ParquetTestData.testDir.toString)
+    testRDD.registerAsTable("testsource")
   }
 
   override def afterAll() {
-    ParquetTestData.testFile.delete()
+    Utils.deleteRecursively(ParquetTestData.testDir)
+    // here we should also unregister the table??
   }
 
   test("self-join parquet files") {
@@ -55,11 +68,18 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
       case Seq(_, _) => // All good
     }
 
-    // TODO: We can't run this query as it NPEs
+    val result = query.collect()
+    assert(result.size === 9, "self-join result has incorrect size")
+    assert(result(0).size === 12, "result row has incorrect size")
+    result.zipWithIndex.foreach {
+      case (row, index) => row.zipWithIndex.foreach {
+        case (field, column) => assert(field != null, s"self-join contains null value in row $index field $column")
+      }
+    }
   }
 
   test("Import of simple Parquet file") {
-    val result = getRDD(ParquetTestData.testData).collect()
+    val result = parquetFile(ParquetTestData.testDir.toString).collect()
     assert(result.size === 15)
     result.zipWithIndex.foreach {
       case (row, index) => {
@@ -125,20 +145,82 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
     fs.delete(path, true)
   }
 
+  test("Creating case class RDD table") {
+    val entries = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    entries.registerAsTable("tmp")
+    val sortedRows = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0))
+    // rows are sorted by key, so the row at zero-based position p must carry "val_<p + 1>"
+    sortedRows.zipWithIndex.foreach { case (row, position) =>
+      val counter = position + 1
+      val value = row.getString(1)
+      // compared with equals() rather than '===', as in the original check
+      assert(value.equals(s"val_$counter"),
+        s"row $counter value $value does not match val_$counter")
+    }
+  }
+
+  test("Saving case class RDD table to file and reading it back in") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.saveAsParquetFile(path)
+    parquetFile(path).registerAsTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    // compare the row counts first so that extra or missing rows are reported explicitly
+    assert(rdd_copy.size === rdd_orig.size, "number of rows read back does not match")
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key,  s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
+
+  test("insert (overwrite) via Scala API (new SchemaRDD)") {
+    val dirname = Utils.createTempDir()
+    val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    source_rdd.registerAsTable("source")
+    val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType))
+    dest_rdd.registerAsTable("dest")
+    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
+    val rdd_copy1 = sql("SELECT * FROM dest").collect()
+    assert(rdd_copy1.size === 100)
+    assert(rdd_copy1(0).apply(0) === 1)
+    assert(rdd_copy1(0).apply(1) === "val_1")
+    sql("INSERT INTO dest SELECT * FROM source").collect()
+    val rdd_copy2 = sql("SELECT * FROM dest").collect()
+    assert(rdd_copy2.size === 200)
+    Utils.deleteRecursively(dirname)
+  }
+
+  test("insert (appending) to same table via Scala API") {
+    sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+    val double_rdd = sql("SELECT * FROM testsource").collect()
+    assert(double_rdd != null)
+    assert(double_rdd.size === 30)
+    for(i <- (0 to 14)) {
+      assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} do not match")
+    }
+    // restore the original test data, since the append above modified the shared test directory
+    Utils.deleteRecursively(ParquetTestData.testDir)
+    ParquetTestData.writeFile()
+  }
+
   /**
-   * Computes the given [[ParquetRelation]] and returns its RDD.
+   * Creates an empty SchemaRDD backed by a ParquetRelation.
    *
-   * @param parquetRelation The Parquet relation.
-   * @return An RDD of Rows.
+   * TODO: since this is so experimental it is better to have it here and not
+   * in SQLContext. Also note that when creating new AttributeReferences
+   * one needs to take care not to create duplicate Attribute ID's.
    */
-  private def getRDD(parquetRelation: ParquetRelation): RDD[Row] = {
-    val scanner = new ParquetTableScan(
-      parquetRelation.output,
-      parquetRelation,
-      None)(TestSQLContext.sparkContext)
-    scanner
-      .execute
-      .map(_.copy())
+  private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+    val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+    new SchemaRDD(
+      TestSQLContext,
+      parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 29834a1..fc053c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -148,6 +148,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
    */
   override def unregisterTable(
       databaseName: Option[String], tableName: String): Unit = ???
+
+  override def unregisterAllTables() = {}
 }
 
 object HiveMetastoreTypes extends RegexParsers {

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index bc3447b..0a6bea0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -313,6 +313,8 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
         catalog.client.dropDatabase(db, true, false, true)
       }
 
+      catalog.unregisterAllTables()
+
       FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName =>
         FunctionRegistry.unregisterTemporaryUDF(udfName)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 68d45e5..79ec1f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -29,7 +29,7 @@ class CachedTableSuite extends HiveComparisonTest {
   }
 
   createQueryTest("read from cached table",
-    "SELECT * FROM src LIMIT 1")
+    "SELECT * FROM src LIMIT 1", reset = false)
 
   test("check that table is cached and uncache") {
     TestHive.table("src").queryExecution.analyzed match {
@@ -40,7 +40,7 @@ class CachedTableSuite extends HiveComparisonTest {
   }
 
   createQueryTest("read from uncached table",
-    "SELECT * FROM src LIMIT 1")
+    "SELECT * FROM src LIMIT 1", reset = false)
 
   test("make sure table is uncached") {
     TestHive.table("src").queryExecution.analyzed match {

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index c7a350e..18654b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -170,7 +170,7 @@ abstract class HiveComparisonTest
   }
 
   val installHooksCommand = "(?i)SET.*hooks".r
-  def createQueryTest(testCaseName: String, sql: String) {
+  def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) {
     // If test sharding is enable, skip tests that are not in the correct shard.
     shardInfo.foreach {
       case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return
@@ -228,7 +228,7 @@ abstract class HiveComparisonTest
       try {
         // MINOR HACK: You must run a query before calling reset the first time.
         TestHive.sql("SHOW TABLES")
-        TestHive.reset()
+        if (reset) { TestHive.reset() }
 
         val hiveCacheFiles = queryList.zipWithIndex.map {
           case (queryString, i)  =>
@@ -295,7 +295,7 @@ abstract class HiveComparisonTest
                     fail(errorMessage)
                 }
             }.toSeq
-            TestHive.reset()
+            if (reset) { TestHive.reset() }
 
             computedResults
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbebaedf/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 05ad85b..314ca48 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,147 +17,138 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.File
-
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
+import org.apache.spark.sql.catalyst.types.{DataType, StringType, IntegerType}
+import org.apache.spark.sql.{parquet, SchemaRDD}
 import org.apache.spark.sql.hive.TestHive
+import org.apache.spark.util.Utils
+
+// Implicits
+import org.apache.spark.sql.hive.TestHive._
 
 class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
-  val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString
-
-  // runs a SQL and optionally resolves one Parquet table
-  def runQuery(
-      querystr: String,
-      tableName: Option[String] = None,
-      filename: Option[String] = None): Array[Row] = {
-
-    // call to resolve references in order to get CREATE TABLE AS to work
-    val query = TestHive
-      .parseSql(querystr)
-    val finalQuery =
-      if (tableName.nonEmpty && filename.nonEmpty)
-        resolveParquetTable(tableName.get, filename.get, query)
-      else
-        query
-    TestHive.executePlan(finalQuery)
-      .toRdd
-      .collect()
-  }
 
-  // stores a query output to a Parquet file
-  def storeQuery(querystr: String, filename: String): Unit = {
-    val query = WriteToFile(
-      filename,
-      TestHive.parseSql(querystr))
-    TestHive
-      .executePlan(query)
-      .stringResult()
-  }
+  val dirname = Utils.createTempDir()
 
-  /**
-   * TODO: This function is necessary as long as there is no notion of a Catalog for
-   * Parquet tables. Once such a thing exists this functionality should be moved there.
-   */
-  def resolveParquetTable(tableName: String, filename: String, plan: LogicalPlan): LogicalPlan = {
-    TestHive.loadTestTable("src") // may not be loaded now
-    plan.transform {
-      case relation @ UnresolvedRelation(databaseName, name, alias) =>
-        if (name == tableName)
-          ParquetRelation(tableName, filename)
-        else
-          relation
-      case op @ InsertIntoCreatedTable(databaseName, name, child) =>
-        if (name == tableName) {
-          // note: at this stage the plan is not yet analyzed but Parquet needs to know the schema
-          // and for that we need the child to be resolved
-          val relation = ParquetRelation.create(
-              filename,
-              TestHive.analyzer(child),
-              TestHive.sparkContext.hadoopConfiguration,
-              Some(tableName))
-          InsertIntoTable(
-            relation.asInstanceOf[BaseRelation],
-            Map.empty,
-            child,
-            overwrite = false)
-        } else
-          op
-    }
-  }
+  var testRDD: SchemaRDD = null
 
   override def beforeAll() {
     // write test data
-    ParquetTestData.writeFile()
-    // Override initial Parquet test table
-    TestHive.catalog.registerTable(Some[String]("parquet"), "testsource", ParquetTestData.testData)
+    ParquetTestData.writeFile
+    testRDD = parquetFile(ParquetTestData.testDir.toString)
+    testRDD.registerAsTable("testsource")
   }
 
   override def afterAll() {
-    ParquetTestData.testFile.delete()
+    Utils.deleteRecursively(ParquetTestData.testDir)
+    Utils.deleteRecursively(dirname)
+    reset() // drop all tables that were registered as part of the tests
   }
 
+  // in case tests are failing we delete before and after each test
   override def beforeEach() {
-    new File(filename).getAbsoluteFile.delete()
+    Utils.deleteRecursively(dirname)
   }
 
   override def afterEach() {
-    new File(filename).getAbsoluteFile.delete()
+    Utils.deleteRecursively(dirname)
   }
 
   test("SELECT on Parquet table") {
-    val rdd = runQuery("SELECT * FROM parquet.testsource")
+    val rdd = sql("SELECT * FROM testsource").collect()
     assert(rdd != null)
     assert(rdd.forall(_.size == 6))
   }
 
   test("Simple column projection + filter on Parquet table") {
-    val rdd = runQuery("SELECT myboolean, mylong FROM parquet.testsource WHERE myboolean=true")
+    val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect()
     assert(rdd.size === 5, "Filter returned incorrect number of rows")
     assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
   }
 
-  test("Converting Hive to Parquet Table via WriteToFile") {
-    storeQuery("SELECT * FROM src", filename)
-    val rddOne = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
-    val rddTwo = runQuery("SELECT * from ptable", Some("ptable"), Some(filename)).sortBy(_.getInt(0))
+  test("Converting Hive to Parquet Table via saveAsParquetFile") {
+    sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath)
+    parquetFile(dirname.getAbsolutePath).registerAsTable("ptable")
+    val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0))
+    val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0))
     compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
   }
 
   test("INSERT OVERWRITE TABLE Parquet table") {
-    storeQuery("SELECT * FROM parquet.testsource", filename)
-    runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
-    runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
-    val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
-    val rddOrig = runQuery("SELECT * FROM parquet.testsource")
-    compareRDDs(rddOrig, rddCopy, "parquet.testsource", ParquetTestData.testSchemaFieldNames)
+    sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath)
+    parquetFile(dirname.getAbsolutePath).registerAsTable("ptable")
+    // let's do three overwrites for good measure
+    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+    sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect()
+    val rddCopy = sql("SELECT * FROM ptable").collect()
+    val rddOrig = sql("SELECT * FROM testsource").collect()
+    assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??")
+    compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
   }
 
-  test("CREATE TABLE AS Parquet table") {
-    runQuery("CREATE TABLE ptable AS SELECT * FROM src", Some("ptable"), Some(filename))
-    val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
+  test("CREATE TABLE of Parquet table") {
+    createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+      .registerAsTable("tmp")
+    val rddCopy =
+      sql("INSERT INTO TABLE tmp SELECT * FROM src")
+      .collect()
       .sortBy[Int](_.apply(0) match {
         case x: Int => x
         case _ => 0
       })
-    val rddOrig = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
+    val rddOrig = sql("SELECT * FROM src")
+      .collect()
+      .sortBy(_.getInt(0))
     compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))
   }
 
+  test("Appending to Parquet table") {
+    createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+      .registerAsTable("tmpnew")
+    sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+    sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+    sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
+    val rddCopies = sql("SELECT * FROM tmpnew").collect()
+    val rddOrig = sql("SELECT * FROM src").collect()
+    assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number")
+  }
+
+  test("Appending to and then overwriting Parquet table") {
+    createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
+      .registerAsTable("tmp")
+    sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
+    sql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
+    sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect()
+    val rddCopies = sql("SELECT * FROM tmp").collect()
+    val rddOrig = sql("SELECT * FROM src").collect()
+    assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite")
+  }
+
   private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
     var counter = 0
     (rddOne, rddTwo).zipped.foreach {
       (a,b) => (a,b).zipped.toArray.zipWithIndex.foreach {
-        case ((value_1:Array[Byte], value_2:Array[Byte]), index) =>
-          assert(new String(value_1) === new String(value_2), s"table $tableName row $counter field ${fieldNames(index)} don't match")
         case ((value_1, value_2), index) =>
           assert(value_1 === value_2, s"table $tableName row $counter field ${fieldNames(index)} don't match")
       }
     counter = counter + 1
     }
   }
+
+  /**
+   * Creates an empty SchemaRDD backed by a ParquetRelation.
+   *
+   * TODO: since this is so experimental it is better to have it here and not
+   * in SQLContext. Also note that when creating new AttributeReferences
+   * one needs to take care not to create duplicate Attribute ID's.
+   */
+  private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+    val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+    new SchemaRDD(
+      TestHive,
+      parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
+  }
 }