You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/04 03:57:53 UTC

git commit: [SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()

Repository: spark
Updated Branches:
  refs/heads/master 4bba10c41 -> f48420fde


[SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()

By overriding `executeCollect()` in the physical plan classes of all commands, we can avoid kicking off a distributed job when collecting the result of a SQL command, e.g. `sql("SET").collect()`.

Previously, `Command.sideEffectResult` returned a `Seq[Any]`, and the `execute()` method in subclasses of `Command` typically converted that to a `Seq[Row]` and then parallelized it into an RDD. With this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can use it directly and be factored into the parent trait `Command`.

Author: Cheng Lian <li...@gmail.com>

Closes #2215 from liancheng/lightweight-commands and squashes the following commits:

3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command
5a0e16c [Cheng Lian] Passes test suites
e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect
995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand
542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes
55b2aa5 [Cheng Lian] Avoids distributed jobs when executing SQL commands


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f48420fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f48420fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f48420fd

Branch: refs/heads/master
Commit: f48420fde58d554480cc8830d2f8c4d17618f283
Parents: 4bba10c
Author: Cheng Lian <li...@gmail.com>
Authored: Wed Sep 3 18:57:20 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Sep 3 18:57:20 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/commands.scala   | 63 +++++++-------------
 .../org/apache/spark/sql/hive/HiveContext.scala |  4 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 +++--
 .../execution/DescribeHiveTableCommand.scala    | 30 ++++------
 .../sql/hive/execution/NativeCommand.scala      | 11 +---
 .../spark/sql/hive/execution/commands.scala     | 20 ++-----
 6 files changed, 48 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f48420fd/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 031b695..286c6d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,11 +21,13 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
 trait Command {
+  this: SparkPlan =>
+
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
    * command or any other computation that should be evaluated exactly once. The value of this field
@@ -35,7 +37,11 @@ trait Command {
    * The `execute()` method of all the physical command classes should reference `sideEffectResult`
    * so that the command can be executed eagerly right after the command query is created.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+  protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
+
+  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
 }
 
 /**
@@ -47,17 +53,17 @@ case class SetCommand(
     @transient context: SQLContext)
   extends LeafNode with Command with Logging {
 
-  override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match {
     // Set value for key k.
     case (Some(k), Some(v)) =>
       if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
         logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
         context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
-        Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+        Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
       } else {
         context.setConf(k, v)
-        Array(s"$k=$v")
+        Array(Row(s"$k=$v"))
       }
 
     // Query the value bound to key k.
@@ -73,28 +79,22 @@ case class SetCommand(
           "hive-0.12.0.jar").mkString(":")
 
         Array(
-          "system:java.class.path=" + hiveJars,
-          "system:sun.java.command=shark.SharkServer2")
-      }
-      else {
-        Array(s"$k=${context.getConf(k, "<undefined>")}")
+          Row("system:java.class.path=" + hiveJars),
+          Row("system:sun.java.command=shark.SharkServer2"))
+      } else {
+        Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
       }
 
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
       context.getAllConfs.map { case (k, v) =>
-        s"$k=$v"
+        Row(s"$k=$v")
       }.toSeq
 
     case _ =>
       throw new IllegalArgumentException()
   }
 
-  def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
-    context.sparkContext.parallelize(rows, 1)
-  }
-
   override def otherCopyArgs = context :: Nil
 }
 
@@ -113,19 +113,14 @@ case class ExplainCommand(
   extends LeafNode with Command {
 
   // Run through the optimizer to generate the physical plan.
-  override protected[sql] lazy val sideEffectResult: Seq[String] = try {
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = try {
     // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
     val queryExecution = context.executePlan(logicalPlan)
     val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
 
-    outputString.split("\n")
+    outputString.split("\n").map(Row(_))
   } catch { case cause: TreeNodeException[_] =>
-    ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
-  }
-
-  def execute(): RDD[Row] = {
-    val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
-    context.sparkContext.parallelize(explanation, 1)
+    ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 
   override def otherCopyArgs = context :: Nil
@@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
     } else {
       context.uncacheTable(tableName)
     }
-    Seq.empty[Any]
-  }
-
-  override def execute(): RDD[Row] = {
-    sideEffectResult
-    context.emptyResult
+    Seq.empty[Row]
   }
 
   override def output: Seq[Attribute] = Seq.empty
@@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
     @transient context: SQLContext)
   extends LeafNode with Command {
 
-  override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
-    Seq(("# Registered as a temporary table", null, null)) ++
-      child.output.map(field => (field.name, field.dataType.toString, null))
-  }
-
-  override def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map {
-      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
-    }
-    context.sparkContext.parallelize(rows, 1)
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+    Row("# Registered as a temporary table", null, null) +:
+      child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f48420fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d9b2bc7..ced8397 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -389,7 +389,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         }.mkString("{", ",", "}")
       case (seq: Seq[_], ArrayType(typ, _)) =>
         seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_,_], MapType(kType, vType, _)) =>
+      case (map: Map[_, _], MapType(kType, vType, _)) =>
         map.map {
           case (key, value) =>
             toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
@@ -409,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         // be similar with Hive.
         describeHiveTableCommand.hiveString
       case command: PhysicalCommand =>
-        command.sideEffectResult.map(_.toString)
+        command.sideEffectResult.map(_.head.toString)
 
       case other =>
         val result: Seq[Seq[Any]] = toRdd.collect().toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/f48420fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 47e24f0..24abb1b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,17 +18,19 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.catalyst.types.StringType
 import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.hive
+import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
 
 import scala.collection.JavaConversions._
 
@@ -196,9 +198,9 @@ private[hive] trait HiveStrategies {
       case logical.NativeCommand(sql) =>
         NativeCommand(sql, plan.output)(context) :: Nil
 
-      case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
+      case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
 
-      case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
+      case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
 
       case describe: logical.DescribeCommand =>
         val resolvedTable = context.executePlan(describe.table).analyzed

http://git-wip-us.apache.org/repos/asf/spark/blob/f48420fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index a40e89e..3178010 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.execution.{Command, LeafNode}
 import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
 
@@ -41,26 +41,21 @@ case class DescribeHiveTableCommand(
   extends LeafNode with Command {
 
   // Strings with the format like Hive. It is used for result comparison in our unit tests.
-  lazy val hiveString: Seq[String] = {
-    val alignment = 20
-    val delim = "\t"
-
-    sideEffectResult.map {
-      case (name, dataType, comment) =>
-        String.format("%-" + alignment + "s", name) + delim +
-          String.format("%-" + alignment + "s", dataType) + delim +
-          String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
-    }
+  lazy val hiveString: Seq[String] = sideEffectResult.map {
+    case Row(name: String, dataType: String, comment) =>
+      Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("None"))
+        .map(s => String.format(s"%-20s", s))
+        .mkString("\t")
   }
 
-  override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
     // Trying to mimic the format of Hive's output. But not exactly the same.
     var results: Seq[(String, String, String)] = Nil
 
     val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
     val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
     results ++= columns.map(field => (field.getName, field.getType, field.getComment))
-    if (!partitionColumns.isEmpty) {
+    if (partitionColumns.nonEmpty) {
       val partColumnInfo =
         partitionColumns.map(field => (field.getName, field.getType, field.getComment))
       results ++=
@@ -74,14 +69,9 @@ case class DescribeHiveTableCommand(
       results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
     }
 
-    results
-  }
-
-  override def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map {
-      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+    results.map { case (name, dataType, comment) =>
+      Row(name, dataType, comment)
     }
-    context.sparkContext.parallelize(rows, 1)
   }
 
   override def otherCopyArgs = context :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/f48420fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
index fe60316..8f10e1b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
@@ -32,16 +32,7 @@ case class NativeCommand(
     @transient context: HiveContext)
   extends LeafNode with Command {
 
-  override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
-
-  override def execute(): RDD[Row] = {
-    if (sideEffectResult.size == 0) {
-      context.emptyResult
-    } else {
-      val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
-      context.sparkContext.parallelize(rows, 1)
-    }
-  }
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))
 
   override def otherCopyArgs = context :: Nil
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f48420fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 2985169..a1a4aa7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -33,19 +33,13 @@ import org.apache.spark.sql.hive.HiveContext
  */
 @DeveloperApi
 case class AnalyzeTable(tableName: String) extends LeafNode with Command {
-
   def hiveContext = sqlContext.asInstanceOf[HiveContext]
 
   def output = Seq.empty
 
-  override protected[sql] lazy val sideEffectResult = {
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
     hiveContext.analyze(tableName)
-    Seq.empty[Any]
-  }
-
-  override def execute(): RDD[Row] = {
-    sideEffectResult
-    sparkContext.emptyRDD[Row]
+    Seq.empty[Row]
   }
 }
 
@@ -55,20 +49,14 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
  */
 @DeveloperApi
 case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {
-
   def hiveContext = sqlContext.asInstanceOf[HiveContext]
 
   def output = Seq.empty
 
-  override protected[sql] lazy val sideEffectResult: Seq[Any] = {
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
     hiveContext.catalog.unregisterTable(None, tableName)
-    Seq.empty
-  }
-
-  override def execute(): RDD[Row] = {
-    sideEffectResult
-    sparkContext.emptyRDD[Row]
+    Seq.empty[Row]
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org