You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/30 00:32:58 UTC

git commit: [SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.

Repository: spark
Updated Branches:
  refs/heads/master dc9653641 -> c7db274be


[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.

The idea is that every Catalyst logical plan gets hold of a Statistics class, the usage of which provides useful estimations on various statistics. See the implementations of `MetastoreRelation`.

This patch also includes several usages of the estimation interface in the planner. For instance, we now use physical table sizes from the estimate interface to convert an equi-join to a broadcast join (when doing so is beneficial, as determined by a size threshold).

Finally, there are a couple of minor accompanying changes, including:
- Remove the not-in-use `BaseRelation`.
- Make SparkLogicalPlan take a `SQLContext` in the second param list.

Author: Zongheng Yang <zo...@gmail.com>

Closes #1238 from concretevitamin/estimates and squashes the following commits:

329071d [Zongheng Yang] Address review comments; turn config name from string to field in SQLConf.
8663e84 [Zongheng Yang] Use BigInt for stat; for logical leaves, by default throw an exception.
2f2fb89 [Zongheng Yang] Fix statistics for SparkLogicalPlan.
9951305 [Zongheng Yang] Remove childrenStats.
16fc60a [Zongheng Yang] Avoid calling statistics on plans if auto join conversion is disabled.
8bd2816 [Zongheng Yang] Add a note on performance of statistics.
6e594b8 [Zongheng Yang] Get size info from metastore for MetastoreRelation.
01b7a3e [Zongheng Yang] Update scaladoc for a field and move it to @param section.
549061c [Zongheng Yang] Remove numTuples in Statistics for now.
729a8e2 [Zongheng Yang] Update docs to be more explicit.
573e644 [Zongheng Yang] Remove singleton SQLConf and move back `settings` to the trait.
2d99eb5 [Zongheng Yang] {Cleanup, use synchronized in, enrich} StatisticsSuite.
ca5b825 [Zongheng Yang] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
43d38a6 [Zongheng Yang] Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
0ef9e5b [Zongheng Yang] Use multiplication instead of sum for default estimates.
4ef0d26 [Zongheng Yang] Make Statistics a case class.
3ba8f3e [Zongheng Yang] Add comment.
e5bcf5b [Zongheng Yang] Fix optimization conditions & update scala docs to explain.
7d9216a [Zongheng Yang] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
73cde01 [Zongheng Yang] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
73412be [Zongheng Yang] Move SQLConf to Catalyst & add default val for sizeInBytes.
7a60ab7 [Zongheng Yang] s/Estimates/Statistics, s/cardinality/numTuples.
de3ae13 [Zongheng Yang] Add parquetAfter() properly in test.
dcff9bd [Zongheng Yang] Cleanups.
84301a4 [Zongheng Yang] Refactors.
5bf5586 [Zongheng Yang] Typo.
56a8e6e [Zongheng Yang] Prototype impl of estimations for Catalyst logical plans.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7db274b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7db274b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7db274b

Branch: refs/heads/master
Commit: c7db274be79f448fda566208946cb50958ea9b1a
Parents: dc96536
Author: Zongheng Yang <zo...@gmail.com>
Authored: Tue Jul 29 15:32:50 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Jul 29 15:32:50 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/unresolved.scala      |  4 +-
 .../catalyst/plans/logical/BaseRelation.scala   | 24 -----
 .../catalyst/plans/logical/LogicalPlan.scala    | 22 +++++
 .../scala/org/apache/spark/sql/SQLConf.scala    | 61 +++++++------
 .../scala/org/apache/spark/sql/SQLContext.scala | 20 ++---
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  3 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala    |  2 +-
 .../spark/sql/api/java/JavaSQLContext.scala     |  4 +-
 .../apache/spark/sql/execution/SparkPlan.scala  | 18 ++--
 .../spark/sql/execution/SparkStrategies.scala   | 57 ++++++------
 .../org/apache/spark/sql/json/JsonRDD.scala     | 11 ++-
 .../spark/sql/parquet/ParquetRelation.scala     |  4 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  2 -
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 47 +++++++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 95 ++++++++++++++++++++
 .../sql/hive/execution/HiveComparisonTest.scala |  2 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  2 +-
 .../spark/sql/parquet/HiveParquetSuite.scala    |  2 +-
 18 files changed, 256 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 7abeb03..a0e2577 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.{errors, trees}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 case class UnresolvedRelation(
     databaseName: Option[String],
     tableName: String,
-    alias: Option[String] = None) extends BaseRelation {
+    alias: Option[String] = None) extends LeafNode {
   override def output = Nil
   override lazy val resolved = false
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
deleted file mode 100644
index 582334a..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical
-
-abstract class BaseRelation extends LeafNode {
-  self: Product =>
-
-  def tableName: String
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index edc37e3..ac85f95 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -27,6 +27,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
   /**
+   * Estimates of various statistics.  The default estimation logic simply lazily multiplies the
+   * corresponding statistic produced by the children.  To override this behavior, override
+   * `statistics` and assign it an overridden version of `Statistics`.
+   *
+   * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to
+   * the performance of the implementations.  The reason is that estimations might get triggered in
+   * performance-critical processes, such as query plan planning.
+   *
+   * @param sizeInBytes Physical size in bytes. Defaults to the product of the children's
+   *                    `sizeInBytes`; `LeafNode` has no usable default and throws instead.
+   */
+  case class Statistics(
+    sizeInBytes: BigInt
+  )
+  lazy val statistics: Statistics = Statistics(
+    sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
+  )
+
+  /**
    * Returns the set of attributes that are referenced by this node
    * during evaluation.
    */
@@ -92,6 +111,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
 
+  override lazy val statistics: Statistics =
+    throw new UnsupportedOperationException("default leaf nodes don't have meaningful Statistics")
+
   // Leaf nodes by definition cannot reference any input attributes.
   override def references = Set.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 41920c0..be8d4e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -21,17 +21,31 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+object SQLConf {
+  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}
+
 /**
- * SQLConf holds mutable config parameters and hints.  These can be set and
- * queried either by passing SET commands into Spark SQL's DSL
- * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * A trait that enables the setting and getting of mutable config parameters/hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried by passing SET commands
+ * into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
+ * modify the hints by programmatically calling the setters and getters of this trait.
  *
- * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
+ * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
 trait SQLConf {
   import SQLConf._
 
+  @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
 
@@ -40,28 +54,33 @@ trait SQLConf {
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
-   * a broadcast value during the physical executions of join operations.  Setting this to 0
+   * a broadcast value during the physical executions of join operations.  Setting this to -1
    * effectively disables auto conversion.
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size.
+   *
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size (Hive's own default is 10000000).
    */
-  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
+  private[spark] def autoBroadcastJoinThreshold: Int =
+    get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
 
-  /** A comma-separated list of table names marked to be broadcasted during joins. */
-  private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
+  /**
+   * The default size in bytes to assign to a logical operator's estimation statistics.  By default,
+   * it is set to a larger value than `autoBroadcastJoinThreshold`, so any logical operator without
+   * a proper estimate of this statistic will not be incorrectly broadcast in joins.
+   */
+  private[spark] def defaultSizeInBytes: Long =
+    getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
 
   /** ********************** SQLConf functionality methods ************ */
 
-  @transient
-  private val settings = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
-
   def set(props: Properties): Unit = {
-    props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+    settings.synchronized {
+      props.asScala.foreach { case (k, v) => settings.put(k, v) }
+    }
   }
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for $key")
+    require(value != null, s"value cannot be null for key: $key")
     settings.put(key, value)
   }
 
@@ -90,13 +109,3 @@ trait SQLConf {
   }
 
 }
-
-object SQLConf {
-  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
-
-  object Deprecated {
-    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c178dad..a136c7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
@@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
 
   /**
    * Loads a Parquet file, returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
-    new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+    new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
 
   /**
    * :: Experimental ::
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    val name = tableName
-    val newPlan = rdd.logicalPlan transform {
-      case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
-    }
-    catalog.registerTable(None, tableName, newPlan)
+    catalog.registerTable(None, tableName, rdd.logicalPlan)
   }
 
   /**
@@ -212,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
-        catalog.registerTable(None, tableName, SparkLogicalPlan(e))
+        catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
       case inMem: InMemoryRelation =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
@@ -405,7 +401,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
       }
     }
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 019ff9d..172b6e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -418,7 +418,8 @@ class SchemaRDD(
    * @group schema
    */
   private def applySchema(rdd: RDD[Row]): SchemaRDD = {
-    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
+    new SchemaRDD(sqlContext,
+      SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
   }
 
   // =======================================================================

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fe81721..fd75103 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
     // happen right away to let these side effects take place eagerly.
     case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
       queryExecution.toRdd
-      SparkLogicalPlan(queryExecution.executedPlan)
+      SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
       baseLogicalPlan
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 790d9ef..806097c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
         new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
       }
     }
-    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
   }
 
   /**
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * @group userf
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
 
   /**
    * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 27dc091..77c874d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{Logging, Row, SQLContext}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 
 /**
@@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
  * linking.
  */
 @DeveloperApi
-case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
-  extends BaseRelation with MultiInstanceRelation {
+case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
+  extends LogicalPlan with MultiInstanceRelation {
 
   def output = alreadyPlanned.output
   override def references = Set.empty
@@ -78,9 +78,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
         case _ => sys.error("Multiple instance of the same relation detected.")
-      }, tableName)
-      .asInstanceOf[this.type]
+      })(sqlContext).asInstanceOf[this.type]
   }
+
+  @transient override lazy val statistics = Statistics(
+    // TODO: Instead of returning a default value here, find a way to return a meaningful size
+    // estimate for RDDs. See PR 1238 for more discussions.
+    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+  )
+
 }
 
 private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c078e71..404d48a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import scala.util.Try
+
 import org.apache.spark.sql.{SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.parquet._
@@ -47,9 +49,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   /**
    * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
    * evaluated by matching hash keys.
+   *
+   * This strategy applies a simple optimization based on the estimates of the physical sizes of
+   * the two join sides.  When planning a [[execution.BroadcastHashJoin]], if one side has an
+   * estimated physical size smaller than the user-settable threshold
+   * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
+   * ''build'' relation and mark the other relation as the ''stream'' side.  The build table will be
+   * ''broadcasted'' to all of the executors involved in the join, as a
+   * [[org.apache.spark.broadcast.Broadcast]] object.  If both estimates exceed the threshold, they
+   * will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
    */
   object HashJoin extends Strategy with PredicateHelper {
-    private[this] def broadcastHashJoin(
+    private[this] def makeBroadcastHashJoin(
         leftKeys: Seq[Expression],
         rightKeys: Seq[Expression],
         left: LogicalPlan,
@@ -61,33 +72,27 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
     }
 
-    def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer
-
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ExtractEquiJoinKeys(
-              Inner,
-              leftKeys,
-              rightKeys,
-              condition,
-              left,
-              right @ PhysicalOperation(_, _, b: BaseRelation))
-        if broadcastTables.contains(b.tableName) =>
-          broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
+      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
+        if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
+        makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
-      case ExtractEquiJoinKeys(
-              Inner,
-              leftKeys,
-              rightKeys,
-              condition,
-              left @ PhysicalOperation(_, _, b: BaseRelation),
-              right)
-        if broadcastTables.contains(b.tableName) =>
-          broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
+      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
+        if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+          left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
+          makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
+        val buildSide =
+          if (Try(right.statistics.sizeInBytes <= left.statistics.sizeInBytes).getOrElse(false)) {
+            BuildRight
+          } else {
+            BuildLeft
+          }
         val hashJoin =
           execution.ShuffledHashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+            leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
         condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
 
       case _ => Nil
@@ -273,8 +278,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Limit(limit, planLater(child))(sqlContext) :: Nil
       case Unions(unionChildren) =>
         execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
-      case logical.Except(left,right) =>                                        
-        execution.Except(planLater(left),planLater(right)) :: Nil   
+      case logical.Except(left,right) =>
+        execution.Except(planLater(left),planLater(right)) :: Nil
       case logical.Intersect(left, right) =>
         execution.Intersect(planLater(left), planLater(right)) :: Nil
       case logical.Generate(generator, join, outer, _, child) =>
@@ -283,7 +288,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.ExistingRdd(Nil, singleRowRdd) :: Nil
       case logical.Repartition(expressions, child) =>
         execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
-      case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
+      case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index b48c70e..6c2b553 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -28,11 +28,12 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.{SQLContext, Logging}
 
 private[sql] object JsonRDD extends Logging {
 
   private[sql] def inferSchema(
+      sqlContext: SQLContext,
       json: RDD[String],
       samplingRatio: Double = 1.0): LogicalPlan = {
     require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
@@ -40,15 +41,17 @@ private[sql] object JsonRDD extends Logging {
     val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
     val baseSchema = createSchema(allKeys)
 
-    createLogicalPlan(json, baseSchema)
+    createLogicalPlan(json, baseSchema, sqlContext)
   }
 
   private def createLogicalPlan(
       json: RDD[String],
-      baseSchema: StructType): LogicalPlan = {
+      baseSchema: StructType,
+      sqlContext: SQLContext): LogicalPlan = {
     val schema = nullTypeToStringType(baseSchema)
 
-    SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
+    SparkLogicalPlan(
+      ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext)
   }
 
   private def createSchema(allKeys: Set[(String, DataType)]): StructType = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 9c4771d..8c7dbd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -27,6 +27,7 @@ import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.schema.MessageType
 
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
@@ -45,7 +46,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
  */
 private[sql] case class ParquetRelation(
     path: String,
-    @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
+    @transient conf: Option[Configuration] = None)
+  extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index e17ecc8..025c396 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
 class JoinSuite extends QueryTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 156b090..dff1d6a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,15 +19,16 @@ package org.apache.spark.sql.hive
 
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.Deserializer
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.{SQLContext, Logging}
 import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
@@ -64,9 +65,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     // Since HiveQL is case insensitive for table names we make them all lowercase.
     MetastoreRelation(
-      databaseName,
-      tblName,
-      alias)(table.getTTable, partitions.map(part => part.getTPartition))
+      databaseName, tblName, alias)(
+      table.getTTable, partitions.map(part => part.getTPartition))(hive)
   }
 
   def createTable(
@@ -251,7 +251,11 @@ object HiveMetastoreTypes extends RegexParsers {
 private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: TTable, val partitions: Seq[TPartition])
-  extends BaseRelation {
+    (@transient sqlContext: SQLContext)
+  extends LeafNode {
+
+  self: Product =>
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -264,6 +268,21 @@ private[hive] case class MetastoreRelation
     new Partition(hiveQlTable, p)
   }
 
+  @transient override lazy val statistics = Statistics(
+    sizeInBytes = {
+      // TODO: check if this estimate is valid for tables after partition pruning.
+      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+      // relatively cheap if parameters for the table are populated into the metastore.  An
+      // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
+      // of RPCs are involved.  Besides `totalSize`, there are also `numFiles`, `numRows`,
+      // `rawDataSize` keys that we can look at in the future.
+      BigInt(
+        Option(hiveQlTable.getParameters.get("totalSize"))
+          .map(_.toLong)
+          .getOrElse(sqlContext.defaultSizeInBytes))
+    }
+  )
+
   val tableDesc = new TableDesc(
     Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]],
     hiveQlTable.getInputFormatClass,
@@ -275,14 +294,14 @@ private[hive] case class MetastoreRelation
     hiveQlTable.getMetadata
   )
 
-   implicit class SchemaAttribute(f: FieldSchema) {
-     def toAttribute = AttributeReference(
-       f.getName,
-       HiveMetastoreTypes.toDataType(f.getType),
-       // Since data can be dumped in randomly with no validation, everything is nullable.
-       nullable = true
-     )(qualifiers = tableName +: alias.toSeq)
-   }
+  implicit class SchemaAttribute(f: FieldSchema) {
+    def toAttribute = AttributeReference(
+      f.getName,
+      HiveMetastoreTypes.toDataType(f.getType),
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      nullable = true
+    )(qualifiers = tableName +: alias.toSeq)
+  }
 
   // Must be a stable value since new attributes are born here.
   val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute)

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
new file mode 100644
index 0000000..a61fd9d
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+
+class StatisticsSuite extends QueryTest {
+
+  test("estimates the size of a test MetastoreRelation") {
+    val rdd = hql("""SELECT * FROM src""")
+    val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
+      mr.statistics.sizeInBytes
+    }
+    assert(sizes.size === 1)
+    assert(sizes(0).equals(BigInt(5812)),
+      s"expected exact size 5812 for test table 'src', got: ${sizes(0)}")
+  }
+
+  test("auto converts to broadcast hash join, by size estimate of a relation") {
+    def mkTest(
+        before: () => Unit,
+        after: () => Unit,
+        query: String,
+        expectedAnswer: Seq[Any],
+        ct: ClassTag[_]) = {
+      before()
+
+      var rdd = hql(query)
+
+      // Assert the query has two matching relations and the first is under the threshold.
+      val sizes = rdd.queryExecution.analyzed.collect {
+        case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
+      }
+      assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
+        s"query should contain two relations, each of which has size smaller than autoConvertSize")
+
+      // Inspect `sparkPlan`: the relevant patterns in HashJoin are matched only after
+      // other strategies have been applied.
+      var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+      assert(bhj.size === 1,
+        s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
+
+      checkAnswer(rdd, expectedAnswer) // check correctness of output
+
+      TestHive.settings.synchronized {
+        val tmp = autoBroadcastJoinThreshold
+
+        hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
+        rdd = hql(query)
+        bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+        assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
+
+        val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
+        assert(shj.size === 1,
+          "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
+
+        hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
+      }
+
+      after()
+    }
+
+    /** Tests for MetastoreRelation */
+    val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
+    val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238"))
+    mkTest(
+      () => (),
+      () => (),
+      metastoreQuery,
+      metastoreAnswer,
+      implicitly[ClassTag[MetastoreRelation]]
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index b4dbf2b..6c8fe4b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
     answer: Seq[String]): Seq[String] = {
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
-      case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
+      case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
       case PhysicalOperation(_, _, Sort(_, _)) => true
       case _ => plan.children.iterator.exists(isSorted)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index a022a1e..50f8528 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -21,7 +21,7 @@ import scala.util.Try
 
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{SchemaRDD, Row}
+import org.apache.spark.sql.{Row, SchemaRDD}
 
 case class TestData(a: Int, b: String)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7db274b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 91ad59d..3bfe49a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -35,7 +35,7 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
 
   override def beforeAll() {
     // write test data
-    ParquetTestData.writeFile
+    ParquetTestData.writeFile()
     testRDD = parquetFile(ParquetTestData.testDir.toString)
     testRDD.registerAsTable("testsource")
   }