You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/21 02:04:51 UTC

[8/9] SPARK-1251 Support for optimizing and executing structured queries

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 382a38d..a5569ff 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -72,6 +72,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-graphx_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/examples/src/main/resources/kv1.txt
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/kv1.txt b/examples/src/main/resources/kv1.txt
new file mode 100644
index 0000000..9825414
--- /dev/null
+++ b/examples/src/main/resources/kv1.txt
@@ -0,0 +1,500 @@
+238val_238
+86val_86
+311val_311
+27val_27
+165val_165
+409val_409
+255val_255
+278val_278
+98val_98
+484val_484
+265val_265
+193val_193
+401val_401
+150val_150
+273val_273
+224val_224
+369val_369
+66val_66
+128val_128
+213val_213
+146val_146
+406val_406
+429val_429
+374val_374
+152val_152
+469val_469
+145val_145
+495val_495
+37val_37
+327val_327
+281val_281
+277val_277
+209val_209
+15val_15
+82val_82
+403val_403
+166val_166
+417val_417
+430val_430
+252val_252
+292val_292
+219val_219
+287val_287
+153val_153
+193val_193
+338val_338
+446val_446
+459val_459
+394val_394
+237val_237
+482val_482
+174val_174
+413val_413
+494val_494
+207val_207
+199val_199
+466val_466
+208val_208
+174val_174
+399val_399
+396val_396
+247val_247
+417val_417
+489val_489
+162val_162
+377val_377
+397val_397
+309val_309
+365val_365
+266val_266
+439val_439
+342val_342
+367val_367
+325val_325
+167val_167
+195val_195
+475val_475
+17val_17
+113val_113
+155val_155
+203val_203
+339val_339
+0val_0
+455val_455
+128val_128
+311val_311
+316val_316
+57val_57
+302val_302
+205val_205
+149val_149
+438val_438
+345val_345
+129val_129
+170val_170
+20val_20
+489val_489
+157val_157
+378val_378
+221val_221
+92val_92
+111val_111
+47val_47
+72val_72
+4val_4
+280val_280
+35val_35
+427val_427
+277val_277
+208val_208
+356val_356
+399val_399
+169val_169
+382val_382
+498val_498
+125val_125
+386val_386
+437val_437
+469val_469
+192val_192
+286val_286
+187val_187
+176val_176
+54val_54
+459val_459
+51val_51
+138val_138
+103val_103
+239val_239
+213val_213
+216val_216
+430val_430
+278val_278
+176val_176
+289val_289
+221val_221
+65val_65
+318val_318
+332val_332
+311val_311
+275val_275
+137val_137
+241val_241
+83val_83
+333val_333
+180val_180
+284val_284
+12val_12
+230val_230
+181val_181
+67val_67
+260val_260
+404val_404
+384val_384
+489val_489
+353val_353
+373val_373
+272val_272
+138val_138
+217val_217
+84val_84
+348val_348
+466val_466
+58val_58
+8val_8
+411val_411
+230val_230
+208val_208
+348val_348
+24val_24
+463val_463
+431val_431
+179val_179
+172val_172
+42val_42
+129val_129
+158val_158
+119val_119
+496val_496
+0val_0
+322val_322
+197val_197
+468val_468
+393val_393
+454val_454
+100val_100
+298val_298
+199val_199
+191val_191
+418val_418
+96val_96
+26val_26
+165val_165
+327val_327
+230val_230
+205val_205
+120val_120
+131val_131
+51val_51
+404val_404
+43val_43
+436val_436
+156val_156
+469val_469
+468val_468
+308val_308
+95val_95
+196val_196
+288val_288
+481val_481
+457val_457
+98val_98
+282val_282
+197val_197
+187val_187
+318val_318
+318val_318
+409val_409
+470val_470
+137val_137
+369val_369
+316val_316
+169val_169
+413val_413
+85val_85
+77val_77
+0val_0
+490val_490
+87val_87
+364val_364
+179val_179
+118val_118
+134val_134
+395val_395
+282val_282
+138val_138
+238val_238
+419val_419
+15val_15
+118val_118
+72val_72
+90val_90
+307val_307
+19val_19
+435val_435
+10val_10
+277val_277
+273val_273
+306val_306
+224val_224
+309val_309
+389val_389
+327val_327
+242val_242
+369val_369
+392val_392
+272val_272
+331val_331
+401val_401
+242val_242
+452val_452
+177val_177
+226val_226
+5val_5
+497val_497
+402val_402
+396val_396
+317val_317
+395val_395
+58val_58
+35val_35
+336val_336
+95val_95
+11val_11
+168val_168
+34val_34
+229val_229
+233val_233
+143val_143
+472val_472
+322val_322
+498val_498
+160val_160
+195val_195
+42val_42
+321val_321
+430val_430
+119val_119
+489val_489
+458val_458
+78val_78
+76val_76
+41val_41
+223val_223
+492val_492
+149val_149
+449val_449
+218val_218
+228val_228
+138val_138
+453val_453
+30val_30
+209val_209
+64val_64
+468val_468
+76val_76
+74val_74
+342val_342
+69val_69
+230val_230
+33val_33
+368val_368
+103val_103
+296val_296
+113val_113
+216val_216
+367val_367
+344val_344
+167val_167
+274val_274
+219val_219
+239val_239
+485val_485
+116val_116
+223val_223
+256val_256
+263val_263
+70val_70
+487val_487
+480val_480
+401val_401
+288val_288
+191val_191
+5val_5
+244val_244
+438val_438
+128val_128
+467val_467
+432val_432
+202val_202
+316val_316
+229val_229
+469val_469
+463val_463
+280val_280
+2val_2
+35val_35
+283val_283
+331val_331
+235val_235
+80val_80
+44val_44
+193val_193
+321val_321
+335val_335
+104val_104
+466val_466
+366val_366
+175val_175
+403val_403
+483val_483
+53val_53
+105val_105
+257val_257
+406val_406
+409val_409
+190val_190
+406val_406
+401val_401
+114val_114
+258val_258
+90val_90
+203val_203
+262val_262
+348val_348
+424val_424
+12val_12
+396val_396
+201val_201
+217val_217
+164val_164
+431val_431
+454val_454
+478val_478
+298val_298
+125val_125
+431val_431
+164val_164
+424val_424
+187val_187
+382val_382
+5val_5
+70val_70
+397val_397
+480val_480
+291val_291
+24val_24
+351val_351
+255val_255
+104val_104
+70val_70
+163val_163
+438val_438
+119val_119
+414val_414
+200val_200
+491val_491
+237val_237
+439val_439
+360val_360
+248val_248
+479val_479
+305val_305
+417val_417
+199val_199
+444val_444
+120val_120
+429val_429
+169val_169
+443val_443
+323val_323
+325val_325
+277val_277
+230val_230
+478val_478
+178val_178
+468val_468
+310val_310
+317val_317
+333val_333
+493val_493
+460val_460
+207val_207
+249val_249
+265val_265
+480val_480
+83val_83
+136val_136
+353val_353
+172val_172
+214val_214
+462val_462
+233val_233
+406val_406
+133val_133
+175val_175
+189val_189
+454val_454
+375val_375
+401val_401
+421val_421
+407val_407
+384val_384
+256val_256
+26val_26
+134val_134
+67val_67
+384val_384
+379val_379
+18val_18
+462val_462
+492val_492
+100val_100
+298val_298
+9val_9
+341val_341
+498val_498
+146val_146
+458val_458
+362val_362
+186val_186
+285val_285
+348val_348
+167val_167
+18val_18
+273val_273
+183val_183
+281val_281
+344val_344
+97val_97
+469val_469
+315val_315
+84val_84
+28val_28
+37val_37
+448val_448
+152val_152
+348val_348
+307val_307
+194val_194
+414val_414
+477val_477
+222val_222
+126val_126
+90val_90
+169val_169
+403val_403
+400val_400
+200val_200
+97val_97

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/examples/src/main/resources/people.txt
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/people.txt b/examples/src/main/resources/people.txt
new file mode 100644
index 0000000..3bcace4
--- /dev/null
+++ b/examples/src/main/resources/people.txt
@@ -0,0 +1,3 @@
+Michael, 29
+Andy, 30
+Justin, 19

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
new file mode 100644
index 0000000..abcc1f0
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.examples
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.LocalHiveContext
+
+/** Example that runs HiveQL through a LocalHiveContext and joins a Hive table with an RDD. */
+object HiveFromSpark {
+  case class Record(key: Int, value: String)
+
+  def main(args: Array[String]) {
+    val sc = new SparkContext("local", "HiveFromSpark")
+
+    // A local hive context creates an instance of the Hive Metastore in process, storing
+    // the warehouse data in the current directory.  This location can be overridden by
+    // specifying a second parameter to the constructor.
+    val hiveContext = new LocalHiveContext(sc)
+    import hiveContext._
+
+    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    sql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")
+
+    // Queries are expressed in HiveQL
+    println("Result of 'SELECT *': ")
+    sql("SELECT * FROM src").collect().foreach(println)
+
+    // Aggregation queries are also supported.
+    val count = sql("SELECT COUNT(*) FROM src").collect().head.getInt(0)
+    println(s"COUNT(*): $count")
+
+    // The results of SQL queries are themselves RDDs and support all normal RDD functions.  The
+    // items in the RDD are of type Row, which allows you to access each column by ordinal.
+    val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+
+    println("Result of RDD.map:")
+    rddFromSql.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" }
+      .collect().foreach(println)
+
+    // You can also register RDDs as temporary tables within a HiveContext.
+    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+    rdd.registerAsTable("records")
+
+    // Queries can then join RDD data with data stored in Hive.
+    println("Result of SELECT *:")
+    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
new file mode 100644
index 0000000..8210ad9
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.examples
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+
+// One method for defining the schema of an RDD is to make a case class with the desired column
+// names and types.
+case class Record(key: Int, value: String)
+
+object RDDRelation {
+  def main(args: Array[String]) {
+    val sc = new SparkContext("local", "RDDRelation")
+    val sqlContext = new SQLContext(sc)
+
+    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
+    import sqlContext._
+
+    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+    // Any RDD containing case classes can be registered as a table.  The schema of the table is
+    // automatically inferred using Scala reflection.
+    rdd.registerAsTable("records")
+
+    // Once tables have been registered, you can run SQL queries over them.
+    println("Result of SELECT *:")
+    sql("SELECT * FROM records").collect().foreach(println)
+
+    // Aggregation queries are also supported.
+    val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0)
+    println(s"COUNT(*): $count")
+
+    // The results of SQL queries are themselves RDDs and support all normal RDD functions.  The
+    // items in the RDD are of type Row, which allows you to access each column by ordinal.
+    val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10")
+
+    println("Result of RDD.map:")
+    rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println)
+
+    // Queries can also be written using a LINQ-like Scala DSL.
+    rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
+
+    // Write out an RDD as a parquet file.
+    rdd.saveAsParquetFile("pair.parquet")
+
+    // Read in parquet file.  Parquet files are self-describing so the schema is preserved.
+    val parquetFile = sqlContext.parquetFile("pair.parquet")
+
+    // Queries can be run using the DSL on parquet files just like the original RDD.
+    parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)
+
+    // These files can also be registered as tables.
+    parquetFile.registerAsTable("parquetFile")
+    sql("SELECT * FROM parquetFile").collect().foreach(println)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/graphx/pom.xml
----------------------------------------------------------------------
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 894a7c2..5a50229 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -29,7 +29,7 @@
   <artifactId>spark-graphx_2.10</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project GraphX</name>
-  <url>http://spark-project.org/</url>
+  <url>http://spark.apache.org/</url>
 
   <profiles>
     <profile>

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 524e5da..9db34a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,9 @@
     <module>mllib</module>
     <module>tools</module>
     <module>streaming</module>
+    <module>sql/catalyst</module>
+    <module>sql/core</module>
+    <module>sql/hive</module>
     <module>repl</module>
     <module>assembly</module>
     <module>external/twitter</module>
@@ -118,6 +121,8 @@
     <protobuf.version>2.4.1</protobuf.version>
     <yarn.version>0.23.7</yarn.version>
     <hbase.version>0.94.6</hbase.version>
+    <hive.version>0.12.0</hive.version>
+    <parquet.version>1.3.2</parquet.version>
 
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aff191c..e4ad659 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -52,7 +52,7 @@ object SparkBuild extends Build {
   lazy val core = Project("core", file("core"), settings = coreSettings)
 
   lazy val repl = Project("repl", file("repl"), settings = replSettings)
-    .dependsOn(core, graphx, bagel, mllib)
+    .dependsOn(core, graphx, bagel, mllib, sql)
 
   lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
 
@@ -60,12 +60,19 @@ object SparkBuild extends Build {
 
   lazy val graphx = Project("graphx", file("graphx"), settings = graphxSettings) dependsOn(core)
 
+  lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core)
+
+  lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core, catalyst)
+
+  // Since hive is its own assembly, it depends on all of the modules.
+  lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql, graphx, bagel, mllib, streaming, repl)
+
   lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
 
   lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
 
   lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
-    .dependsOn(core, graphx, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) dependsOn(maybeGanglia: _*)
+    .dependsOn(core, graphx, bagel, mllib, streaming, repl, sql) dependsOn(maybeYarn: _*) dependsOn(maybeGanglia: _*)
 
   lazy val assembleDeps = TaskKey[Unit]("assemble-deps", "Build assembly of dependencies and packages Spark projects")
 
@@ -131,13 +138,13 @@ object SparkBuild extends Build {
   lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
 
   lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
-    .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
+    .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter, hive) dependsOn(allExternal: _*)
 
-  // Everything except assembly, tools, java8Tests and examples belong to packageProjects
-  lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef ++ maybeGangliaRef
+  // Everything except assembly, hive, tools, java8Tests and examples belong to packageProjects
+  lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx, catalyst, sql) ++ maybeYarnRef ++ maybeGangliaRef
 
   lazy val allProjects = packageProjects ++ allExternalRefs ++
-    Seq[ProjectReference](examples, tools, assemblyProj) ++ maybeJava8Tests
+    Seq[ProjectReference](examples, tools, assemblyProj, hive) ++ maybeJava8Tests
 
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization       := "org.apache.spark",
@@ -164,7 +171,7 @@ object SparkBuild extends Build {
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),
     // Remove certain packages from Scaladoc
-    scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq(
+    scalacOptions in (Compile,doc) := Seq("-groups", "-skip-packages", Seq(
       "akka",
       "org.apache.spark.network",
       "org.apache.spark.deploy",
@@ -362,6 +369,61 @@ object SparkBuild extends Build {
     )
   )
 
+  def catalystSettings = sharedSettings ++ Seq(
+    name := "catalyst",
+    // The mechanics of rewriting expression ids to compare trees in some test cases makes
+    // assumptions about the expression ids being contiguous.  Running tests in parallel breaks
+    // this non-deterministically.  TODO: FIX THIS.
+    parallelExecution in Test := false,
+    libraryDependencies ++= Seq(
+      "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+      "com.typesafe" %% "scalalogging-slf4j" % "1.0.1"
+    )
+  )
+
+  // Settings for sql/core ("spark-sql"): the shared settings plus the Parquet 1.3.2 libraries.
+  def sqlCoreSettings = sharedSettings ++ Seq(
+    name := "spark-sql",
+    libraryDependencies ++= Seq("parquet-column", "parquet-hadoop").map { artifact =>
+      "com.twitter" % artifact % "1.3.2"
+    }
+  )
+
+  // Hive is left out of the main assembly, so this project also serves as an alternative
+  // assembly jar (hence the assembly project settings mixed in below).
+  def hiveSettings = sharedSettings ++ assemblyProjSettings ++ Seq(
+    name := "spark-hive",
+    jarName in assembly <<= version map { ver => "spark-hive-assembly-" + ver + "-hadoop" + hadoopVersion + ".jar" },
+    jarName in packageDependency <<= version map { ver => "spark-hive-assembly-" + ver + "-hadoop" + hadoopVersion + "-deps.jar" },
+    javaOptions += "-XX:MaxPermSize=1g",
+    // All three Hive artifacts are pinned to the same release.
+    libraryDependencies ++= Seq("hive-metastore", "hive-exec", "hive-serde").map { artifact =>
+      "org.apache.hive" % artifact % "0.12.0"
+    },
+    // Multiple queries rely on the TestHive singleton (see the comments there for details).
+    parallelExecution in Test := false,
+    // Supporting all SerDes requires us to depend on deprecated APIs, so the "-deprecation"
+    // flag is removed from the compiler options for this subproject only.
+    scalacOptions <<= scalacOptions map { (opts: Seq[String]) =>
+      opts.filter(_ != "-deprecation")
+    },
+    // Imports that are pre-loaded when the interactive console is started for this project.
+    initialCommands in console :=
+      """
+        |import org.apache.spark.sql.catalyst.analysis._
+        |import org.apache.spark.sql.catalyst.dsl._
+        |import org.apache.spark.sql.catalyst.errors._
+        |import org.apache.spark.sql.catalyst.expressions._
+        |import org.apache.spark.sql.catalyst.plans.logical._
+        |import org.apache.spark.sql.catalyst.rules._
+        |import org.apache.spark.sql.catalyst.types._
+        |import org.apache.spark.sql.catalyst.util._
+        |import org.apache.spark.sql.execution
+        |import org.apache.spark.sql.hive._
+        |import org.apache.spark.sql.hive.TestHive._
+        |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
+  )
+
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     libraryDependencies ++= Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/README.md
----------------------------------------------------------------------
diff --git a/sql/README.md b/sql/README.md
new file mode 100644
index 0000000..4192fec
--- /dev/null
+++ b/sql/README.md
@@ -0,0 +1,80 @@
+Spark SQL
+=========
+
+This module provides support for executing relational queries expressed in either SQL or a LINQ-like Scala DSL.
+
+Spark SQL is broken up into three subprojects:
+ - Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
+ - Execution (sql/core) - A query planner / execution engine for translating Catalyst’s logical query plans into Spark RDDs.  This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
+ - Hive Support (sql/hive) - Includes an extension of SQLContext called HiveContext that allows users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes.  There are also wrappers that allow users to run queries that include Hive UDFs, UDAFs, and UDTFs.
+
+
+Other dependencies for developers
+---------------------------------
+In order to create new Hive test cases, you will need to set several environment variables.
+
+```
+export HIVE_HOME="<path to>/hive/build/dist"
+export HIVE_DEV_HOME="<path to>/hive/"
+export HADOOP_HOME="<path to>/hadoop-1.0.4"
+```
+
+Using the console
+=================
+An interactive scala console can be invoked by running `sbt/sbt hive/console`.  From here you can execute queries and inspect the various stages of query optimization.
+
+```scala
+catalyst$ sbt/sbt hive/console
+
+[info] Starting scala interpreter...
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.dsl._
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution
+import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.TestHive._
+Welcome to Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45).
+Type in expressions to have them evaluated.
+Type :help for more information.
+
+scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
+query: org.apache.spark.sql.ExecutedQuery =
+SELECT * FROM (SELECT * FROM src) a
+=== Query Plan ===
+Project [key#6:0.0,value#7:0.1]
+ HiveTableScan [key#6,value#7], (MetastoreRelation default, src, None), None
+```
+
+Query results are RDDs and can be operated on as such.
+```
+scala> query.collect()
+res8: Array[org.apache.spark.sql.execution.Row] = Array([238,val_238], [86,val_86], [311,val_311]...
+```
+
+You can also build further queries on top of these RDDs using the query DSL.
+```
+scala> query.where('key === 100).toRdd.collect()
+res11: Array[org.apache.spark.sql.execution.Row] = Array([100,val_100], [100,val_100])
+```
+
+From the console you can even write rules that transform query plans.  For example, the above query has redundant project operators that aren't doing anything.  This redundancy can be eliminated using the `transform` function that is available on all [`TreeNode`](http://databricks.github.io/catalyst/latest/api/#catalyst.trees.TreeNode) objects.
+```scala
+scala> query.logicalPlan
+res1: catalyst.plans.logical.LogicalPlan = 
+Project {key#0,value#1}
+ Project {key#0,value#1}
+  MetastoreRelation default, src, None
+
+
+scala> query.logicalPlan transform {
+     |   case Project(projectList, child) if projectList == child.output => child
+     | }
+res2: catalyst.plans.logical.LogicalPlan = 
+Project {key#0,value#1}
+ MetastoreRelation default, src, None
+```

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/pom.xml
----------------------------------------------------------------------
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
new file mode 100644
index 0000000..740f1fd
--- /dev/null
+++ b/sql/catalyst/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-catalyst_2.10</artifactId>
+    <packaging>jar</packaging>
+    <name>Spark Project Catalyst</name>
+    <url>http://spark.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>scalalogging-slf4j_${scala.binary.version}</artifactId>
+            <version>1.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalacheck</groupId>
+            <artifactId>scalacheck_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
new file mode 100644
index 0000000..d3b1070
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.util.matching.Regex
+import scala.util.parsing.combinator._
+import scala.util.parsing.input.CharArrayReader.EofCh
+import lexical._
+import syntactical._
+import token._
+
+import analysis._
+import expressions._
+import plans._
+import plans.logical._
+import types._
+
+/**
+ * A very simple SQL parser.  Based loosely on:
+ * https://github.com/stephentu/scala-sql-parser/blob/master/src/main/scala/parser.scala
+ *
+ * Limitations:
+ *  - Only supports a very limited subset of SQL.
+ *  - Keywords must be capital.
+ *
+ * This is currently included mostly for illustrative purposes.  Users wanting more complete support
+ * for a SQL-like language should check out the HiveQL support in the sql/hive subproject.
+ */
+class SqlParser extends StandardTokenParsers {
+
+  /**
+   * Parses `input` as one complete query and returns the resulting logical plan.
+   * Any unsuccessful parse result is reported through `sys.error` using the result's string form.
+   */
+  def apply(input: String): LogicalPlan = {
+    val tokens = new lexical.Scanner(input)
+    val outcome = phrase(query)(tokens)
+    outcome match {
+      case Success(plan, _) => plan
+      case failed => sys.error(failed.toString)
+    }
+  }
+
+  protected case class Keyword(str: String)
+  protected implicit def asParser(k: Keyword): Parser[String] = k.str
+
+  protected class SqlLexical extends StdLexical {  // lexer with SQL-style literals and comments
+    case class FloatLit(chars: String) extends Token {  // token for digits '.' digits literals
+      override def toString = chars
+    }
+    override lazy val token: Parser[Token] = (
+        identChar ~ rep( identChar | digit ) ^^  // identifier or reserved word
+          { case first ~ rest => processIdent(first :: rest mkString "") }
+      | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {  // digits, optionally followed by '.' and digits
+        case i ~ None    => NumericLit(i mkString "")
+        case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
+      }
+      | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^  // single-quoted, single-line string
+        { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
+      | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^  // double-quoted, single-line string
+        { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
+      | EofCh ^^^ EOF
+      | '\'' ~> failure("unclosed string literal")  // opening quote that never closed above
+      | '\"' ~> failure("unclosed string literal")
+      | delim
+      | failure("illegal character")
+    )
+
+    override def identChar = letter | elem('.') | elem('_')  // '.' and '_' are identifier chars
+
+    override def whitespace: Parser[Any] = rep(
+      whitespaceChar
+    | '/' ~ '*' ~ comment  // block comment
+    | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )  // line comment introduced by two slashes
+    | '#' ~ rep( chrExcept(EofCh, '\n') )  // line comment introduced by '#'
+    | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )  // line comment introduced by two dashes
+    | '/' ~ '*' ~ failure("unclosed comment")
+    )
+  }
+
+  override val lexical = new SqlLexical
+
+  protected val ALL = Keyword("ALL")
+  protected val AND = Keyword("AND")
+  protected val AS = Keyword("AS")
+  protected val ASC = Keyword("ASC")
+  protected val AVG = Keyword("AVG")
+  protected val BY = Keyword("BY")
+  protected val CAST = Keyword("CAST")
+  protected val COUNT = Keyword("COUNT")
+  protected val DESC = Keyword("DESC")
+  protected val DISTINCT = Keyword("DISTINCT")
+  protected val FALSE = Keyword("FALSE")
+  protected val FIRST = Keyword("FIRST")
+  protected val FROM = Keyword("FROM")
+  protected val FULL = Keyword("FULL")
+  protected val GROUP = Keyword("GROUP")
+  protected val HAVING = Keyword("HAVING")
+  protected val IF = Keyword("IF")
+  protected val IN = Keyword("IN")
+  protected val INNER = Keyword("INNER")
+  protected val IS = Keyword("IS")
+  protected val JOIN = Keyword("JOIN")
+  protected val LEFT = Keyword("LEFT")
+  protected val LIMIT = Keyword("LIMIT")
+  protected val NOT = Keyword("NOT")
+  protected val NULL = Keyword("NULL")
+  protected val ON = Keyword("ON")
+  protected val OR = Keyword("OR")
+  protected val ORDER = Keyword("ORDER")
+  protected val OUTER = Keyword("OUTER")
+  protected val RIGHT = Keyword("RIGHT")
+  protected val SELECT = Keyword("SELECT")
+  protected val STRING = Keyword("STRING")
+  protected val SUM = Keyword("SUM")
+  protected val TRUE = Keyword("TRUE")
+  protected val UNION = Keyword("UNION")
+  protected val WHERE = Keyword("WHERE")
+
+  // Reflectively gather the reserved words declared on this class: every method returned by
+  // getMethods whose return type is Keyword is invoked on this instance and its result collected.
+  protected val reservedWords =
+    getClass.getMethods.collect {
+      case m if m.getReturnType == classOf[Keyword] => m.invoke(this).asInstanceOf[Keyword]
+    }
+
+  lexical.reserved ++= reservedWords.map(_.str)
+
+  lexical.delimiters += (
+    "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+    ",", ";", "%", "{", "}", ":"
+  )
+
+  /**
+   * Ensures every expression has a name.  Expressions that already are [[NamedExpression]]s are
+   * returned unchanged; any other expression is wrapped in an [[Alias]] named `c<i>`, where `i`
+   * is the expression's zero-based position in `exprs`.
+   */
+  protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] =
+    exprs.zipWithIndex.map { case (expr, position) =>
+      expr match {
+        case named: NamedExpression => named
+        case unnamed => Alias(unnamed, s"c$position")()
+      }
+    }
+
+  protected lazy val query: Parser[LogicalPlan] =  // one or more SELECTs chained by UNION
+    select * (  // UNION ALL keeps duplicates; UNION and UNION DISTINCT wrap the union in Distinct
+      UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+      UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
+    )
+
+  /**
+   * Parses a single SELECT statement, optionally terminated by ";", and builds its logical plan
+   * bottom-up in this order: FROM relation (or `NoRelation` when absent), WHERE filter, projection
+   * or GROUP BY aggregation, DISTINCT, HAVING filter, ORDER BY sort and finally LIMIT.
+   */
+  protected lazy val select: Parser[LogicalPlan] =
+    SELECT ~> opt(DISTINCT) ~ projections ~
+    opt(from) ~ opt(filter) ~
+    opt(grouping) ~
+    opt(having) ~
+    opt(orderBy) ~
+    opt(limit) <~ opt(";") ^^ {
+      case distinctOpt ~ projectList ~ source ~ whereCond ~ groupBy ~ havingCond ~ sortOrders ~
+          limitExpr =>
+        val input: LogicalPlan = source.getOrElse(NoRelation)
+        val filtered = whereCond.fold(input)(cond => Filter(cond, input))
+        val projected: LogicalPlan = groupBy match {
+          case Some(groupExprs) =>
+            Aggregate(assignAliases(groupExprs), assignAliases(projectList), filtered)
+          case None =>
+            Project(assignAliases(projectList), filtered)
+        }
+        val deduplicated = distinctOpt.fold(projected)(_ => Distinct(projected))
+        val havingApplied = havingCond.fold(deduplicated)(cond => Filter(cond, deduplicated))
+        val sorted = sortOrders.fold(havingApplied)(orders => Sort(orders, havingApplied))
+        limitExpr.fold(sorted)(count => StopAfter(count, sorted))
+  }
+
+  protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
+
+  // A projection is an expression optionally followed by `[AS] name`; when a name is present the
+  // expression is wrapped in an Alias carrying that name.
+  protected lazy val projection: Parser[Expression] =
+    expression ~ (opt(AS) ~> opt(ident)) ^^ {
+      case expr ~ aliasName => aliasName.fold(expr)(name => Alias(expr, name)())
+    }
+
+  protected lazy val from: Parser[LogicalPlan] = FROM ~> relations
+
+  // Based very loosely on the MySQL Grammar.
+  // http://dev.mysql.com/doc/refman/5.0/en/join.html
+  protected lazy val relations: Parser[LogicalPlan] =
+    relation ~ "," ~ relation ^^ { case r1 ~ _ ~ r2 => Join(r1, r2, Inner, None) } |
+    relation
+
+  protected lazy val relation: Parser[LogicalPlan] =
+    joinedRelation |
+    relationFactor
+
+  /**
+   * A single relation in a FROM clause: either a table name with an optional `[AS] alias`, or a
+   * parenthesized subquery followed by `[AS] alias` (the alias itself is required).
+   */
+  protected lazy val relationFactor: Parser[LogicalPlan] =
+    ident ~ (opt(AS) ~> opt(ident)) ^^ {
+      // UnresolvedRelation's fields are (databaseName, name, alias).  This grammar has no syntax
+      // for a database qualifier, so pass None for it and put the alias in the alias position
+      // instead of letting it land in the database-name slot.
+      case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
+    } |
+    "(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }
+
+   // Two relation factors combined by `[join type] JOIN`, with an optional ON condition.  The
+   // join type is Inner when none is written.
+   protected lazy val joinedRelation: Parser[LogicalPlan] =
+     (relationFactor ~ opt(joinType) <~ JOIN) ~ relationFactor ~ opt(joinConditions) ^^ {
+       case left ~ kind ~ right ~ condition =>
+         Join(left, right, kind.getOrElse(Inner), condition)
+     }
+
+   protected lazy val joinConditions: Parser[Expression] =
+     ON ~> expression
+
+   protected lazy val joinType: Parser[JoinType] =
+     INNER ^^^ Inner |
+     LEFT ~ opt(OUTER) ^^^ LeftOuter |
+     RIGHT ~ opt(OUTER) ^^^ RightOuter |
+     FULL ~ opt(OUTER) ^^^ FullOuter
+
+  // WHERE clause: the keyword is dropped and only the predicate expression is kept.
+  protected lazy val filter: Parser[Expression] = WHERE ~> expression
+
+  protected lazy val orderBy: Parser[Seq[SortOrder]] =
+    ORDER ~> BY ~> ordering
+
+  // Either a list of expressions that each carry their own direction, or a list of expressions
+  // followed by one optional direction that applies to all of them (Ascending when omitted).
+  protected lazy val ordering: Parser[Seq[SortOrder]] =
+    rep1sep(singleOrder, ",") |
+    rep1sep(expression, ",") ~ opt(direction) ^^ {
+      case exps ~ dir =>
+        val sharedDirection = dir.getOrElse(Ascending)
+        exps.map(SortOrder(_, sharedDirection))
+    }
+
+  protected lazy val singleOrder: Parser[SortOrder] =
+    expression ~ direction ^^ { case e ~ o => SortOrder(e,o) }
+
+  protected lazy val direction: Parser[SortDirection] =
+    ASC ^^^ Ascending |
+    DESC ^^^ Descending
+
+  protected lazy val grouping: Parser[Seq[Expression]] =
+    GROUP ~> BY ~> rep1sep(expression, ",")
+
+  protected lazy val having: Parser[Expression] =
+    HAVING ~> expression
+
+  protected lazy val limit: Parser[Expression] =
+    LIMIT ~> expression
+
+  protected lazy val expression: Parser[Expression] = orExpression
+
+  protected lazy val orExpression: Parser[Expression] =
+    andExpression * (OR ^^^ { (e1: Expression, e2: Expression) => Or(e1,e2) })
+
+  protected lazy val andExpression: Parser[Expression] =
+    comparisionExpression * (AND ^^^ { (e1: Expression, e2: Expression) => And(e1,e2) })
+
+  protected lazy val comparisionExpression: Parser[Expression] =
+    termExpression ~ "=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Equals(e1, e2) } |
+    termExpression ~ "<" ~ termExpression ^^ { case e1 ~ _ ~ e2 => LessThan(e1, e2) } |
+    termExpression ~ "<=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => LessThanOrEqual(e1, e2) } |
+    termExpression ~ ">" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThan(e1, e2) } |
+    termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } |
+    termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(Equals(e1, e2)) } |
+    termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(Equals(e1, e2)) } |
+    termExpression ~ IN ~ "(" ~ rep1sep(termExpression, ",") <~ ")" ^^ {
+      case e1 ~ _ ~ _ ~ e2 => In(e1, e2)
+    } |
+    termExpression ~ NOT ~ IN ~ "(" ~ rep1sep(termExpression, ",") <~ ")" ^^ {
+      case e1 ~ _ ~ _ ~ _ ~ e2 => Not(In(e1, e2))
+    } |
+    termExpression <~ IS ~ NULL ^^ { case e => IsNull(e) } |
+    termExpression <~ IS ~ NOT ~ NULL ^^ { case e => IsNotNull(e) } |
+    NOT ~> termExpression ^^ {e => Not(e)} |
+    termExpression
+
+  protected lazy val termExpression: Parser[Expression] =
+    productExpression * (
+      "+" ^^^ { (e1: Expression, e2: Expression) => Add(e1,e2) } |
+      "-" ^^^ { (e1: Expression, e2: Expression) => Subtract(e1,e2) } )
+
+  protected lazy val productExpression: Parser[Expression] =
+    baseExpression * (
+      "*" ^^^ { (e1: Expression, e2: Expression) => Multiply(e1,e2) } |
+      "/" ^^^ { (e1: Expression, e2: Expression) => Divide(e1,e2) } |
+      "%" ^^^ { (e1: Expression, e2: Expression) => Remainder(e1,e2) }
+    )
+
+  // Built-in aggregates (SUM, COUNT, FIRST, AVG, plus DISTINCT forms of SUM and COUNT), the
+  // IF(cond, then, else) conditional and, when none of those match, a call to a named function
+  // that is left as an UnresolvedFunction.
+  protected lazy val function: Parser[Expression] =
+    SUM ~> "(" ~> expression <~ ")" ^^ { arg => Sum(arg) } |
+    SUM ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { arg => SumDistinct(arg) } |
+    COUNT ~> "(" ~> "*" <~ ")" ^^ { _ => Count(Literal(1)) } |
+    COUNT ~> "(" ~> expression <~ ")" ^^ { arg => Count(arg) } |
+    COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { arg => CountDistinct(arg :: Nil) } |
+    FIRST ~> "(" ~> expression <~ ")" ^^ { arg => First(arg) } |
+    AVG ~> "(" ~> expression <~ ")" ^^ { arg => Average(arg) } |
+    IF ~> "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^ {
+      case cond ~ whenTrue ~ whenFalse => If(cond, whenTrue, whenFalse)
+    } |
+    ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^ {
+      case name ~ args => UnresolvedFunction(name, args)
+    }
+
+  protected lazy val cast: Parser[Expression] =
+    CAST ~> "(" ~> expression ~ AS ~ dataType <~ ")" ^^ { case exp ~ _ ~ t => Cast(exp, t) }
+
+  protected lazy val literal: Parser[Literal] =  // numeric, NULL, floating point or string literal
+    numericLit ^^ {  // integral digits: a Long literal above Int.MaxValue, an Int literal otherwise
+      case i if i.toLong > Int.MaxValue => Literal(i.toLong)
+      case i => Literal(i.toInt)
+    } |
+    NULL ^^^ Literal(null, NullType) |
+    floatLit ^^ {case f => Literal(f.toDouble) } |  // FloatLit token text converted to Double
+    stringLit ^^ {case s => Literal(s, StringType) }
+
+  protected lazy val floatLit: Parser[String] =
+    elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
+
+  protected lazy val baseExpression: Parser[Expression] =
+    TRUE ^^^ Literal(true, BooleanType) |
+    FALSE ^^^ Literal(false, BooleanType) |
+    cast |
+    "(" ~> expression <~ ")" |
+    function |
+    "-" ~> literal ^^ UnaryMinus |
+    ident ^^ UnresolvedAttribute |
+    "*" ^^^ Star(None) |
+    literal
+
+  protected lazy val dataType: Parser[DataType] =
+    STRING ^^^ StringType
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
new file mode 100644
index 0000000..9eb992e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import expressions._
+import plans.logical._
+import rules._
+
+/**
+ * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
+ * when all relations are already filled in and the analyser needs only to resolve attribute
+ * references.
+ */
+object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
+
+/**
+ * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
+ * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
+ * a [[FunctionRegistry]].
+ */
+class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
+  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
+
+  // TODO: pass this in as a parameter.
+  val fixedPoint = FixedPoint(100)
+
+  val batches: Seq[Batch] = Seq(
+    Batch("MultiInstanceRelations", Once,
+      NewRelationInstances),
+    Batch("CaseInsensitiveAttributeReferences", Once,
+      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),  // may be empty
+    Batch("Resolution", fixedPoint,  // uses the FixedPoint(100) strategy defined above
+      ResolveReferences ::
+      ResolveRelations ::
+      NewRelationInstances ::
+      ImplicitGenerate ::
+      StarExpansion ::
+      ResolveFunctions ::
+      GlobalAggregates ::
+      typeCoercionRules :_*)  // type coercion rules are appended after the resolution rules
+  )
+
+  /**
+   * Looks up every [[UnresolvedRelation]] in the plan through the catalog and substitutes the
+   * relation that the catalog returns.
+   */
+  object ResolveRelations extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
+      case UnresolvedRelation(db, table, aliasOpt) =>
+        catalog.lookupRelation(db, table, aliasOpt)
+    }
+  }
+
+  /**
+   * Makes attribute naming case insensitive by turning all UnresolvedAttributes to lowercase.
+   * The aliases of [[UnresolvedRelation]]s, [[Subquery]] aliases, [[Star]] table qualifiers and
+   * [[Alias]] names are lowercased as well.  This rule is only placed in the rule batches when
+   * the analyzer is constructed with `caseSensitive = false`.
+   */
+  object LowercaseAttributeReferences extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case UnresolvedRelation(databaseName, name, alias) =>
+        UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
+      case Subquery(alias, child) => Subquery(alias.toLowerCase, child)
+      case q: LogicalPlan => q transformExpressions {
+        case s: Star => s.copy(table = s.table.map(_.toLowerCase))
+        case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
+        case Alias(c, name) => Alias(c, name.toLowerCase)()
+      }
+    }
+  }
+
+  /**
+   * Substitutes [[UnresolvedAttribute]]s with the concrete
+   * [[expressions.AttributeReference AttributeReferences]] obtained by resolving their names
+   * against a logical plan node.  Only nodes whose children are already resolved are processed.
+   */
+  object ResolveReferences extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+      case node: LogicalPlan if node.childrenResolved =>
+        logger.trace(s"Attempting to resolve ${node.simpleString}")
+        node transformExpressions {
+          case unresolved @ UnresolvedAttribute(name) =>
+            // Keep the unresolved attribute when the lookup fails; a later pass may resolve it.
+            val resolved = node.resolve(name).getOrElse(unresolved)
+            logger.debug(s"Resolving $unresolved to $resolved")
+            resolved
+        }
+    }
+  }
+
+  /**
+   * Substitutes each [[UnresolvedFunction]] whose children are resolved with the concrete
+   * [[expressions.Expression Expression]] returned by the function registry.
+   */
+  object ResolveFunctions extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
+      case node: LogicalPlan =>
+        node.transformExpressions {
+          case call @ UnresolvedFunction(fnName, args) if call.childrenResolved =>
+            registry.lookupFunction(fnName, args)
+        }
+    }
+  }
+
+  /**
+   * Turns projections that contain aggregate expressions into aggregations.
+   */
+  object GlobalAggregates extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case Project(projectList, child) if containsAggregates(projectList) =>
+        Aggregate(Nil, projectList, child)
+    }
+
+    /**
+     * Returns true if any expression in `exprs`, or any node visited by that expression's
+     * `foreach`, is an [[AggregateExpression]].
+     *
+     * Written without `return`: a `return` inside the nested closures would be a non-local return,
+     * which Scala implements by throwing a control-flow exception through `foreach`.
+     */
+    def containsAggregates(exprs: Seq[Expression]): Boolean =
+      exprs.exists { expr =>
+        var found = false
+        expr.foreach {
+          case _: AggregateExpression => found = true
+          case _ =>
+        }
+        found
+      }
+  }
+
+  /**
+   * When a SELECT clause has only a single expression and that expression is a
+   * [[catalyst.expressions.Generator Generator]] we convert the
+   * [[catalyst.plans.logical.Project Project]] to a [[catalyst.plans.logical.Generate Generate]].
+   * The match requires the generator to be wrapped in an [[Alias]]; the alias name is discarded
+   * and the resulting Generate is created with `join = false` and `outer = false`.
+   */
+  object ImplicitGenerate extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case Project(Seq(Alias(g: Generator, _)), child) =>
+        Generate(g, join = false, outer = false, None, child)
+    }
+  }
+
+  /**
+   * Replaces every [[Star]] (*) found in the expression lists of Project, ScriptTransformation
+   * and Aggregate operators with the expressions it expands to over the operator's child output.
+   */
+  object StarExpansion extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      // Leave a node untouched until all of its children are resolved.
+      case pending: LogicalPlan if !pending.childrenResolved => pending
+      // Projection list containing Stars: expand them in place.
+      case Project(projectList, child) if containsStar(projectList) =>
+        Project(
+          projectList.flatMap {
+            case star: Star => star.expand(child.output)
+            case other => other :: Nil
+          },
+          child)
+      // Script transformation whose input contains Stars.
+      case script: ScriptTransformation if containsStar(script.input) =>
+        script.copy(
+          input = script.input.flatMap {
+            case star: Star => star.expand(script.child.output)
+            case other => other :: Nil
+          }
+        )
+      // Aggregate whose aggregate expressions contain Stars.
+      case agg: Aggregate if containsStar(agg.aggregateExpressions) =>
+        agg.copy(
+          aggregateExpressions = agg.aggregateExpressions.flatMap {
+            case star: Star => star.expand(agg.child.output)
+            case other => other :: Nil
+          }
+        )
+    }
+
+    /**
+     * Returns true if at least one top-level element of `exprs` is a [[Star]].
+     */
+    protected def containsStar(exprs: Seq[Expression]): Boolean =
+      exprs.exists {
+        case _: Star => true
+        case _ => false
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
new file mode 100644
index 0000000..71e4dcd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import plans.logical.{LogicalPlan, Subquery}
+import scala.collection.mutable
+
+/**
+ * An interface for looking up relations by name.  Used by an [[Analyzer]].
+ *
+ * `lookupRelation` returns the logical plan for the table identified by an optional database
+ * name and a table name; an optional alias may be supplied with the lookup.  `registerTable`
+ * associates a logical plan with an optional database name and a table name.
+ */
+trait Catalog {
+  def lookupRelation(
+    databaseName: Option[String],
+    tableName: String,
+    alias: Option[String] = None): LogicalPlan
+
+  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+}
+
+/**
+ * A [[Catalog]] backed by an in-memory mutable map from table name to logical plan.  The database
+ * name is ignored by both registration and lookup.
+ */
+class SimpleCatalog extends Catalog {
+  val tables = new mutable.HashMap[String, LogicalPlan]()
+
+  def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = {
+    tables(tableName) = plan
+  }
+
+  def dropTable(tableName: String) = tables -= tableName
+
+  def lookupRelation(
+      databaseName: Option[String],
+      tableName: String,
+      alias: Option[String] = None): LogicalPlan = {
+    val plan = tables.getOrElse(tableName, sys.error(s"Table Not Found: $tableName"))
+
+    // When the lookup carries an alias, wrap the plan in a Subquery named after the lowercased
+    // alias so that attributes are properly qualified with it.
+    alias match {
+      case Some(name) => Subquery(name.toLowerCase, plan)
+      case None => plan
+    }
+  }
+}
+
+/**
+ * A trait that can be mixed in with other Catalogs so that individual tables can be replaced by
+ * new logical plans, for example to bind query results to virtual tables or to substitute
+ * in-memory cached versions of tables.  The overrides live only in memory and are therefore gone
+ * once the JVM exits.
+ */
+trait OverrideCatalog extends Catalog {
+
+  // TODO: This doesn't work when the database changes...
+  val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
+
+  abstract override def lookupRelation(
+    databaseName: Option[String],
+    tableName: String,
+    alias: Option[String] = None): LogicalPlan = {
+    overrides.get((databaseName, tableName)) match {
+      // An override exists: return it, wrapped in a Subquery named after the lowercased alias
+      // when the lookup supplied one, so that attributes are qualified with that alias.
+      case Some(overridden) =>
+        alias.fold(overridden)(a => Subquery(a.toLowerCase, overridden))
+      // No override: defer to the catalog this trait is stacked on.
+      case None =>
+        super.lookupRelation(databaseName, tableName, alias)
+    }
+  }
+
+  override def registerTable(
+      databaseName: Option[String],
+      tableName: String,
+      plan: LogicalPlan): Unit = {
+    overrides((databaseName, tableName)) = plan
+  }
+}
+
+/**
+ * A trivial catalog that returns an error when a relation is requested.  Used for testing when all
+ * relations are already filled in and the analyser needs only to resolve attribute references.
+ * Both `lookupRelation` and `registerTable` throw an UnsupportedOperationException.
+ */
+object EmptyCatalog extends Catalog {
+  def lookupRelation(
+    databaseName: Option[String],
+    tableName: String,
+    alias: Option[String] = None) = {
+    throw new UnsupportedOperationException
+  }
+
+  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
+    throw new UnsupportedOperationException
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
new file mode 100644
index 0000000..a359eb5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import expressions._
+
+/**
+ * A catalog for looking up user defined functions, used by an [[Analyzer]].
+ * `lookupFunction` takes the function's name and its argument expressions and returns the
+ * expression that represents the call.
+ */
+trait FunctionRegistry {
+  def lookupFunction(name: String, children: Seq[Expression]): Expression
+}
+
+/**
+ * A trivial function registry that throws an UnsupportedOperationException when a function is
+ * requested.  Used for testing when all functions are already filled in and the analyser needs
+ * only to resolve attribute references.
+ */
+object EmptyFunctionRegistry extends FunctionRegistry {
+  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    throw new UnsupportedOperationException
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
new file mode 100644
index 0000000..a0105cd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import expressions._
+import plans.logical._
+import rules._
+import types._
+
+/**
+ * A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
+ * participate in operations into compatible ones.  Most of these rules are based on Hive semantics,
+ * but they do not introduce any dependencies on the hive codebase.  For this reason they remain in
+ * Catalyst until we have a more standard set of coercions.
+ */
+trait HiveTypeCoercion {
+
+  val typeCoercionRules =  // every coercion rule object defined in this trait
+    List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
+      StringToIntegralCasts, FunctionArgumentConversion)
+
+  /**
+   * Applies any changes to [[catalyst.expressions.AttributeReference AttributeReference]] dataTypes
+   * that are made by other rules to instances higher in the query tree.
+   */
+  object PropagateTypes extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      // No propagation required for leaf nodes.
+      case q: LogicalPlan if q.children.isEmpty => q
+
+      // Don't propagate types from unresolved children.
+      case q: LogicalPlan if !q.childrenResolved => q
+
+      case q: LogicalPlan => q transformExpressions {
+        case a: AttributeReference =>
+          q.inputSet.find(_.exprId == a.exprId) match {
+            // This can happen when an attribute reference is born in a non-leaf node, for example
+            // due to a call to an external script like in the Transform operator.
+            // TODO: Perhaps those should actually be aliases?
+            case None => a
+            // Leave the same if the dataTypes match.
+            case Some(newType) if a.dataType == newType.dataType => a
+            case Some(newType) =>
+              logger.debug(s"Promoting $a to $newType in ${q.simpleString}")
+              newType
+          }
+      }
+    }
+  }
+
+  /**
+   * Converts the string literal "NaN", when it is an operand of a binary expression whose other
+   * operand is Float / Double (or also "NaN"), to the numeric NaN.  Operand positions are kept.
+   */
+  object ConvertNaNs extends Rule[LogicalPlan] {
+    val stringNaN = Literal("NaN", StringType)
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case q: LogicalPlan => q transformExpressions {
+        // Skip nodes whose children have not been resolved yet.
+        case e if !e.childrenResolved => e
+
+        /* Double Conversions (operand order is preserved; operators need not be commutative) */
+        case b: BinaryExpression if b.left == stringNaN && b.right.dataType == DoubleType =>
+          b.makeCopy(Array(Literal(Double.NaN), b.right))
+        case b: BinaryExpression if b.left.dataType == DoubleType && b.right == stringNaN =>
+          b.makeCopy(Array(b.left, Literal(Double.NaN)))
+        case b: BinaryExpression if b.left == stringNaN && b.right == stringNaN =>
+          b.makeCopy(Array(Literal(Double.NaN), Literal(Double.NaN)))
+
+        /* Float Conversions */
+        case b: BinaryExpression if b.left == stringNaN && b.right.dataType == FloatType =>
+          b.makeCopy(Array(Literal(Float.NaN), b.right))
+        case b: BinaryExpression if b.left.dataType == FloatType && b.right == stringNaN =>
+          b.makeCopy(Array(b.left, Literal(Float.NaN)))
+        // When both operands are "NaN" the double case above already applies, so a float
+        // variant with the same guard could never be reached and is intentionally omitted.
+      }
+    }
+  }
+
+  /**
+   * Widens numeric types and converts strings to numbers when appropriate.
+   *
+   * Loosely based on rules from "Hadoop: The Definitive Guide" 2nd edition, by Tom White
+   *
+   * The implicit conversion rules can be summarized as follows:
+   *   - Any integral numeric type can be implicitly converted to a wider type.
+   *   - All the integral numeric types, FLOAT, and (perhaps surprisingly) STRING can be implicitly
+   *     converted to DOUBLE.
+   *   - TINYINT, SMALLINT, and INT can all be converted to FLOAT.
+   *   - BOOLEAN types cannot be converted to any other type.
+   *
+   * Additionally, all types when UNION-ed with strings will be promoted to strings.
+   * Other string conversions are handled by PromoteStrings.
+   */
+  object WidenTypes extends Rule[LogicalPlan] {
+    // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
+    // The conversions for integral and floating point types have a linear widening hierarchy:
+    val numericPrecedence =
+      Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+    // Boolean is only wider than Void (NullType).
+    val booleanPrecedence = Seq(NullType, BooleanType)
+    val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+
+    def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
+      // Try to find a precedence list that contains both types in question.
+      val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
+
+      // If found, return whichever of the two types comes last in that list; otherwise None.
+      applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case u @ Union(left, right) if u.childrenResolved && !u.resolved =>
+        val castedInput = left.output.zip(right.output).map {
+          // When a string is found on one side, make the other side a string too.
+          case (l, r) if l.dataType == StringType && r.dataType != StringType =>
+            (l, Alias(Cast(r, StringType), r.name)())
+          case (l, r) if l.dataType != StringType && r.dataType == StringType =>
+            (Alias(Cast(l, StringType), l.name)(), r)
+
+          case (l, r) if l.dataType != r.dataType =>
+            logger.debug(s"Resolving mismatched union input ${l.dataType}, ${r.dataType}")
+            findTightestCommonType(l.dataType, r.dataType).map { widestType =>
+              val newLeft =
+                if (l.dataType == widestType) l else Alias(Cast(l, widestType), l.name)()
+              val newRight =
+                if (r.dataType == widestType) r else Alias(Cast(r, widestType), r.name)()
+
+              (newLeft, newRight)
+            }.getOrElse((l, r)) // If there is no applicable conversion, leave expression unchanged.
+          case other => other
+        }
+
+        val (castedLeft, castedRight) = castedInput.unzip
+
+        val newLeft =
+          if (castedLeft.map(_.dataType) != left.output.map(_.dataType)) {
+            logger.debug(s"Widening numeric types in union $castedLeft ${left.output}")
+            Project(castedLeft, left)
+          } else {
+            left
+          }
+
+        val newRight =
+          if (castedRight.map(_.dataType) != right.output.map(_.dataType)) {
+            logger.debug(s"Widening numeric types in union $castedRight ${right.output}")
+            Project(castedRight, right)
+          } else {
+            right
+          }
+
+        Union(newLeft, newRight)
+
+      // Also widen types for BinaryExpressions.
+      case q: LogicalPlan => q transformExpressions {
+        // Skip nodes whose children have not been resolved yet.
+        case e if !e.childrenResolved => e
+
+        case b: BinaryExpression if b.left.dataType != b.right.dataType =>
+          findTightestCommonType(b.left.dataType, b.right.dataType).map { widestType =>
+            val newLeft =
+              if (b.left.dataType == widestType) b.left else Cast(b.left, widestType)
+            val newRight =
+              if (b.right.dataType == widestType) b.right else Cast(b.right, widestType)
+            b.makeCopy(Array(newLeft, newRight))
+          }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+      }
+    }
+  }
+
+  /**
+   * Casts string operands of arithmetic, mixed-type predicates, SUM and AVERAGE to double.
+   */
+  object PromoteStrings extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      // Leave nodes alone until all of their children are resolved.
+      case pending if !pending.childrenResolved => pending
+      // Arithmetic: a string on either side is cast to double (the left side is checked first).
+      case arith: BinaryArithmetic if arith.left.dataType == StringType =>
+        arith.makeCopy(Array(Cast(arith.left, DoubleType), arith.right))
+      case arith: BinaryArithmetic if arith.right.dataType == StringType =>
+        arith.makeCopy(Array(arith.left, Cast(arith.right, DoubleType)))
+      // Predicates: promote only when exactly one of the two sides is a string.
+      case pred: BinaryPredicate
+          if pred.left.dataType == StringType && pred.right.dataType != StringType =>
+        pred.makeCopy(Array(Cast(pred.left, DoubleType), pred.right))
+      case pred: BinaryPredicate
+          if pred.left.dataType != StringType && pred.right.dataType == StringType =>
+        pred.makeCopy(Array(pred.left, Cast(pred.right, DoubleType)))
+      // Aggregates over a string argument are computed over its double value.
+      case Sum(arg) if arg.dataType == StringType => Sum(Cast(arg, DoubleType))
+      case Average(arg) if arg.dataType == StringType => Average(Cast(arg, DoubleType))
+    }
+  }
+
+  /**
+   * Changes Boolean values to Bytes so that expressions like true < false can be evaluated.
+   */
+  object BooleanComparisons extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      // Leave nodes alone until all of their children are resolved.
+      case pending if !pending.childrenResolved => pending
+      // Equality already makes sense for booleans, so Equals is passed through untouched.
+      case equality: Equals => equality
+      // Any other comparison of two booleans is done on bytes, for which an ordering exists.
+      case cmp: BinaryComparison
+          if Seq(cmp.left, cmp.right).forall(_.dataType == BooleanType) =>
+        cmp.makeCopy(Array(Cast(cmp.left, ByteType), Cast(cmp.right, ByteType)))
+    }
+  }
+
+  /**
+   * Rewrites casts to and from [[catalyst.types.BooleanType BooleanType]] as comparisons or
+   * conditionals, because the JVM does not consider Booleans to be numeric types.
+   */
+  object BooleanCasts extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      // Leave nodes alone until all of their children are resolved.
+      case pending if !pending.childrenResolved => pending
+
+      case Cast(child, BooleanType) => Not(Equals(child, Literal(0)))
+      case Cast(flag, target) if flag.dataType == BooleanType =>
+        Cast(If(flag, Literal(1), Literal(0)), target)
+    }
+  }
+
+  /**
+   * Hive truncates a string holding a fractional number when it is cast to an integral type,
+   * whereas a direct JVM conversion would throw a `java.lang.NumberFormatException`.  To match
+   * Hive, such casts are routed through an intermediate cast to decimal.
+   */
+  object StringToIntegralCasts extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      // Leave nodes alone until all of their children are resolved.
+      case pending if !pending.childrenResolved => pending
+
+      case Cast(text @ StringType(), integral: IntegralType) =>
+        Cast(Cast(text, DecimalType), integral)
+    }
+  }
+
+  /**
+   * Ensures that the argument types of various functions are as expected.
+   */
+  object FunctionArgumentConversion extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      // Leave nodes alone until all of their children are resolved.
+      case pending if !pending.childrenResolved => pending
+
+      // SUM is computed in the largest type of its family to prevent overflows.
+      case decimalSum @ Sum(DecimalType()) => decimalSum // Decimal is already the biggest.
+      case Sum(arg @ IntegralType()) if arg.dataType != LongType => Sum(Cast(arg, LongType))
+      case Sum(arg @ FractionalType()) if arg.dataType != DoubleType => Sum(Cast(arg, DoubleType))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
new file mode 100644
index 0000000..fe18cc4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+package analysis
+
+import plans.logical.LogicalPlan
+import rules._
+
+/**
+ * A trait that should be mixed into query operators where a single instance might appear multiple
+ * times in a logical query plan.  It is invalid to have multiple copies of the same attribute
+ * produced by distinct operators in a query tree as this breaks the guarantee that expression
+ * ids, which are used to differentiate attributes, are unique.
+ *
+ * Before analysis, all operators that include this trait will be asked to produce a new version
+ * of themselves with globally unique expression ids.
+ */
+trait MultiInstanceRelation {
+  def newInstance: this.type
+}
+
+/**
+ * Replaces every MultiInstanceRelation that occurs more than once in the query plan with the
+ * result of its `newInstance`, so that each occurrence produces its own expression ids.
+ */
+object NewRelationInstances extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    // Gather every multi-instance relation, then keep those that are equal to another one.
+    val relations = plan.collect { case relation: MultiInstanceRelation => relation }
+    val repeated: Set[MultiInstanceRelation] = relations
+      .groupBy(identity[MultiInstanceRelation])
+      .collect { case (relation, occurrences) if occurrences.size > 1 => relation }
+      .toSet
+
+    plan.transform {
+      case relation: MultiInstanceRelation if repeated(relation) => relation.newInstance
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
new file mode 100644
index 0000000..375c99f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package catalyst
+
+/**
+ * Provides a logical query plan [[Analyzer]] and supporting classes for performing analysis.
+ * Analysis consists of translating [[UnresolvedAttribute]]s and [[UnresolvedRelation]]s
+ * into fully typed objects using information in a schema [[Catalog]].
+ */
+package object analysis

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
new file mode 100644
index 0000000..2ed2af1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package analysis
+
+import expressions._
+import plans.logical.BaseRelation
+import trees.TreeNode
+
+/**
+ * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
+ * resolved.  `function` names the member that was called and is included in the error message.
+ */
+class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) extends
+  errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
+
+/**
+ * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
+ */
+case class UnresolvedRelation(
+    databaseName: Option[String],
+    tableName: String,
+    alias: Option[String] = None) extends BaseRelation {
+  def output = Nil  // an unresolved relation exposes no output attributes
+  override lazy val resolved = false  // always reports itself as unresolved
+}
+
+/**
+ * Holds the name of an unresolved attribute; exprId/dataType/nullable/qualifiers all throw.
+ */
+case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
+  def exprId = throw new UnresolvedException(this, "exprId")
+  def dataType = throw new UnresolvedException(this, "dataType")
+  def nullable = throw new UnresolvedException(this, "nullable")
+  def qualifiers = throw new UnresolvedException(this, "qualifiers")
+  override lazy val resolved = false
+
+  def newInstance = this  // returns this same object; no copy is made
+  def withQualifiers(newQualifiers: Seq[String]) = this  // the supplied qualifiers are ignored
+
+  override def toString: String = s"'$name"  // the name prefixed with a single quote
+}
+
+case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
+  def exprId = throw new UnresolvedException(this, "exprId")  // typed accessors all throw
+  def dataType = throw new UnresolvedException(this, "dataType")
+  override def foldable = throw new UnresolvedException(this, "foldable")
+  def nullable = throw new UnresolvedException(this, "nullable")
+  def qualifiers = throw new UnresolvedException(this, "qualifiers")
+  def references = children.flatMap(_.references).toSet  // union of the arguments' references
+  override lazy val resolved = false  // always reports itself as unresolved
+  override def toString = s"'$name(${children.mkString(",")})"  // quoted name, then arguments
+}
+
+/**
+ * Represents all of the input attributes to a given relational operator, for example in
+ * "SELECT * FROM ...".
+ *
+ * @param table an optional table that should be the target of the expansion.  If omitted all
+ *              tables' columns are produced.
+ */
+case class Star(
+    table: Option[String],
+    mapFunction: Attribute => Expression = identity[Attribute])
+  extends Attribute with trees.LeafNode[Expression] {
+
+  def name = throw new UnresolvedException(this, "name")
+  def exprId = throw new UnresolvedException(this, "exprId")
+  def dataType = throw new UnresolvedException(this, "dataType")
+  def nullable = throw new UnresolvedException(this, "nullable")
+  def qualifiers = throw new UnresolvedException(this, "qualifiers")
+  override lazy val resolved = false
+
+  def newInstance = this
+  def withQualifiers(newQualifiers: Seq[String]) = this
+
+  /** Applies `mapFunction` to every attribute of `input` selected by `table`. */
+  def expand(input: Seq[Attribute]): Seq[NamedExpression] = {
+    val expandedAttributes: Seq[Attribute] = table match {
+      case None => input  // No table specified: use all input attributes.
+      // If there is a table, pick out attributes that are part of this table.
+      case Some(tableName) => input.filter(_.qualifiers contains tableName)
+    }
+    // Zip with the filtered attributes, not `input`, so each alias takes its own source's name.
+    expandedAttributes.map(mapFunction).zip(expandedAttributes).map {
+      case (n: NamedExpression, _) => n
+      case (e, originalAttribute) =>
+        Alias(e, originalAttribute.name)(qualifiers = originalAttribute.qualifiers)
+    }
+  }
+
+  override def toString = table.map(_ + ".").getOrElse("") + "*"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
new file mode 100644
index 0000000..cd8de9d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import analysis.UnresolvedAttribute
+import expressions._
+import plans._
+import plans.logical._
+import types._
+
+/**
+ * Provides experimental support for generating catalyst schemas for scala objects.
+ */
+object ScalaReflection {
+  import scala.reflect.runtime.universe._
+
+  /** Returns a Sequence of attributes for the given case class type. */
+  def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
+    case s: StructType =>
+      s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
+  }
+
+  /** Returns a catalyst DataType for the given Scala Type using reflection. */
+  def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T])
+
+  /** Returns a catalyst DataType for the given Scala Type using reflection. */
+  def schemaFor(tpe: `Type`): DataType = tpe match {
+    // Products (e.g. case classes) become structs with one nullable field per constructor param.
+    case t if t <:< typeOf[Product] =>
+      val params = t.member("<init>": TermName).asMethod.paramss
+      StructType(
+        params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true)))
+    case t if t <:< typeOf[Seq[_]] =>
+      val TypeRef(_, _, Seq(elementType)) = t
+      ArrayType(schemaFor(elementType))
+    case t if t <:< typeOf[String] => StringType
+    case t if t <:< definitions.IntTpe => IntegerType
+    case t if t <:< definitions.LongTpe => LongType
+    case t if t <:< definitions.DoubleTpe => DoubleType
+    case t if t <:< definitions.FloatTpe => FloatType
+    case t if t <:< definitions.ShortTpe => ShortType
+    case t if t <:< definitions.ByteTpe => ByteType
+    case t if t <:< definitions.BooleanTpe => BooleanType
+  }
+
+  implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {
+
+    /**
+     * Implicitly added to Sequences of case class objects.  Returns a catalyst logical relation
+     * for the data in the sequence.
+     */
+    def asRelation: LocalRelation = LocalRelation(attributesFor[A], data)
+  }
+}
+
+/**
+ * A collection of implicit conversions that create a DSL for constructing catalyst data structures.
+ *
+ * {{{
+ *  scala> import catalyst.dsl._
+ *
+ *  // Standard operators are added to expressions.
+ *  scala> Literal(1) + Literal(1)
+ *  res1: catalyst.expressions.Add = (1 + 1)
+ *
+ *  // There is a conversion from 'symbols to unresolved attributes.
+ *  scala> 'a.attr
+ *  res2: catalyst.analysis.UnresolvedAttribute = 'a
+ *
+ *  // These unresolved attributes can be used to create more complicated expressions.
+ *  scala> 'a === 'b
+ *  res3: catalyst.expressions.Equals = ('a = 'b)
+ *
+ *  // SQL verbs can be used to construct logical query plans.
+ *  scala> TestRelation('key.int, 'value.string).where('key === 1).select('value).analyze
+ *  res4: catalyst.plans.logical.LogicalPlan =
+ *  Project {value#1}
+ *   Filter (key#0 = 1)
+ *    TestRelation {key#0,value#1}
+ * }}}
+ */
+package object dsl {
+  trait ImplicitOperators {
+    def expr: Expression  // the expression that the operators below are applied to
+
+    def + (other: Expression) = Add(expr, other)
+    def - (other: Expression) = Subtract(expr, other)
+    def * (other: Expression) = Multiply(expr, other)
+    def / (other: Expression) = Divide(expr, other)
+
+    def && (other: Expression) = And(expr, other)
+    def || (other: Expression) = Or(expr, other)
+
+    def < (other: Expression) = LessThan(expr, other)
+    def <= (other: Expression) = LessThanOrEqual(expr, other)
+    def > (other: Expression) = GreaterThan(expr, other)
+    def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+    def === (other: Expression) = Equals(expr, other)
+    def != (other: Expression) = Not(Equals(expr, other))  // inequality is expressed as NOT(=)
+
+    def asc = SortOrder(expr, Ascending)
+    def desc = SortOrder(expr, Descending)
+
+    def as(s: Symbol) = Alias(expr, s.name)()  // aliases the expression to the symbol's name
+  }
+
+  trait ExpressionConversions {
+    implicit class DslExpression(e: Expression) extends ImplicitOperators {
+      def expr = e
+    }
+
+    implicit def intToLiteral(i: Int) = Literal(i)
+    implicit def longToLiteral(l: Long) = Literal(l)
+    implicit def floatToLiteral(f: Float) = Literal(f)
+    implicit def doubleToLiteral(d: Double) = Literal(d)
+    implicit def stringToLiteral(s: String) = Literal(s)
+
+    implicit def symbolToUnresolvedAttribute(s: Symbol) = analysis.UnresolvedAttribute(s.name)
+
+    implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s = sym.name }
+    implicit class DslString(val s: String) extends ImplicitAttribute
+
+    abstract class ImplicitAttribute extends ImplicitOperators {
+      def s: String
+      def expr = attr
+      def attr = analysis.UnresolvedAttribute(s)
+
+      /** Creates a new typed, non-nullable attribute of type int named `s`. */
+      def int = AttributeReference(s, IntegerType, nullable = false)()
+
+      /** Creates a new typed, non-nullable attribute of type string named `s`. */
+      def string = AttributeReference(s, StringType, nullable = false)()
+    }
+
+    implicit class DslAttribute(a: AttributeReference) {
+      def notNull = a.withNullability(false)
+      def nullable = a.withNullability(true)
+
+      // Protobuf terminology; does the same as notNull.
+      def required = a.withNullability(false)
+    }
+  }
+
+
+  object expressions extends ExpressionConversions  // scalastyle:ignore
+
+  abstract class LogicalPlanFunctions {  // DSL verbs for building plans on top of `logicalPlan`
+    def logicalPlan: LogicalPlan
+
+    def select(exprs: NamedExpression*) = Project(exprs, logicalPlan)
+
+    def where(condition: Expression) = Filter(condition, logicalPlan)
+
+    def join(
+        otherPlan: LogicalPlan,
+        joinType: JoinType = Inner,
+        condition: Option[Expression] = None) =
+      Join(logicalPlan, otherPlan, joinType, condition)
+
+    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
+
+    def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
+      val aliasedExprs = aggregateExprs.map {
+        case ne: NamedExpression => ne
+        case e => Alias(e, e.toString)()  // unnamed results are named after their string form
+      }
+      Aggregate(groupingExprs, aliasedExprs, logicalPlan)
+    }
+
+    def subquery(alias: Symbol) = Subquery(alias.name, logicalPlan)
+
+    def unionAll(otherPlan: LogicalPlan) = Union(logicalPlan, otherPlan)
+
+    def sfilter[T1](arg1: Symbol)(udf: (T1) => Boolean) =
+      Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan)
+
+    def sfilter(dynamicUdf: (DynamicRow) => Boolean) =
+      Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(logicalPlan.output))), logicalPlan)
+
+    def sample(
+        fraction: Double,
+        withReplacement: Boolean = true,
+        seed: Int = (math.random * 1000).toInt) =  // default seed: random integer in [0, 1000)
+      Sample(fraction, withReplacement, seed, logicalPlan)
+
+    def generate(
+        generator: Generator,
+        join: Boolean = false,
+        outer: Boolean = false,
+        alias: Option[String] = None) =
+      Generate(generator, join, outer, alias, logicalPlan)  // forward `alias`, not a fixed None
+
+    def insertInto(tableName: String, overwrite: Boolean = false) =
+      InsertIntoTable(
+        analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
+
+    def analyze = analysis.SimpleAnalyzer(logicalPlan)
+  }
+
+  object plans {  // scalastyle:ignore
+    implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions {
+      def writeToFile(path: String) = WriteToFile(path, logicalPlan)  // builds a WriteToFile node
+    }
+  }
+}