Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/21 02:04:45 UTC

[2/9] SPARK-1251 Support for optimizing and executing structured queries

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
new file mode 100644
index 0000000..92d8420
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+
+import catalyst.expressions._
+import catalyst.planning._
+import catalyst.plans._
+import catalyst.plans.logical.{BaseRelation, LogicalPlan}
+
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.parquet.{ParquetRelation, InsertIntoParquetTable, ParquetTableScan}
+
+trait HiveStrategies {
+  // Possibly being too clever with types here... or not clever enough.
+  self: SQLContext#SparkPlanner =>
+
+  val hiveContext: HiveContext
+
+  object Scripts extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.ScriptTransformation(input, script, output, child) =>
+        ScriptTransformation(input, script, output, planLater(child))(hiveContext) :: Nil
+      case _ => Nil
+    }
+  }
+
+  object DataSinks extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
+        InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
+        InsertIntoParquetTable(table, planLater(child))(hiveContext.sparkContext) :: Nil
+      case _ => Nil
+    }
+  }
+
+  object HiveTableScans extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      // Push attributes into table scan when possible.
+      case p @ logical.Project(projectList, m: MetastoreRelation) if isSimpleProject(projectList) =>
+        HiveTableScan(projectList.asInstanceOf[Seq[Attribute]], m, None)(hiveContext) :: Nil
+      case m: MetastoreRelation =>
+        HiveTableScan(m.output, m, None)(hiveContext) :: Nil
+      case _ => Nil
+    }
+  }
+
+  /**
+   * A strategy used to detect filtering predicates on top of a partitioned relation to help
+   * partition pruning.
+   *
+   * This strategy itself doesn't perform partition pruning; it just collects and combines all the
+   * partition pruning predicates and passes them down to the underlying [[HiveTableScan]] operator,
+   * which does the actual pruning work.
+   */
+  object PartitionPrunings extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case p @ FilteredOperation(predicates, relation: MetastoreRelation)
+        if relation.isPartitioned =>
+
+        val partitionKeyIds = relation.partitionKeys.map(_.id).toSet
+
+        // Filter out all predicates that only deal with partition keys
+        val (pruningPredicates, otherPredicates) = predicates.partition {
+          _.references.map(_.id).subsetOf(partitionKeyIds)
+        }
+
+        val scan = HiveTableScan(
+          relation.output, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)
+
+        otherPredicates
+          .reduceLeftOption(And)
+          .map(Filter(_, scan))
+          .getOrElse(scan) :: Nil
+
+      case _ =>
+        Nil
+    }
+  }
+
+  /**
+   * A strategy that detects projects and filters over some relation and applies column pruning if
+   * possible.  Partition pruning is applied first if the relation is partitioned.
+   */
+  object ColumnPrunings extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+        // TODO(andre): the current mix of HiveRelation and ParquetRelation
+        // here appears artificial; try to refactor to break it into two
+      case PhysicalOperation(projectList, predicates, relation: BaseRelation) =>
+        val predicateOpt = predicates.reduceOption(And)
+        val predicateRefs = predicateOpt.map(_.references).getOrElse(Set.empty)
+        val projectRefs = projectList.flatMap(_.references)
+
+        // To figure out what columns to preserve after column pruning, we need to consider:
+        //
+        // 1. Columns referenced by the project list (order preserved)
+        // 2. Columns referenced by filtering predicates but not by project list
+        // 3. Relation output
+        //
+        // Then the final result is ((1 union 2) intersect 3)
+        val prunedCols = (projectRefs ++ (predicateRefs -- projectRefs)).intersect(relation.output)
+
+        val filteredScans =
+          if (relation.isPartitioned) {  // from here on relation must be a [[MetastoreRelation]]
+            // Applies partition pruning first for partitioned table
+            val filteredRelation = predicateOpt.map(logical.Filter(_, relation)).getOrElse(relation)
+            PartitionPrunings(filteredRelation).view.map(_.transform {
+              case scan: HiveTableScan =>
+                scan.copy(attributes = prunedCols)(hiveContext)
+            })
+          } else {
+            val scan = relation match {
+              case MetastoreRelation(_, _, _) => {
+                HiveTableScan(
+                  prunedCols,
+                  relation.asInstanceOf[MetastoreRelation],
+                  None)(hiveContext)
+              }
+              case ParquetRelation(_, _) => {
+                ParquetTableScan(
+                  relation.output,
+                  relation.asInstanceOf[ParquetRelation],
+                  None)(hiveContext.sparkContext)
+                  .pruneColumns(prunedCols)
+              }
+            }
+            predicateOpt.map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
+          }
+
+        if (isSimpleProject(projectList) && prunedCols == projectRefs) {
+          filteredScans
+        } else {
+          filteredScans.view.map(execution.Project(projectList, _))
+        }
+
+      case _ =>
+        Nil
+    }
+  }
+
+  /**
+   * Returns true if `projectList` only performs column pruning and does not evaluate other
+   * complex expressions.
+   */
+  def isSimpleProject(projectList: Seq[NamedExpression]) = {
+    projectList.forall(_.isInstanceOf[Attribute])
+  }
+}
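
For context: each strategy above pattern-matches on a logical plan, returns Nil when it does not apply, and defers planning of child operators to planLater. The following standalone Scala sketch (not part of this patch) illustrates the shape of that pattern; the LogicalNode/PhysicalNode/Planner types are simplified stand-ins for illustration, not the actual Catalyst classes.

object StrategySketch {
  // A toy logical plan.
  sealed trait LogicalNode
  case class TableNode(name: String) extends LogicalNode
  case class FilterNode(predicate: String, child: LogicalNode) extends LogicalNode

  // A toy physical plan.
  sealed trait PhysicalNode
  case class TableScanExec(name: String) extends PhysicalNode
  case class FilterExec(predicate: String, child: PhysicalNode) extends PhysicalNode

  class Planner {
    // A strategy returns Nil when it does not apply, so strategies compose freely.
    trait Strategy {
      def apply(plan: LogicalNode): Seq[PhysicalNode]
    }

    object TableScans extends Strategy {
      def apply(plan: LogicalNode): Seq[PhysicalNode] = plan match {
        case TableNode(name) => TableScanExec(name) :: Nil
        case _ => Nil
      }
    }

    object Filters extends Strategy {
      def apply(plan: LogicalNode): Seq[PhysicalNode] = plan match {
        // Child planning is deferred via planLater, as the Hive strategies above do.
        case FilterNode(pred, child) => FilterExec(pred, planLater(child)) :: Nil
        case _ => Nil
      }
    }

    val strategies: Seq[Strategy] = Seq(Filters, TableScans)

    // Tries each strategy in order and takes the first physical plan produced.
    def planLater(plan: LogicalNode): PhysicalNode =
      strategies.flatMap(_(plan)).headOption
        .getOrElse(sys.error(s"No strategy produced a plan for $plan"))
  }

  def main(args: Array[String]): Unit = {
    val planner = new Planner
    val logical = FilterNode("key > 10", TableNode("src"))
    println(planner.planLater(logical))  // FilterExec(key > 10,TableScanExec(src))
  }
}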

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
new file mode 100644
index 0000000..f20e9d4
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+
+import java.io.{InputStreamReader, BufferedReader}
+
+import catalyst.expressions._
+import org.apache.spark.sql.execution._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Transforms the input by forking and running the specified script.
+ *
+ * @param input the set of expressions that should be passed to the script.
+ * @param script the command that should be executed.
+ * @param output the attributes that are produced by the script.
+ */
+case class ScriptTransformation(
+    input: Seq[Expression],
+    script: String,
+    output: Seq[Attribute],
+    child: SparkPlan)(@transient sc: HiveContext)
+  extends UnaryNode {
+
+  override def otherCopyArgs = sc :: Nil
+
+  def execute() = {
+    child.execute().mapPartitions { iter =>
+      val cmd = List("/bin/bash", "-c", script)
+      val builder = new ProcessBuilder(cmd)
+      val proc = builder.start()
+      val inputStream = proc.getInputStream
+      val outputStream = proc.getOutputStream
+      val reader = new BufferedReader(new InputStreamReader(inputStream))
+
+      // TODO: This should be exposed as an iterator instead of reading in all the data at once.
+      val outputLines = collection.mutable.ArrayBuffer[Row]()
+      val readerThread = new Thread("Transform OutputReader") {
+        override def run() {
+          var curLine = reader.readLine()
+          while (curLine != null) {
+            // TODO: Use SerDe
+            outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]])
+            curLine = reader.readLine()
+          }
+        }
+      }
+      readerThread.start()
+      val outputProjection = new Projection(input)
+      iter
+        .map(outputProjection)
+        // TODO: Use SerDe
+        .map(_.mkString("", "\t", "\n").getBytes).foreach(outputStream.write)
+      outputStream.close()
+      readerThread.join()
+      outputLines.toIterator
+    }
+  }
+}
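
The operator above pipes each input row to an external script as a tab-delimited line on stdin, while a separate thread drains the transformed rows from the script's stdout so neither side blocks on a full pipe buffer. Below is a standalone sketch of the same fork-and-pipe pattern using only the JDK (not part of this patch; the helper and object names are illustrative).

import java.io.{BufferedReader, InputStreamReader}
import scala.collection.mutable.ArrayBuffer

object PipeSketch {
  def pipeThroughScript(script: String, rows: Iterator[Seq[Any]]): Iterator[Array[String]] = {
    val proc = new ProcessBuilder("/bin/bash", "-c", script).start()
    val reader = new BufferedReader(new InputStreamReader(proc.getInputStream))
    val outputStream = proc.getOutputStream

    val outputLines = ArrayBuffer.empty[Array[String]]
    val readerThread = new Thread("transform-output-reader") {
      override def run(): Unit = {
        var line = reader.readLine()
        while (line != null) {
          outputLines += line.split("\t")
          line = reader.readLine()
        }
      }
    }
    readerThread.start()

    // Write every input row as a tab-delimited, newline-terminated record.
    rows.map(_.mkString("", "\t", "\n").getBytes("UTF-8")).foreach(outputStream.write)
    outputStream.close()
    readerThread.join()
    outputLines.iterator
  }

  def main(args: Array[String]): Unit = {
    // Example: upper-case the second column with an external command.
    val input = Iterator(Seq(1, "alpha"), Seq(2, "beta"))
    pipeThroughScript("awk -F'\\t' '{print $1\"\\t\"toupper($2)}'", input)
      .foreach(cols => println(cols.mkString(" | ")))
  }
}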

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
new file mode 100644
index 0000000..71d751c
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf, InputFormat}
+
+import org.apache.spark.SerializableWritable
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.{HadoopRDD, UnionRDD, EmptyRDD, RDD}
+
+
+/**
+ * A trait for subclasses that handle table scans.
+ */
+private[hive] sealed trait TableReader {
+  def makeRDDForTable(hiveTable: HiveTable): RDD[_]
+
+  def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_]
+
+}
+
+
+/**
+ * Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the
+ * data warehouse directory.
+ */
+private[hive]
+class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveContext)
+  extends TableReader {
+
+  // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
+  // it is smaller than what Spark suggests.
+  private val _minSplitsPerRDD = math.max(
+    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits)
+
+
+  // TODO: set aws s3 credentials.
+
+  private val _broadcastedHiveConf =
+    sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf))
+
+  def broadcastedHiveConf = _broadcastedHiveConf
+
+  def hiveConf = _broadcastedHiveConf.value.value
+
+  override def makeRDDForTable(hiveTable: HiveTable): RDD[_] =
+    makeRDDForTable(
+      hiveTable,
+      _tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]],
+      filterOpt = None)
+
+  /**
+   * Creates a Hadoop RDD to read data from the target table's data directory. Returns a transformed
+   * RDD that contains deserialized rows.
+   *
+   * @param hiveTable Hive metadata for the table being scanned.
+   * @param deserializerClass Class of the SerDe used to deserialize Writables read from Hadoop.
+   * @param filterOpt If defined, then the filter is used to reject files contained in the data
+   *                  directory being read. If None, then all files are accepted.
+   */
+  def makeRDDForTable(
+      hiveTable: HiveTable,
+      deserializerClass: Class[_ <: Deserializer],
+      filterOpt: Option[PathFilter]): RDD[_] =
+  {
+    assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table,
+      since input formats may differ across partitions. Use makeRDDForPartitionedTable() instead.""")
+
+    // Create local references to member variables, so that the entire `this` object won't be
+    // serialized in the closure below.
+    val tableDesc = _tableDesc
+    val broadcastedHiveConf = _broadcastedHiveConf
+
+    val tablePath = hiveTable.getPath
+    val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
+
+    //logDebug("Table input: %s".format(tablePath))
+    val ifc = hiveTable.getInputFormatClass
+      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+
+    val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
+      val hconf = broadcastedHiveConf.value.value
+      val deserializer = deserializerClass.newInstance()
+      deserializer.initialize(hconf, tableDesc.getProperties)
+
+      // Deserialize each Writable to get the row value.
+      iter.map {
+        case v: Writable => deserializer.deserialize(v)
+        case value =>
+          sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}")
+      }
+    }
+    deserializedHadoopRDD
+  }
+
+  override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] = {
+    val partitionToDeserializer = partitions.map(part =>
+      (part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap
+    makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None)
+  }
+
+  /**
+   * Create a HadoopRDD for every partition key specified in the query. Note that for on-disk Hive
+   * tables, a data directory is created for each partition corresponding to keys specified using
+   * 'PARTITIONED BY'.
+   *
+   * @param partitionToDeserializer Mapping from a Hive Partition metadata object to the SerDe
+   *     class to use to deserialize input Writables from the corresponding partition.
+   * @param filterOpt If defined, then the filter is used to reject files contained in the data
+   *     subdirectory of each partition being read. If None, then all files are accepted.
+   */
+  def makeRDDForPartitionedTable(
+      partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
+      filterOpt: Option[PathFilter]): RDD[_] =
+  {
+    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+      val partDesc = Utilities.getPartitionDesc(partition)
+      val partPath = partition.getPartitionPath
+      val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
+      val ifc = partDesc.getInputFileFormatClass
+        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      // Get partition field info
+      val partSpec = partDesc.getPartSpec
+      val partProps = partDesc.getProperties
+
+      val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
+      // Partitioning columns are delimited by "/"
+      val partCols = partColsDelimited.trim().split("/").toSeq
+      // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'.
+      val partValues = if (partSpec == null) {
+        Array.fill(partCols.size)(new String)
+      } else {
+        partCols.map(col => new String(partSpec.get(col))).toArray
+      }
+
+      // Create local references so that the outer object isn't serialized.
+      val tableDesc = _tableDesc
+      val broadcastedHiveConf = _broadcastedHiveConf
+      val localDeserializer = partDeserializer
+
+      val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+      hivePartitionRDD.mapPartitions { iter =>
+        val hconf = broadcastedHiveConf.value.value
+        val rowWithPartArr = new Array[Object](2)
+        // Map each tuple to a row object
+        iter.map { value =>
+          val deserializer = localDeserializer.newInstance()
+          deserializer.initialize(hconf, partProps)
+          val deserializedRow = deserializer.deserialize(value)
+          rowWithPartArr.update(0, deserializedRow)
+          rowWithPartArr.update(1, partValues)
+          rowWithPartArr.asInstanceOf[Object]
+        }
+      }
+    }.toSeq
+    // Even if we don't use any partitions, we still need an empty RDD
+    if (hivePartitionRDDs.size == 0) {
+      new EmptyRDD[Object](sc.sparkContext)
+    } else {
+      new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
+    }
+  }
+
+  /**
+   * If `filterOpt` is defined, then it will be used to filter files from `path`. These files are
+   * returned in a single, comma-separated string.
+   */
+  private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
+    filterOpt match {
+      case Some(filter) =>
+        val fs = path.getFileSystem(sc.hiveconf)
+        val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+        filteredFiles.mkString(",")
+      case None => path.toString
+    }
+  }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each slave.
+   */
+  private def createHadoopRdd(
+    tableDesc: TableDesc,
+    path: String,
+    inputFormatClass: Class[InputFormat[Writable, Writable]])
+  : RDD[Writable] = {
+    val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+
+    val rdd = new HadoopRDD(
+      sc.sparkContext,
+      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      Some(initializeJobConfFunc),
+      inputFormatClass,
+      classOf[Writable],
+      classOf[Writable],
+      _minSplitsPerRDD)
+
+    // Only take the value (skip the key) because Hive works only with values.
+    rdd.map(_._2)
+  }
+
+}
+
+private[hive] object HadoopTableReader {
+
+  /**
+   * Curried. After being given an argument for 'path', the resulting JobConf => Unit closure is used to
+   * instantiate a HadoopRDD.
+   */
+  def initializeLocalJobConfFunc(path: String, tableDesc: TableDesc)(jobConf: JobConf) {
+    FileInputFormat.setInputPaths(jobConf, path)
+    if (tableDesc != null) {
+      Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
+    }
+    val bufferSize = System.getProperty("spark.buffer.size", "65536")
+    jobConf.set("io.file.buffer.size", bufferSize)
+  }
+}
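
Both RDD-building methods above copy member fields (the TableDesc, the broadcast HiveConf, the deserializer class) into local vals before calling mapPartitions, so the closures capture only those small serializable values instead of the whole HadoopTableReader. A minimal standalone sketch of why that matters follows (not part of this patch; the class names are illustrative, and exact serialization results can vary with the Scala version).

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  class Reader(val tableName: String) {          // not Serializable, like the reader's enclosing context
    def badClosure: String => String = s => s"$tableName/$s"   // reads a field, so it captures `this`
    def goodClosure: String => String = {
      val localName = tableName                  // local copy; only the String is captured
      s => s"$localName/$s"
    }
  }

  private def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val reader = new Reader("src")
    println(s"field-capturing closure serializes: ${serializes(reader.badClosure)}")   // typically false
    println(s"local-copy closure serializes:      ${serializes(reader.goodClosure)}")  // typically true
  }
}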

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
new file mode 100644
index 0000000..17ae4ef
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+
+import java.io.File
+import java.util.{Set => JavaSet}
+
+import scala.collection.mutable
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+
+import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
+import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry
+import org.apache.hadoop.hive.ql.io.avro.{AvroContainerOutputFormat, AvroContainerInputFormat}
+import org.apache.hadoop.hive.ql.metadata.Table
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.serde2.RegexSerDe
+
+import org.apache.spark.{SparkContext, SparkConf}
+
+import catalyst.analysis._
+import catalyst.plans.logical.{LogicalPlan, NativeCommand}
+import catalyst.util._
+
+object TestHive
+  extends TestHiveContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
+
+/**
+ * A locally running test instance of Spark's Hive execution engine.
+ *
+ * Data from [[testTables]] will be automatically loaded whenever a query is run over those tables.
+ * Calling [[reset]] will delete all tables and other state in the database, leaving the database
+ * in a "clean" state.
+ *
+ * TestHive is a singleton object version of this class because instantiating multiple copies of the
+ * Hive metastore seems to lead to weird non-deterministic failures.  Therefore, the execution of
+ * test cases that rely on TestHive must be serialized.
+ */
+class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
+  self =>
+
+  // By clearing the port we force Spark to pick a new one.  This allows us to rerun tests
+  // without restarting the JVM.
+  System.clearProperty("spark.driver.port")
+  System.clearProperty("spark.hostPort")
+
+  override lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath
+  override lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath
+
+  /** The location of the compiled hive distribution */
+  lazy val hiveHome = envVarToFile("HIVE_HOME")
+  /** The location of the hive source code. */
+  lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")
+
+  // Override so we can intercept relative paths and rewrite them to point at hive.
+  override def runSqlHive(sql: String): Seq[String] = super.runSqlHive(rewritePaths(sql))
+
+  override def executePlan(plan: LogicalPlan): this.QueryExecution =
+    new this.QueryExecution { val logical = plan }
+
+  /**
+   * Returns the value of the specified environment variable as a [[java.io.File]] after checking
+   * to ensure it exists.
+   */
+  private def envVarToFile(envVar: String): Option[File] = {
+    Option(System.getenv(envVar)).map(new File(_))
+  }
+
+  /**
+   * Replaces relative paths to the parent directory "../" with hiveDevHome since this is how the
+   * hive test cases assume the system is set up.
+   */
+  private def rewritePaths(cmd: String): String =
+    if (cmd.toUpperCase contains "LOAD DATA") {
+      val testDataLocation =
+        hiveDevHome.map(_.getCanonicalPath).getOrElse(inRepoTests.getCanonicalPath)
+      cmd.replaceAll("\\.\\.", testDataLocation)
+    } else {
+      cmd
+    }
+
+  val hiveFilesTemp = File.createTempFile("catalystHiveFiles", "")
+  hiveFilesTemp.delete()
+  hiveFilesTemp.mkdir()
+
+  val inRepoTests = new File("src/test/resources/")
+  def getHiveFile(path: String): File = {
+    val stripped = path.replaceAll("""\.\.\/""", "")
+    hiveDevHome
+      .map(new File(_, stripped))
+      .filter(_.exists)
+      .getOrElse(new File(inRepoTests, stripped))
+  }
+
+  val describedTable = "DESCRIBE (\\w+)".r
+
+  class SqlQueryExecution(sql: String) extends this.QueryExecution {
+    lazy val logical = HiveQl.parseSql(sql)
+    def hiveExec() = runSqlHive(sql)
+    override def toString = sql + "\n" + super.toString
+  }
+
+  /**
+   * Override QueryExecution with special debug workflow.
+   */
+  abstract class QueryExecution extends super.QueryExecution {
+    override lazy val analyzed = {
+      val describedTables = logical match {
+        case NativeCommand(describedTable(tbl)) => tbl :: Nil
+        case _ => Nil
+      }
+
+      // Make sure any test tables referenced are loaded.
+      val referencedTables =
+        describedTables ++
+        logical.collect { case UnresolvedRelation(databaseName, name, _) => name }
+      val referencedTestTables = referencedTables.filter(testTables.contains)
+      logger.debug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
+      referencedTestTables.foreach(loadTestTable)
+      // Proceed with analysis.
+      analyzer(logical)
+    }
+  }
+
+  case class TestTable(name: String, commands: (()=>Unit)*)
+
+  implicit class SqlCmd(sql: String) {
+    def cmd = () => new SqlQueryExecution(sql).stringResult(): Unit
+  }
+
+  /**
+   * A list of test tables and the DDL required to initialize them.  A test table is loaded on
+   * demand when a query is run against it.
+   */
+  lazy val testTables = new mutable.HashMap[String, TestTable]()
+  def registerTestTable(testTable: TestTable) = testTables += (testTable.name -> testTable)
+
+  // The test tables that are defined in the Hive QTestUtil.
+  // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+  val hiveQTestUtilTables = Seq(
+    TestTable("src",
+      "CREATE TABLE src (key INT, value STRING)".cmd,
+      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
+    TestTable("src1",
+      "CREATE TABLE src1 (key INT, value STRING)".cmd,
+      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
+    TestTable("dest1",
+      "CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd),
+    TestTable("dest2",
+      "CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd),
+    TestTable("dest3",
+      "CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
+    TestTable("srcpart", () => {
+      runSqlHive(
+        "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
+      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+        runSqlHive(
+          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+             |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
+           """.stripMargin)
+      }
+    }),
+    TestTable("srcpart1", () => {
+      runSqlHive("CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+        runSqlHive(
+          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+             |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
+           """.stripMargin)
+      }
+    }),
+    TestTable("src_thrift", () => {
+      import org.apache.thrift.protocol.TBinaryProtocol
+      import org.apache.hadoop.hive.serde2.thrift.test.Complex
+      import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
+      import org.apache.hadoop.mapred.SequenceFileInputFormat
+      import org.apache.hadoop.mapred.SequenceFileOutputFormat
+
+      val srcThrift = new Table("default", "src_thrift")
+      srcThrift.setFields(Nil)
+      srcThrift.setInputFormatClass(classOf[SequenceFileInputFormat[_,_]].getName)
+      // In Hive, SequenceFileOutputFormat will be substituted by HiveSequenceFileOutputFormat.
+      srcThrift.setOutputFormatClass(classOf[SequenceFileOutputFormat[_,_]].getName)
+      srcThrift.setSerializationLib(classOf[ThriftDeserializer].getName)
+      srcThrift.setSerdeParam("serialization.class", classOf[Complex].getName)
+      srcThrift.setSerdeParam("serialization.format", classOf[TBinaryProtocol].getName)
+      catalog.client.createTable(srcThrift)
+
+
+      runSqlHive(
+        s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
+    }),
+    TestTable("serdeins",
+      s"""CREATE TABLE serdeins (key INT, value STRING)
+         |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
+         |WITH SERDEPROPERTIES ('field.delim'='\\t')
+       """.stripMargin.cmd,
+      "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
+    TestTable("sales",
+      s"""CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
+         |ROW FORMAT SERDE '${classOf[RegexSerDe].getCanonicalName}'
+         |WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)\t([^ ]*)")
+       """.stripMargin.cmd,
+      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/sales.txt")}' INTO TABLE sales".cmd),
+    TestTable("episodes",
+      s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
+         |ROW FORMAT SERDE '${classOf[AvroSerDe].getCanonicalName}'
+         |STORED AS
+         |INPUTFORMAT '${classOf[AvroContainerInputFormat].getCanonicalName}'
+         |OUTPUTFORMAT '${classOf[AvroContainerOutputFormat].getCanonicalName}'
+         |TBLPROPERTIES (
+         |  'avro.schema.literal'='{
+         |    "type": "record",
+         |    "name": "episodes",
+         |    "namespace": "testing.hive.avro.serde",
+         |    "fields": [
+         |      {
+         |          "name": "title",
+         |          "type": "string",
+         |          "doc": "episode title"
+         |      },
+         |      {
+         |          "name": "air_date",
+         |          "type": "string",
+         |          "doc": "initial date"
+         |      },
+         |      {
+         |          "name": "doctor",
+         |          "type": "int",
+         |          "doc": "main actor playing the Doctor in episode"
+         |      }
+         |    ]
+         |  }'
+         |)
+       """.stripMargin.cmd,
+      s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
+    )
+  )
+
+  hiveQTestUtilTables.foreach(registerTestTable)
+
+  private val loadedTables = new collection.mutable.HashSet[String]
+
+  def loadTestTable(name: String) {
+    if (!(loadedTables contains name)) {
+      // Marks the table as loaded first to prevent infinite mutually recursive table loading.
+      loadedTables += name
+      logger.info(s"Loading test table $name")
+      val createCmds =
+        testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
+      createCmds.foreach(_())
+    }
+  }
+
+  /**
+   * Records the UDFs present when the server starts, so we can delete ones that are created by
+   * tests.
+   */
+  protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames
+
+  /**
+   * Resets the test instance by deleting any tables that have been created.
+   * TODO: also clear out UDFs, views, etc.
+   */
+  def reset() {
+    try {
+      // HACK: Hive is too noisy by default.
+      org.apache.log4j.LogManager.getCurrentLoggers.foreach { logger =>
+        logger.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)
+      }
+
+      // It is important that we RESET first, as broken hooks that might have been set could break
+      // other SQL execution here.
+      runSqlHive("RESET")
+      // For some reason, RESET does not reset the following variables...
+      runSqlHive("set datanucleus.cache.collections=true")
+      runSqlHive("set datanucleus.cache.collections.lazy=true")
+      // Lots of tests fail if we do not change the partition whitelist from the default.
+      runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
+
+      loadedTables.clear()
+      catalog.client.getAllTables("default").foreach { t =>
+        logger.debug(s"Deleting table $t")
+        val table = catalog.client.getTable("default", t)
+
+        catalog.client.getIndexes("default", t, 255).foreach { index =>
+          catalog.client.dropIndex("default", t, index.getIndexName, true)
+        }
+
+        if (!table.isIndexTable) {
+          catalog.client.dropTable("default", t)
+        }
+      }
+
+      catalog.client.getAllDatabases.filterNot(_ == "default").foreach { db =>
+        logger.debug(s"Dropping Database: $db")
+        catalog.client.dropDatabase(db, true, false, true)
+      }
+
+      FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName =>
+        FunctionRegistry.unregisterTemporaryUDF(udfName)
+      }
+
+      configure()
+
+      runSqlHive("USE default")
+
+      // Just loading src makes a lot of tests pass.  This is because some tests do something like
+      // drop an index on src at the beginning.  Since we just pass DDL to Hive, this bypasses our
+      // Analyzer and thus the test table auto-loading mechanism.
+      // Remove after we handle more DDL operations natively.
+      loadTestTable("src")
+      loadTestTable("srcpart")
+    } catch {
+      case e: Exception =>
+        logger.error(s"FATAL ERROR: Failed to reset TestDB state. $e")
+        // At this point there is really no reason to continue, but the test framework traps exits.
+        // So instead we just pause forever so that at least the developer can see where things
+        // started to go wrong.
+        Thread.sleep(100000)
+    }
+  }
+}
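
TestHiveContext registers each test table together with the commands that create it, then loads a table at most once, on the first query that references it. A standalone sketch of that lazy-loading machinery is below (not part of this patch; the fake runSql simply prints, standing in for runSqlHive).

import scala.collection.mutable

object TestTableSketch {
  case class TestTable(name: String, commands: (() => Unit)*)

  private val testTables = mutable.HashMap.empty[String, TestTable]
  private val loadedTables = mutable.HashSet.empty[String]

  def runSql(sql: String): Unit = println(s"executing: $sql")   // stand-in for runSqlHive

  // Mirrors the implicit SqlCmd class: lets a plain SQL string be used as a deferred command.
  implicit class SqlCmd(sql: String) {
    def cmd: () => Unit = () => runSql(sql)
  }

  def registerTestTable(t: TestTable): Unit = testTables += (t.name -> t)

  def loadTestTable(name: String): Unit =
    if (!loadedTables.contains(name)) {
      // Mark as loaded first so mutually recursive table definitions terminate.
      loadedTables += name
      val table = testTables.getOrElse(name, sys.error(s"Unknown test table $name"))
      table.commands.foreach(_())
    }

  def main(args: Array[String]): Unit = {
    registerTestTable(TestTable("src",
      "CREATE TABLE src (key INT, value STRING)".cmd,
      "LOAD DATA LOCAL INPATH 'data/files/kv1.txt' INTO TABLE src".cmd))
    loadTestTable("src")   // runs both commands
    loadTestTable("src")   // no-op: already loaded
  }
}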

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
new file mode 100644
index 0000000..d20fd87
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+
+import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
+import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred._
+
+import catalyst.expressions._
+import catalyst.types.{BooleanType, DataType}
+import org.apache.spark.{TaskContext, SparkException}
+import catalyst.expressions.Cast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution._
+
+import scala.Some
+import scala.collection.immutable.ListMap
+
+/* Implicits */
+import scala.collection.JavaConversions._
+
+/**
+ * The Hive table scan operator.  Column and partition pruning are both handled.
+ *
+ * @constructor
+ * @param attributes Attributes to be fetched from the Hive table.
+ * @param relation The Hive table to be scanned.
+ * @param partitionPruningPred An optional partition pruning predicate for the partitioned table.
+ */
+case class HiveTableScan(
+    attributes: Seq[Attribute],
+    relation: MetastoreRelation,
+    partitionPruningPred: Option[Expression])(
+    @transient val sc: HiveContext)
+  extends LeafNode
+  with HiveInspectors {
+
+  require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
+    "Partition pruning predicates only supported for partitioned tables.")
+
+  // Bind all partition key attribute references in the partition pruning predicate for later
+  // evaluation.
+  private val boundPruningPred = partitionPruningPred.map { pred =>
+    require(
+      pred.dataType == BooleanType,
+      s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
+
+    BindReferences.bindReference(pred, relation.partitionKeys)
+  }
+
+  @transient
+  val hadoopReader = new HadoopTableReader(relation.tableDesc, sc)
+
+  /**
+   * The hive object inspector for this table, which can be used to extract values from the
+   * serialized row representation.
+   */
+  @transient
+  lazy val objectInspector =
+    relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
+
+  /**
+   * Functions that extract the requested attributes from the hive output.  Partition values are
+   * cast from strings to their declared data types.
+   */
+  @transient
+  protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
+    attributes.map { a =>
+      val ordinal = relation.partitionKeys.indexOf(a)
+      if (ordinal >= 0) {
+        (_: Any, partitionKeys: Array[String]) => {
+          val value = partitionKeys(ordinal)
+          val dataType = relation.partitionKeys(ordinal).dataType
+          castFromString(value, dataType)
+        }
+      } else {
+        val ref = objectInspector.getAllStructFieldRefs
+          .find(_.getFieldName == a.name)
+          .getOrElse(sys.error(s"Can't find attribute $a"))
+        (row: Any, _: Array[String]) => {
+          val data = objectInspector.getStructFieldData(row, ref)
+          unwrapData(data, ref.getFieldObjectInspector)
+        }
+      }
+    }
+  }
+
+  private def castFromString(value: String, dataType: DataType) = {
+    Cast(Literal(value), dataType).apply(null)
+  }
+
+  @transient
+  def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
+    hadoopReader.makeRDDForTable(relation.hiveQlTable)
+  } else {
+    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
+  }
+
+  /**
+   * Prunes partitions not involved in the query plan.
+   *
+   * @param partitions All partitions of the relation.
+   * @return Partitions that are involved in the query plan.
+   */
+  private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
+    boundPruningPred match {
+      case None => partitions
+      case Some(shouldKeep) => partitions.filter { part =>
+        val dataTypes = relation.partitionKeys.map(_.dataType)
+        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
+          castFromString(value, dataType)
+        }
+
+        // Only partition values are needed here, since the predicate has already been bound to
+        // partition key attribute references.
+        val row = new GenericRow(castedValues.toArray)
+        shouldKeep.apply(row).asInstanceOf[Boolean]
+      }
+    }
+  }
+
+  def execute() = {
+    inputRdd.map { row =>
+      val values = row match {
+        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
+          attributeFunctions.map(_(deserializedRow, partitionKeys))
+        case deserializedRow: AnyRef =>
+          attributeFunctions.map(_(deserializedRow, Array.empty))
+      }
+      buildRow(values.map {
+        case n: String if n.toLowerCase == "null" => null
+        case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue
+        case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
+          BigDecimal(decimal.bigDecimalValue)
+        case other => other
+      })
+    }
+  }
+
+  def output = attributes
+}
+
+case class InsertIntoHiveTable(
+    table: MetastoreRelation,
+    partition: Map[String, Option[String]],
+    child: SparkPlan,
+    overwrite: Boolean)
+    (@transient sc: HiveContext)
+  extends UnaryNode {
+
+  val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  @transient private val hiveContext = new Context(sc.hiveconf)
+  @transient private val db = Hive.get(sc.hiveconf)
+
+  private def newSerializer(tableDesc: TableDesc): Serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
+
+  override def otherCopyArgs = sc :: Nil
+
+  def output = child.output
+
+  /**
+   * Wraps with Hive types based on object inspector.
+   * TODO: Consolidate all hive OI/data interface code.
+   */
+  protected def wrap(a: (Any, ObjectInspector)): Any = a match {
+    case (s: String, oi: JavaHiveVarcharObjectInspector) => new HiveVarchar(s, s.size)
+    case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) =>
+      new HiveDecimal(bd.underlying())
+    case (row: Row, oi: StandardStructObjectInspector) =>
+      val struct = oi.create()
+      row.zip(oi.getAllStructFieldRefs).foreach {
+        case (data, field) =>
+          oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector))
+      }
+      struct
+    case (s: Seq[_], oi: ListObjectInspector) =>
+      val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector))
+      seqAsJavaList(wrappedSeq)
+    case (obj, _) => obj
+  }
+
+  def saveAsHiveFile(
+      rdd: RDD[Writable],
+      valueClass: Class[_],
+      fileSinkConf: FileSinkDesc,
+      conf: JobConf,
+      isCompressed: Boolean) {
+    if (valueClass == null) {
+      throw new SparkException("Output value class not set")
+    }
+    conf.setOutputValueClass(valueClass)
+    if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
+      throw new SparkException("Output format class not set")
+    }
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
+    conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    if (isCompressed) {
+      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
+      // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
+      // to store compression information.
+      conf.set("mapred.output.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
+    }
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(
+      conf,
+      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+
+    logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
+
+    val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
+    writer.preSetup()
+
+    def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+      writer.setup(context.stageId, context.partitionId, attemptNumber)
+      writer.open()
+
+      var count = 0
+      while(iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        writer.write(record)
+      }
+
+      writer.close()
+      writer.commit()
+    }
+
+    sc.sparkContext.runJob(rdd, writeToFile _)
+    writer.commitJob()
+  }
+
+  /**
+   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
+   * `org.apache.hadoop.hive.serde2.SerDe` and the
+   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
+   */
+  def execute() = {
+    val childRdd = child.execute()
+    assert(childRdd != null)
+
+    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
+    // instances within the closure, since Serializer is not serializable while TableDesc is.
+    val tableDesc = table.tableDesc
+    val tableLocation = table.hiveQlTable.getDataLocation
+    val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
+    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    val rdd = childRdd.mapPartitions { iter =>
+      val serializer = newSerializer(fileSinkConf.getTableInfo)
+      val standardOI = ObjectInspectorUtils
+        .getStandardObjectInspector(
+          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+          ObjectInspectorCopyOption.JAVA)
+        .asInstanceOf[StructObjectInspector]
+
+      iter.map { row =>
+        // Casts Strings to HiveVarchars when necessary.
+        val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector)
+        val mappedRow = row.zip(fieldOIs).map(wrap)
+
+        serializer.serialize(mappedRow.toArray, standardOI)
+      }
+    }
+
+    // ORC stores compression information in table properties, while other formats
+    // (e.g. RCFile) rely on Hadoop configurations to store compression information.
+    val jobConf = new JobConf(sc.hiveconf)
+    saveAsHiveFile(
+      rdd,
+      outputClass,
+      fileSinkConf,
+      jobConf,
+      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
+
+    // TODO: Handle dynamic partitioning.
+    val outputPath = FileOutputFormat.getOutputPath(jobConf)
+    // Have to construct the format of dbname.tablename.
+    val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
+    // TODO: Correctly set holdDDLTime.
+    // Most of the time, we should have holdDDLTime = false.
+    // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
+    val holdDDLTime = false
+    if (partition.nonEmpty) {
+      val partitionSpec = partition.map {
+        case (key, Some(value)) => key -> value
+        case (key, None) => key -> "" // Should not reach here right now.
+      }
+      val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols(), partitionSpec)
+      db.validatePartitionNameCharacters(partVals)
+      // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
+      // which is currently considered as a Hive native command.
+      val inheritTableSpecs = true
+      // TODO: Correctly set isSkewedStoreAsSubdir.
+      val isSkewedStoreAsSubdir = false
+      db.loadPartition(
+        outputPath,
+        qualifiedTableName,
+        partitionSpec,
+        overwrite,
+        holdDDLTime,
+        inheritTableSpecs,
+        isSkewedStoreAsSubdir)
+    } else {
+      db.loadTable(
+        outputPath,
+        qualifiedTableName,
+        overwrite,
+        holdDDLTime)
+    }
+
+    // It would be nice to just return the childRdd unchanged so insert operations could be chained;
+    // however, for now we return an empty list to simplify compatibility checks with Hive, which
+    // does not return anything for insert operations.
+    // TODO: implement hive compatibility as rules.
+    sc.sparkContext.makeRDD(Nil, 1)
+  }
+}
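
HiveTableScan.prunePartitions above evaluates the bound pruning predicate against each partition's key values and keeps only the partitions for which it holds; when no predicate is given, every partition is scanned. A simplified, self-contained sketch of that logic follows (not part of this patch; the Partition case class and the function-valued predicate are illustrative stand-ins for the Catalyst expression machinery).

object PartitionPruningSketch {
  case class Partition(values: Map[String, String])

  // The "bound predicate": a function from partition key values to keep/drop.
  type PruningPredicate = Map[String, String] => Boolean

  def prunePartitions(partitions: Seq[Partition], pred: Option[PruningPredicate]): Seq[Partition] =
    pred match {
      case None => partitions                                    // no pruning predicate: scan everything
      case Some(shouldKeep) => partitions.filter(p => shouldKeep(p.values))
    }

  def main(args: Array[String]): Unit = {
    val partitions = for {
      ds <- Seq("2008-04-08", "2008-04-09")
      hr <- Seq("11", "12")
    } yield Partition(Map("ds" -> ds, "hr" -> hr))

    // e.g. WHERE ds = '2008-04-08' AND hr = '11'
    val pred: Option[PruningPredicate] =
      Some(keys => keys("ds") == "2008-04-08" && keys("hr") == "11")

    prunePartitions(partitions, pred).foreach(println)   // only the single matching partition remains
  }
}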

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
new file mode 100644
index 0000000..5e775d6
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.serde2.{io => hiveIo}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
+import org.apache.hadoop.hive.ql.udf.generic._
+import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.hadoop.{io => hadoopIo}
+
+import catalyst.analysis
+import catalyst.expressions._
+import catalyst.types
+import catalyst.types._
+
+object HiveFunctionRegistry
+  extends analysis.FunctionRegistry with HiveFunctionFactory with HiveInspectors {
+
+  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    // We only look it up to see if it exists, but do not include it in the HiveUDF since it is
+    // not always serializable.
+    val functionInfo: FunctionInfo = Option(FunctionRegistry.getFunctionInfo(name)).getOrElse(
+      sys.error(s"Couldn't find function $name"))
+
+    if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
+      val function = createFunction[UDF](name)
+      val method = function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))
+
+      lazy val expectedDataTypes = method.getParameterTypes.map(javaClassToDataType)
+
+      HiveSimpleUdf(
+        name,
+        children.zip(expectedDataTypes).map { case (e, t) => Cast(e, t) }
+      )
+    } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
+      HiveGenericUdf(name, children)
+    } else if (
+         classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
+      HiveGenericUdaf(name, children)
+
+    } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
+      HiveGenericUdtf(name, Nil, children)
+    } else {
+      sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
+    }
+  }
+
+  def javaClassToDataType(clz: Class[_]): DataType = clz match {
+    case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
+    case c: Class[_] if c == classOf[hiveIo.DoubleWritable] => DoubleType
+    case c: Class[_] if c == classOf[hiveIo.HiveDecimalWritable] => DecimalType
+    case c: Class[_] if c == classOf[hiveIo.ByteWritable] => ByteType
+    case c: Class[_] if c == classOf[hiveIo.ShortWritable] => ShortType
+    case c: Class[_] if c == classOf[hadoopIo.Text] => StringType
+    case c: Class[_] if c == classOf[hadoopIo.IntWritable] => IntegerType
+    case c: Class[_] if c == classOf[hadoopIo.LongWritable] => LongType
+    case c: Class[_] if c == classOf[hadoopIo.FloatWritable] => FloatType
+    case c: Class[_] if c == classOf[hadoopIo.BooleanWritable] => BooleanType
+    case c: Class[_] if c == classOf[java.lang.String] => StringType
+    case c: Class[_] if c == java.lang.Short.TYPE => ShortType
+    case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
+    case c: Class[_] if c == java.lang.Long.TYPE => LongType
+    case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
+    case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
+    case c: Class[_] if c == java.lang.Float.TYPE => FloatType
+    case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
+    case c: Class[_] if c == classOf[java.lang.Short] => ShortType
+    case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
+    case c: Class[_] if c == classOf[java.lang.Long] => LongType
+    case c: Class[_] if c == classOf[java.lang.Double] => DoubleType
+    case c: Class[_] if c == classOf[java.lang.Byte] => ByteType
+    case c: Class[_] if c == classOf[java.lang.Float] => FloatType
+    case c: Class[_] if c == classOf[java.lang.Boolean] => BooleanType
+    case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
+  }
+}
+
+trait HiveFunctionFactory {
+  def getFunctionInfo(name: String) = FunctionRegistry.getFunctionInfo(name)
+  def getFunctionClass(name: String) = getFunctionInfo(name).getFunctionClass
+  def createFunction[UDFType](name: String) =
+    getFunctionClass(name).newInstance.asInstanceOf[UDFType]
+
+  /** Converts hive types to native catalyst types. */
+  def unwrap(a: Any): Any = a match {
+    case null => null
+    case i: hadoopIo.IntWritable => i.get
+    case t: hadoopIo.Text => t.toString
+    case l: hadoopIo.LongWritable => l.get
+    case d: hadoopIo.DoubleWritable => d.get()
+    case d: hiveIo.DoubleWritable => d.get
+    case s: hiveIo.ShortWritable => s.get
+    case b: hadoopIo.BooleanWritable => b.get()
+    case b: hiveIo.ByteWritable => b.get
+    case list: java.util.List[_] => list.map(unwrap)
+    case map: java.util.Map[_,_] => map.map { case (k, v) => (unwrap(k), unwrap(v)) }.toMap
+    case array: Array[_] => array.map(unwrap).toSeq
+    case p: java.lang.Short => p
+    case p: java.lang.Long => p
+    case p: java.lang.Float => p
+    case p: java.lang.Integer => p
+    case p: java.lang.Double => p
+    case p: java.lang.Byte => p
+    case p: java.lang.Boolean => p
+    case str: String => str
+  }
+}
+
+abstract class HiveUdf
+    extends Expression with Logging with HiveFunctionFactory {
+  self: Product =>
+
+  type UDFType
+  type EvaluatedType = Any
+
+  val name: String
+
+  def nullable = true
+  def references = children.flatMap(_.references).toSet
+
+  // FunctionInfo is not serializable so we must look it up here again.
+  lazy val functionInfo = getFunctionInfo(name)
+  lazy val function = createFunction[UDFType](name)
+
+  override def toString = s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})"
+}
+
+case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf {
+  import HiveFunctionRegistry._
+  type UDFType = UDF
+
+  @transient
+  protected lazy val method =
+    function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))
+
+  @transient
+  lazy val dataType = javaClassToDataType(method.getReturnType)
+
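+  // For each parameter type of the UDF's evaluate method, find a single-argument constructor
+  // that accepts one of the supported primitive/boxed classes, and build a closure that boxes
+  // the evaluated Catalyst value into that Hive wrapper before invocation.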
+  protected lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass =>
+    val primitiveClasses = Seq(
+      Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE,
+      classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long],
+      classOf[HiveDecimal], java.lang.Byte.TYPE, classOf[java.lang.Byte]
+    )
+    val matchingConstructor = argClass.getConstructors.find { c =>
+      c.getParameterTypes.size == 1 && primitiveClasses.contains(c.getParameterTypes.head)
+    }
+
+    val constructor = matchingConstructor.getOrElse(
+      sys.error(s"No matching wrapper found, options: ${argClass.getConstructors.toSeq}."))
+
+    (a: Any) => {
+      logger.debug(
+        s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} using $constructor.")
+      // We must make sure that primitives get boxed java style.
+      if (a == null) {
+        null
+      } else {
+        constructor.newInstance(a match {
+          case i: Int => i: java.lang.Integer
+          case bd: BigDecimal => new HiveDecimal(bd.underlying())
+          case other: AnyRef => other
+        }).asInstanceOf[AnyRef]
+      }
+    }
+  }
+
+  // TODO: Finish input/output types.
+  override def apply(input: Row): Any = {
+    val evaluatedChildren = children.map(_.apply(input))
+    // Wrap the function arguments in the expected types.
+    val args = evaluatedChildren.zip(wrappers).map {
+      case (arg, wrapper) => wrapper(arg)
+    }
+
+    // Invoke the udf and unwrap the result.
+    unwrap(method.invoke(function, args: _*))
+  }
+}
+
+case class HiveGenericUdf(
+    name: String,
+    children: Seq[Expression]) extends HiveUdf with HiveInspectors {
+  import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
+  type UDFType = GenericUDF
+
+  @transient
+  protected lazy val argumentInspectors = children.map(_.dataType).map(toInspector)
+
+  @transient
+  protected lazy val returnInspector = function.initialize(argumentInspectors.toArray)
+
+  val dataType: DataType = inspectorToDataType(returnInspector)
+
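+  // Each argument is handed to the GenericUDF as a DeferredObject, so Hive pulls the wrapped
+  // child value only when it asks for it; the evaluated result is unwrapped back into a
+  // Catalyst value.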
+  override def apply(input: Row): Any = {
+    returnInspector // Make sure initialized.
+    val args = children.map { v =>
+      new DeferredObject {
+        override def prepare(i: Int) = {}
+        override def get(): AnyRef = wrap(v.apply(input))
+      }
+    }.toArray
+    unwrap(function.evaluate(args))
+  }
+}
+
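+/**
+ * Conversions between Hive ObjectInspectors / Java object representations and Catalyst
+ * data types, rows and values.
+ */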
+trait HiveInspectors {
+
+  def unwrapData(data: Any, oi: ObjectInspector): Any = oi match {
+    case pi: PrimitiveObjectInspector => pi.getPrimitiveJavaObject(data)
+    case li: ListObjectInspector =>
+      Option(li.getList(data))
+        .map(_.map(unwrapData(_, li.getListElementObjectInspector)).toSeq)
+        .orNull
+    case mi: MapObjectInspector =>
+      Option(mi.getMap(data)).map(
+        _.map {
+          case (k,v) =>
+            (unwrapData(k, mi.getMapKeyObjectInspector),
+              unwrapData(v, mi.getMapValueObjectInspector))
+        }.toMap).orNull
+    case si: StructObjectInspector =>
+      val allRefs = si.getAllStructFieldRefs
+      new GenericRow(
+        allRefs.map(r =>
+          unwrapData(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
+  }
+
+  /** Converts native Catalyst values to the Java representations expected by Hive. */
+  def wrap(a: Any): AnyRef = a match {
+    case s: String => new hadoopIo.Text(s)
+    case i: Int => i: java.lang.Integer
+    case b: Boolean => b: java.lang.Boolean
+    case d: Double => d: java.lang.Double
+    case l: Long => l: java.lang.Long
+    case l: Short => l: java.lang.Short
+    case l: Byte => l: java.lang.Byte
+    case s: Seq[_] => seqAsJavaList(s.map(wrap))
+    case m: Map[_,_] =>
+      mapAsJavaMap(m.map { case (k, v) => wrap(k) -> wrap(v) })
+    case null => null
+  }
+
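+  /** Returns a Hive ObjectInspector corresponding to the given Catalyst [[DataType]]. */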
+  def toInspector(dataType: DataType): ObjectInspector = dataType match {
+    case ArrayType(tpe) => ObjectInspectorFactory.getStandardListObjectInspector(toInspector(tpe))
+    case MapType(keyType, valueType) =>
+      ObjectInspectorFactory.getStandardMapObjectInspector(
+        toInspector(keyType), toInspector(valueType))
+    case StringType => PrimitiveObjectInspectorFactory.javaStringObjectInspector
+    case IntegerType => PrimitiveObjectInspectorFactory.javaIntObjectInspector
+    case DoubleType => PrimitiveObjectInspectorFactory.javaDoubleObjectInspector
+    case BooleanType => PrimitiveObjectInspectorFactory.javaBooleanObjectInspector
+    case LongType => PrimitiveObjectInspectorFactory.javaLongObjectInspector
+    case FloatType => PrimitiveObjectInspectorFactory.javaFloatObjectInspector
+    case ShortType => PrimitiveObjectInspectorFactory.javaShortObjectInspector
+    case ByteType => PrimitiveObjectInspectorFactory.javaByteObjectInspector
+    case NullType => PrimitiveObjectInspectorFactory.javaVoidObjectInspector
+    case BinaryType => PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector
+  }
+
+  def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match {
+    case s: StructObjectInspector =>
+      StructType(s.getAllStructFieldRefs.map(f => {
+        types.StructField(
+          f.getFieldName, inspectorToDataType(f.getFieldObjectInspector), nullable = true)
+      }))
+    case l: ListObjectInspector => ArrayType(inspectorToDataType(l.getListElementObjectInspector))
+    case m: MapObjectInspector =>
+      MapType(
+        inspectorToDataType(m.getMapKeyObjectInspector),
+        inspectorToDataType(m.getMapValueObjectInspector))
+    case _: WritableStringObjectInspector => StringType
+    case _: JavaStringObjectInspector => StringType
+    case _: WritableIntObjectInspector => IntegerType
+    case _: JavaIntObjectInspector => IntegerType
+    case _: WritableDoubleObjectInspector => DoubleType
+    case _: JavaDoubleObjectInspector => DoubleType
+    case _: WritableBooleanObjectInspector => BooleanType
+    case _: JavaBooleanObjectInspector => BooleanType
+    case _: WritableLongObjectInspector => LongType
+    case _: JavaLongObjectInspector => LongType
+    case _: WritableShortObjectInspector => ShortType
+    case _: JavaShortObjectInspector => ShortType
+    case _: WritableByteObjectInspector => ByteType
+    case _: JavaByteObjectInspector => ByteType
+  }
+
+  implicit class typeInfoConversions(dt: DataType) {
+    import org.apache.hadoop.hive.serde2.typeinfo._
+    import TypeInfoFactory._
+
+    def toTypeInfo: TypeInfo = dt match {
+      case BinaryType => binaryTypeInfo
+      case BooleanType => booleanTypeInfo
+      case ByteType => byteTypeInfo
+      case DoubleType => doubleTypeInfo
+      case FloatType => floatTypeInfo
+      case IntegerType => intTypeInfo
+      case LongType => longTypeInfo
+      case ShortType => shortTypeInfo
+      case StringType => stringTypeInfo
+      case DecimalType => decimalTypeInfo
+      case NullType => voidTypeInfo
+    }
+  }
+}
+
+case class HiveGenericUdaf(
+    name: String,
+    children: Seq[Expression]) extends AggregateExpression
+  with HiveInspectors
+  with HiveFunctionFactory {
+
+  type UDFType = AbstractGenericUDAFResolver
+
+  protected lazy val resolver: AbstractGenericUDAFResolver = createFunction(name)
+
+  protected lazy val objectInspector = {
+    resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
+      .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
+  }
+
+  protected lazy val inspectors = children.map(_.dataType).map(toInspector)
+
+  def dataType: DataType = inspectorToDataType(objectInspector)
+
+  def nullable: Boolean = true
+
+  def references: Set[Attribute] = children.flatMap(_.references).toSet
+
+  override def toString = s"$nodeName#$name(${children.mkString(",")})"
+
+  def newInstance = new HiveUdafFunction(name, children, this)
+}
+
+/**
+ * Converts a Hive Generic User Defined Table Generating Function (UDTF) to a
+ * [[catalyst.expressions.Generator Generator]].  Note that the semantics of Generators do not allow
+ * Generators to maintain state in between input rows.  Thus, UDTFs that rely on partitioning-
+ * dependent operations, such as calls to `close()` before producing output, will not behave the
+ * same way as in Hive.  However, in practice this should not affect compatibility for most
+ * reasonable UDTFs (e.g. explode or GenericUDTFParseUrlTuple).
+ *
+ * Operators that need to maintain state in between input rows should instead be implemented as
+ * user-defined aggregations, which have clean semantics even under partitioned execution.
+ */
+case class HiveGenericUdtf(
+    name: String,
+    aliasNames: Seq[String],
+    children: Seq[Expression])
+  extends Generator with HiveInspectors with HiveFunctionFactory {
+
+  override def references = children.flatMap(_.references).toSet
+
+  @transient
+  protected lazy val function: GenericUDTF = createFunction(name)
+
+  protected lazy val inputInspectors = children.map(_.dataType).map(toInspector)
+
+  protected lazy val outputInspectors = {
+    val structInspector = function.initialize(inputInspectors.toArray)
+    structInspector.getAllStructFieldRefs.map(_.getFieldObjectInspector)
+  }
+
+  protected lazy val outputDataTypes = outputInspectors.map(inspectorToDataType)
+
+  override protected def makeOutput() = {
+    // Use the given column aliases when present; otherwise generate names c_0, c_1, ..., c_{n-1}.
+    if (aliasNames.size == outputDataTypes.size) {
+      aliasNames.zip(outputDataTypes).map {
+        case (attrName, attrDataType) =>
+          AttributeReference(attrName, attrDataType, nullable = true)()
+      }
+    } else {
+      outputDataTypes.zipWithIndex.map {
+        case (attrDataType, i) =>
+          AttributeReference(s"c_$i", attrDataType, nullable = true)()
+      }
+    }
+  }
+
+  override def apply(input: Row): TraversableOnce[Row] = {
+    outputInspectors // Make sure initialized.
+
+    val inputProjection = new Projection(children)
+    val collector = new UDTFCollector
+    function.setCollector(collector)
+
+    val udtInput = inputProjection(input).map(wrap).toArray
+    function.process(udtInput)
+    collector.collectRows()
+  }
+
+  protected class UDTFCollector extends Collector {
+    var collected = new ArrayBuffer[Row]
+
+    override def collect(input: java.lang.Object) {
+      // We need to clone the input here because implementations of
+      // GenericUDTF reuse the same object. Luckily they are always an array, so
+      // it is easy to clone.
+      collected += new GenericRow(input.asInstanceOf[Array[_]].map(unwrap))
+    }
+
+    def collectRows() = {
+      val toCollect = collected
+      collected = new ArrayBuffer[Row]
+      toCollect
+    }
+  }
+
+  override def toString() = s"$nodeName#$name(${children.mkString(",")})"
+}
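+
+// Illustrative sketch only (not part of this patch): planning a UDTF call such as
+//   HiveGenericUdtf("explode", Nil, Seq(child))
+// looks up Hive's `explode` implementation, initializes it with the child's inspector, and,
+// since no aliases are given, emits output attributes named c_0, c_1, ... for the fields of
+// the struct returned by initialize().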
+
+case class HiveUdafFunction(
+    functionName: String,
+    exprs: Seq[Expression],
+    base: AggregateExpression)
+  extends AggregateFunction
+  with HiveInspectors
+  with HiveFunctionFactory {
+
+  def this() = this(null, null, null)
+
+  private val resolver = createFunction[AbstractGenericUDAFResolver](functionName)
+
+  private val inspectors = exprs.map(_.dataType).map(toInspector).toArray
+
+  private val function = resolver.getEvaluator(exprs.map(_.dataType.toTypeInfo).toArray)
+
+  private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+
+  // Cast required to avoid type inference selecting a deprecated Hive API.
+  private val buffer =
+    function.getNewAggregationBuffer.asInstanceOf[GenericUDAFEvaluator.AbstractAggregationBuffer]
+
+  override def apply(input: Row): Any = unwrapData(function.evaluate(buffer), returnInspector)
+
+  @transient
+  val inputProjection = new Projection(exprs)
+
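+  // Called once per input row: project the argument expressions and fold the row into the
+  // aggregation buffer via the evaluator's iterate(); apply() above produces the final result
+  // by evaluating that buffer.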
+  def update(input: Row): Unit = {
+    val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
+    function.iterate(buffer, inputs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
new file mode 100644
index 0000000..5e17e3b
--- /dev/null
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+# Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+# File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Set the threshold of the File Appender to INFO
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
new file mode 100644
index 0000000..4b45e69
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+import java.io.File
+
+/**
+ * A set of test cases based on the big-data-benchmark.
+ * https://amplab.cs.berkeley.edu/benchmark/
+ */
+class BigDataBenchmarkSuite extends HiveComparisonTest {
+  import TestHive._
+
+  val testDataDirectory = new File("target/big-data-benchmark-testdata")
+
+  val testTables = Seq(
+    TestTable(
+      "rankings",
+      s"""
+        |CREATE EXTERNAL TABLE rankings (
+        |  pageURL STRING,
+        |  pageRank INT,
+        |  avgDuration INT)
+        |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
+        |  STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "rankings").getCanonicalPath}"
+      """.stripMargin.cmd),
+    TestTable(
+      "scratch",
+      s"""
+        |CREATE EXTERNAL TABLE scratch (
+        |  pageURL STRING,
+        |  pageRank INT,
+        |  avgDuration INT)
+        |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
+        |  STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "scratch").getCanonicalPath}"
+      """.stripMargin.cmd),
+    TestTable(
+      "uservisits",
+      s"""
+        |CREATE EXTERNAL TABLE uservisits (
+        |  sourceIP STRING,
+        |  destURL STRING,
+        |  visitDate STRING,
+        |  adRevenue DOUBLE,
+        |  userAgent STRING,
+        |  countryCode STRING,
+        |  languageCode STRING,
+        |  searchWord STRING,
+        |  duration INT)
+        |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
+        |  STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "uservisits").getCanonicalPath}"
+      """.stripMargin.cmd),
+    TestTable(
+      "documents",
+      s"""
+        |CREATE EXTERNAL TABLE documents (line STRING)
+        |STORED AS TEXTFILE
+        |LOCATION "${new File(testDataDirectory, "crawl").getCanonicalPath}"
+      """.stripMargin.cmd))
+
+  testTables.foreach(registerTestTable)
+
+  if (!testDataDirectory.exists()) {
+    // TODO: Auto download the files on demand.
+    ignore("No data files found for BigDataBenchmark tests.") {}
+  } else {
+    createQueryTest("query1",
+      "SELECT pageURL, pageRank FROM rankings WHERE pageRank > 1")
+
+    createQueryTest("query2",
+      "SELECT SUBSTR(sourceIP, 1, 10), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 10)")
+
+    createQueryTest("query3",
+      """
+        |SELECT sourceIP,
+        |       sum(adRevenue) as totalRevenue,
+        |       avg(pageRank) as pageRank
+        |FROM
+        |  rankings R JOIN
+        |  (SELECT sourceIP, destURL, adRevenue
+        |   FROM uservisits UV
+        |   WHERE UV.visitDate > "1980-01-01"
+        |   AND UV.visitDate < "1980-04-01")
+        |   NUV ON (R.pageURL = NUV.destURL)
+        |GROUP BY sourceIP
+        |ORDER BY totalRevenue DESC
+        |LIMIT 1
+      """.stripMargin)
+
+    createQueryTest("query4",
+      """
+        |DROP TABLE IF EXISTS url_counts_partial;
+        |CREATE TABLE url_counts_partial AS
+        |  SELECT TRANSFORM (line)
+        |  USING 'python target/url_count.py' as (sourcePage,
+        |    destPage, count) from documents;
+        |DROP TABLE IF EXISTS url_counts_total;
+        |CREATE TABLE url_counts_total AS
+        |  SELECT SUM(count) AS totalCount, destpage
+        |  FROM url_counts_partial GROUP BY destpage
+        |-- The following queries run, but generate different results in Hive, likely because the UDF is not deterministic
+        |-- given different input splits.
+        |-- SELECT CAST(SUM(count) AS INT) FROM url_counts_partial
+        |-- SELECT COUNT(*) FROM url_counts_partial
+        |-- SELECT * FROM url_counts_partial
+        |-- SELECT * FROM url_counts_total
+      """.stripMargin)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
new file mode 100644
index 0000000..a12ab23
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+package sql
+package hive
+package execution
+
+
+import org.scalatest.{FunSuite, BeforeAndAfterAll}
+
+class ConcurrentHiveSuite extends FunSuite with BeforeAndAfterAll {
+  ignore("multiple instances not supported") {
+    test("Multiple Hive Instances") {
+      (1 to 10).map { i =>
+        val ts =
+          new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", new SparkConf()))
+        ts.executeSql("SHOW TABLES").toRdd.collect()
+        ts.executeSql("SELECT * FROM src").toRdd.collect()
+        ts.executeSql("SHOW TABLES").toRdd.collect()
+      }
+    }
+  }
+}
\ No newline at end of file