Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/14 01:09:53 UTC
[2/2] git commit: [SQL] Update SparkSQL and ScalaTest in branch-1.0 to match master.
[SQL] Update SparkSQL and ScalaTest in branch-1.0 to match master.
#511 and #863 got left out of branch-1.0 since we were really close to the release. Now that they have been tested a little, I see no reason to leave them out.
Author: Michael Armbrust <mi...@databricks.com>
Author: witgo <wi...@qq.com>
Closes #1078 from marmbrus/branch-1.0 and squashes the following commits:
22be674 [witgo] [SPARK-1841]: update scalatest to version 2.1.5
fc8fc79 [Michael Armbrust] Include #1071 as well.
c5d0adf [Michael Armbrust] Update SparkSQL in branch-1.0 to match master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e3e9afd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e3e9afd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e3e9afd
Branch: refs/heads/branch-1.0
Commit: 7e3e9afdb0d89d3d9636e37da6413806d6dc611c
Parents: 00b4317
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Jun 13 16:09:47 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Jun 13 16:09:47 2014 -0700
----------------------------------------------------------------------
core/pom.xml                                    |   2 +-
.../org/apache/spark/ContextCleanerSuite.scala  |   6 +-
.../org/apache/spark/ShuffleNettySuite.scala    |   4 +-
.../scala/org/apache/spark/rdd/RDDSuite.scala   |   5 +-
.../spark/scheduler/DAGSchedulerSuite.scala     |   4 +-
.../spark/util/TimeStampedHashMapSuite.scala    |   3 +-
pom.xml                                         |  15 +-
project/SparkBuild.scala                        |  22 +-
.../scala/org/apache/spark/repl/ReplSuite.scala |   6 +-
.../sql/catalyst/plans/logical/commands.scala   |  18 +-
.../optimizer/FilterPushdownSuite.scala         |   2 +-
.../scala/org/apache/spark/sql/SQLContext.scala |  45 +-
.../scala/org/apache/spark/sql/SchemaRDD.scala  |   2 +-
.../org/apache/spark/sql/SchemaRDDLike.scala    |  15 +-
.../spark/sql/api/java/JavaSchemaRDD.scala      |   2 +-
.../spark/sql/execution/SparkStrategies.scala   |  38 +-
.../apache/spark/sql/execution/commands.scala   |  81 ++--
.../spark/sql/parquet/ParquetFilters.scala      | 436 +++++++++++++++++++
.../sql/parquet/ParquetTableOperations.scala    |  90 +++-
.../spark/sql/parquet/ParquetTestData.scala     |  90 +++-
.../org/apache/spark/sql/SQLQuerySuite.scala    |  16 +-
.../spark/sql/parquet/ParquetQuerySuite.scala   | 145 +++-
.../org/apache/spark/sql/hive/HiveContext.scala |  55 +--
.../apache/spark/sql/hive/HiveStrategies.scala  |   9 +
.../org/apache/spark/sql/hive/TestHive.scala    |   1 -
.../sql/hive/execution/hiveOperators.scala      |  32 +-
.../sql/hive/execution/HiveComparisonTest.scala |   3 +-
.../hive/execution/HiveCompatibilitySuite.scala |   9 +-
.../sql/hive/execution/HiveQuerySuite.scala     | 125 ++--
.../spark/streaming/BasicOperationsSuite.scala  |  12 +-
30 files changed, 1039 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index bd739e5..ead33cd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -235,7 +235,7 @@
</dependency>
<dependency>
<groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
+ <artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 5a83100..dc2db66 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import scala.util.Random
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.{PatienceConfiguration, Eventually}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -76,7 +76,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
tester.assertCleanup()
// Verify that shuffles can be re-executed after cleaning up
- assert(rdd.collect().toList === collected)
+ assert(rdd.collect().toList.equals(collected))
}
test("cleanup broadcast") {
@@ -285,7 +285,7 @@ class CleanerTester(
sc.cleaner.get.attachListener(cleanerListener)
/** Assert that all the stuff has been cleaned up */
- def assertCleanup()(implicit waitTimeout: Eventually.Timeout) {
+ def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
eventually(waitTimeout, interval(100 millis)) {
assert(isAllCleanedUp)
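
This reflects a ScalaTest 2.x change: Eventually's Timeout type now lives in PatienceConfiguration, which is why the implicit parameter above changes. For reference, a minimal hedged sketch of the new API (waitForCondition is illustrative, not part of this commit):

    import org.scalatest.concurrent.PatienceConfiguration
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // timeout(...) and interval(...) come from Eventually and now build
    // PatienceConfiguration.Timeout / PatienceConfiguration.Interval values.
    def waitForCondition(condition: => Boolean)
        (implicit t: PatienceConfiguration.Timeout = timeout(10.seconds)) {
      eventually(t, interval(100.millis)) { assert(condition) }
    }
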
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 29d428a..47df000 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -23,11 +23,11 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
- override def beforeAll(configMap: Map[String, Any]) {
+ override def beforeAll() {
System.setProperty("spark.shuffle.use.netty", "true")
}
- override def afterAll(configMap: Map[String, Any]) {
+ override def afterAll() {
System.setProperty("spark.shuffle.use.netty", "false")
}
}
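
ScalaTest 2.x drops the configMap-taking overloads of beforeAll/afterAll (that variant moved to BeforeAndAfterAllConfigMap), so the hooks become parameterless. A minimal sketch of the new shape (NettyToggleSuite is illustrative):

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    class NettyToggleSuite extends FunSuite with BeforeAndAfterAll {
      // ScalaTest 2.x signature: no configMap parameter.
      override def beforeAll() { System.setProperty("spark.shuffle.use.netty", "true") }
      override def afterAll() { System.setProperty("spark.shuffle.use.netty", "false") }
      test("placeholder") { assert(true) }
    }
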
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index fdbed45..87bfce3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -275,8 +275,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
- assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
- null)
+ val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
+ asInstanceOf[ShuffledRDD[_, _, _]] != null
+ assert(isEquals)
// when shuffling, we can increase the number of partitions
val coalesced6 = data.coalesce(20, shuffle = true)
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1..7e901f8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,7 +23,7 @@ import scala.language.reflectiveCalls
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuiteLike}
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -37,7 +37,7 @@ class BuggyDAGEventProcessActor extends Actor {
}
}
-class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
+class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {
val conf = new SparkConf
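
FunSuite is a class in ScalaTest 2.x, so a suite that already extends another class, like Akka's TestKit here, must mix in the FunSuiteLike trait instead. A minimal sketch (ActorBackedSuite is illustrative):

    import akka.actor.ActorSystem
    import akka.testkit.TestKit
    import org.scalatest.FunSuiteLike

    // TestKit is a class, so the test style has to come in as a trait.
    class ActorBackedSuite extends TestKit(ActorSystem("ActorBackedSuite"))
      with FunSuiteLike {
      test("actor system is up") { assert(system.name == "ActorBackedSuite") }
    }
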
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
index 6a5653e..c1c605c 100644
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -105,7 +105,8 @@ class TimeStampedHashMapSuite extends FunSuite {
map("k1") = strongRef
map("k2") = "v2"
map("k3") = "v3"
- assert(map("k1") === strongRef)
+ val isEquals = map("k1") == strongRef
+ assert(isEquals)
// clear strong reference to "k1"
strongRef = null
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c4a1b50..1e464a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -458,25 +458,31 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>1.9.1</version>
+ <version>2.1.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
+ <artifactId>easymockclassextension</artifactId>
<version>3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
- <version>1.8.5</version>
+ <version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
- <version>1.10.0</version>
+ <version>1.11.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -778,6 +784,7 @@
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
+ <arg>-language:postfixOps</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
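
The new -language:postfixOps flag enables postfix syntax project-wide (for example ScalaTest's SpanSugar, used in the suites above); without it, Scala 2.10 emits a feature warning. A small illustration:

    import org.scalatest.time.SpanSugar._
    // With -language:postfixOps set project-wide this postfix call compiles
    // cleanly; otherwise each file needs: import scala.language.postfixOps
    val waitSpan = 100 millis
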
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index deafbc5..c0e3bba 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -270,16 +270,17 @@ object SparkBuild extends Build {
*/
libraryDependencies ++= Seq(
- "io.netty" % "netty-all" % "4.0.17.Final",
- "org.eclipse.jetty" % "jetty-server" % jettyVersion,
- "org.eclipse.jetty" % "jetty-util" % jettyVersion,
- "org.eclipse.jetty" % "jetty-plus" % jettyVersion,
- "org.eclipse.jetty" % "jetty-security" % jettyVersion,
- "org.scalatest" %% "scalatest" % "1.9.1" % "test",
- "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
- "com.novocode" % "junit-interface" % "0.10" % "test",
- "org.easymock" % "easymock" % "3.1" % "test",
- "org.mockito" % "mockito-all" % "1.8.5" % "test"
+ "io.netty" % "netty-all" % "4.0.17.Final",
+ "org.eclipse.jetty" % "jetty-server" % jettyVersion,
+ "org.eclipse.jetty" % "jetty-util" % jettyVersion,
+ "org.eclipse.jetty" % "jetty-plus" % jettyVersion,
+ "org.eclipse.jetty" % "jetty-security" % jettyVersion,
+ "org.scalatest" %% "scalatest" % "2.1.5" % "test",
+ "org.scalacheck" %% "scalacheck" % "1.11.3" % "test",
+ "com.novocode" % "junit-interface" % "0.10" % "test",
+ "org.easymock" % "easymockclassextension" % "3.1" % "test",
+ "org.mockito" % "mockito-all" % "1.9.0" % "test",
+ "junit" % "junit" % "4.10" % "test"
),
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -476,7 +477,6 @@ object SparkBuild extends Build {
// this non-deterministically. TODO: FIX THIS.
parallelExecution in Test := false,
libraryDependencies ++= Seq(
- "org.scalatest" %% "scalatest" % "1.9.1" % "test",
"com.typesafe" %% "scalalogging-slf4j" % "1.0.1"
)
)
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 95460aa..95e1793 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -57,12 +57,14 @@ class ReplSuite extends FunSuite {
}
def assertContains(message: String, output: String) {
- assert(output.contains(message),
+ val isContain = output.contains(message)
+ assert(isContain,
"Interpreter output did not contain '" + message + "':\n" + output)
}
def assertDoesNotContain(message: String, output: String) {
- assert(!output.contains(message),
+ val isContain = output.contains(message)
+ assert(!isContain,
"Interpreter output contained '" + message + "':\n" + output)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index d05c965..3299e86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
import org.apache.spark.sql.catalyst.types.StringType
/**
@@ -26,23 +26,25 @@ import org.apache.spark.sql.catalyst.types.StringType
*/
abstract class Command extends LeafNode {
self: Product =>
- def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+ def output: Seq[Attribute] = Seq.empty
}
/**
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
* commands that are passed directly to another system.
*/
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+ override def output =
+ Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}
/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
- AttributeReference("key", StringType, nullable = false)(),
- AttributeReference("value", StringType, nullable = false)()
- )
+ BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+ BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
}
/**
@@ -50,11 +52,11 @@ case class SetCommand(key: Option[String], value: Option[String]) extends Comman
* actually performing the execution.
*/
case class ExplainCommand(plan: LogicalPlan) extends Command {
- override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+ override def output =
+ Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
}
/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command
-
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0cada78..1f67c80 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
comparePlans(optimized, correctAnswer)
}
-
+
test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 38fc6b4..378ff54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
- def sql(sqlText: String): SchemaRDD = {
- val result = new SchemaRDD(this, parseSql(sqlText))
- // We force query optimization to happen right away instead of letting it happen lazily like
- // when using the query DSL. This is so DDL commands behave as expected. This is only
- // generates the RDD lineage for DML queries, but do not perform any execution.
- result.queryExecution.toRdd
- result
- }
+ def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
@@ -220,17 +213,21 @@ class SQLContext(@transient val sparkContext: SparkContext)
* final desired output requires complex expressions to be evaluated or when columns can be
* further eliminated out after filtering has been done.
*
+ * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
+ * away by the filter pushdown optimization.
+ *
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
+ prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
val projectSet = projectList.flatMap(_.references).toSet
val filterSet = filterPredicates.flatMap(_.references).toSet
- val filterCondition = filterPredicates.reduceLeftOption(And)
+ val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)
// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
@@ -255,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val planner = new SparkPlanner
@transient
- protected[sql] lazy val emptyResult =
- sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+ protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)
/**
* Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -276,22 +272,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected abstract class QueryExecution {
def logical: LogicalPlan
- def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
- case SetCommand(key, value) =>
- // Only this case needs to be executed eagerly. The other cases will
- // be taken care of when the actual results are being extracted.
- // In the case of HiveContext, sqlConf is overridden to also pass the
- // pair into its HiveConf.
- if (key.isDefined && value.isDefined) {
- set(key.get, value.get)
- }
- // It doesn't matter what we return here, since this is only used
- // to force the evaluation to happen eagerly. To query the results,
- // one must use SchemaRDD operations to extract them.
- emptyResult
- case _ => executedPlan.execute()
- }
-
lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
@@ -299,12 +279,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
- lazy val toRdd: RDD[Row] = {
- logical match {
- case s: SetCommand => eagerlyProcess(s)
- case _ => executedPlan.execute()
- }
- }
+ lazy val toRdd: RDD[Row] = executedPlan.execute()
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -326,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* TODO: We only support primitive types, add support for nested types.
*/
private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
- val schema = rdd.first.map { case (fieldName, obj) =>
+ val schema = rdd.first().map { case (fieldName, obj) =>
val dataType = obj.getClass match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 7ad8edf..821ac85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
@AlphaComponent
class SchemaRDD(
@transient val sqlContext: SQLContext,
- @transient protected[spark] val logicalPlan: LogicalPlan)
+ @transient val baseLogicalPlan: LogicalPlan)
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
def baseSchemaRDD = this
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 3a895e1..656be96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkLogicalPlan
/**
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
*/
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
- @transient protected[spark] val logicalPlan: LogicalPlan
+ @transient val baseLogicalPlan: LogicalPlan
private[sql] def baseSchemaRDD: SchemaRDD
@@ -48,7 +49,17 @@ private[sql] trait SchemaRDDLike {
*/
@transient
@DeveloperApi
- lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+ lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
+
+ @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+ // For various commands (like DDL) and queries with side effects, we force query optimization to
+ // happen right away to let these side effects take place eagerly.
+ case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+ queryExecution.toRdd
+ SparkLogicalPlan(queryExecution.executedPlan)
+ case _ =>
+ baseLogicalPlan
+ }
override def toString =
s"""${super.toString}
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index 22f57b7..aff6ffe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
*/
class JavaSchemaRDD(
@transient val sqlContext: SQLContext,
- @transient protected[spark] val logicalPlan: LogicalPlan)
+ @transient val baseLogicalPlan: LogicalPlan)
extends JavaRDDLike[Row, JavaRDD[Row]]
with SchemaRDDLike {
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 1039be5..2233216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.{SQLConf, SQLContext, execution}
+import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -157,12 +157,36 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
- case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
- // TODO: Should be pushing down filters as well.
+ case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
+ val prunePushedDownFilters =
+ if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+ (filters: Seq[Expression]) => {
+ filters.filter { filter =>
+ // Note: filters cannot be pushed down to Parquet if they contain more complex
+ // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
+ // all filters that have been pushed down. Note that a predicate such as
+ // "(A AND B) OR C" can result in "A OR C" being pushed down.
+ val recordFilter = ParquetFilters.createFilter(filter)
+ if (!recordFilter.isDefined) {
+ // First case: the pushdown did not result in any record filter.
+ true
+ } else {
+ // Second case: a record filter was created; here we are conservative in
+ // the sense that even if "A" was pushed and we check for "A AND B" we
+ // still want to keep "A AND B" in the higher-level filter, not just "B".
+ !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
+ }
+ }
+ }
+ } else {
+ identity[Seq[Expression]] _
+ }
pruneFilterProject(
projectList,
filters,
- ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
+ prunePushedDownFilters,
+ ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
+
case _ => Nil
}
}
@@ -225,12 +249,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.SetCommand(key, value) =>
- Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
+ Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
val executedPlan = context.executePlan(child).executedPlan
- Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+ Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
case logical.CacheCommand(tableName, cache) =>
- Seq(execution.CacheCommandPhysical(tableName, cache)(context))
+ Seq(execution.CacheCommand(tableName, cache)(context))
case _ => Nil
}
}
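
To make the conservative pruning above concrete, here is a hedged sketch using Catalyst names as they appear in this commit (x is an illustrative attribute): when only part of a conjunction is expressible as an "Attribute cmp Literal" Parquet filter, the whole conjunction stays in the Spark-side filter.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._
    import org.apache.spark.sql.parquet.ParquetFilters

    val x = AttributeReference("x", IntegerType, nullable = false)()
    val simple = GreaterThan(x, Literal(5))                     // pushable
    val complex = GreaterThan(Cast(x, LongType), Literal(5L))   // not pushable (Cast)

    ParquetFilters.createFilter(simple)                // Some(...): fully pushed down
    ParquetFilters.createFilter(And(simple, complex))  // Some(filter for `simple` only)
    // findExpression(thatFilter, And(simple, complex)) returns None, so the
    // strategy above keeps the whole And(simple, complex) as a Spark-side filter.
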
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index be26d19..0377290 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
+trait Command {
+ /**
+ * A concrete command should override this lazy field to wrap up any side effects caused by the
+ * command or any other computation that should be evaluated exactly once. The value of this field
+ * can be used as the contents of the corresponding RDD generated from the physical plan of this
+ * command.
+ *
+ * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+ * so that the command can be executed eagerly right after the command query is created.
+ */
+ protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
+
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
- (@transient context: SQLContext) extends LeafNode {
- def execute(): RDD[Row] = (key, value) match {
- // Set value for key k; the action itself would
- // have been performed in QueryExecution eagerly.
- case (Some(k), Some(v)) => context.emptyResult
+case class SetCommand(
+ key: Option[String], value: Option[String], output: Seq[Attribute])(
+ @transient context: SQLContext)
+ extends LeafNode with Command {
+
+ override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+ // Set value for key k.
+ case (Some(k), Some(v)) =>
+ context.set(k, v)
+ Array(k -> v)
+
// Query the value bound to key k.
- case (Some(k), None) =>
- val resultString = context.getOption(k) match {
- case Some(v) => s"$k=$v"
- case None => s"$k is undefined"
- }
- context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+ case (Some(k), _) =>
+ Array(k -> context.getOption(k).getOrElse("<undefined>"))
+
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
- val pairs = context.getAll
- val rows = pairs.map { case (k, v) =>
- new GenericRow(Array[Any](s"$k=$v"))
- }.toSeq
- // Assume config parameters can fit into one split (machine) ;)
- context.sparkContext.parallelize(rows, 1)
- // The only other case is invalid semantics and is impossible.
- case _ => context.emptyResult
+ context.getAll
+
+ case _ =>
+ throw new IllegalArgumentException()
}
+
+ def execute(): RDD[Row] = {
+ val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+ context.sparkContext.parallelize(rows, 1)
+ }
+
+ override def otherCopyArgs = context :: Nil
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
- (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommand(
+ child: SparkPlan, output: Seq[Attribute])(
+ @transient context: SQLContext)
+ extends UnaryNode with Command {
+
+ // Actually "EXPLAIN" command doesn't cause any side effect.
+ override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
+
def execute(): RDD[Row] = {
- val planString = new GenericRow(Array[Any](child.toString))
- context.sparkContext.parallelize(Seq(planString))
+ val explanation = sideEffectResult.mkString("\n")
+ context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
}
override def otherCopyArgs = context :: Nil
@@ -70,19 +94,20 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
- extends LeafNode {
+case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+ extends LeafNode with Command {
- lazy val commandSideEffect = {
+ override protected[sql] lazy val sideEffectResult = {
if (doCache) {
context.cacheTable(tableName)
} else {
context.uncacheTable(tableName)
}
+ Seq.empty[Any]
}
override def execute(): RDD[Row] = {
- commandSideEffect
+ sideEffectResult
context.emptyResult
}
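
A hedged sketch of the contract the new Command trait establishes, using a hypothetical EchoCommand (not part of this commit): the side effect lives in the memoized sideEffectResult, and execute() merely replays it.

    // Assumed to live in org.apache.spark.sql.execution so it may
    // override the protected[sql] member.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}

    case class EchoCommand(message: String, output: Seq[Attribute])(
        @transient context: SQLContext)
      extends LeafNode with Command {

      // Evaluated exactly once, however many times execute() is called.
      override protected[sql] lazy val sideEffectResult: Seq[String] = {
        println(message)   // the one-time side effect
        Seq(message)
      }

      def execute(): RDD[Row] = {
        val rows = sideEffectResult.map(s => new GenericRow(Array[Any](s)): Row)
        context.sparkContext.parallelize(rows, 1)
      }

      override def otherCopyArgs = context :: Nil
    }
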
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
new file mode 100644
index 0000000..052b0a9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.hadoop.conf.Configuration
+
+import parquet.filter._
+import parquet.filter.ColumnPredicates._
+import parquet.column.ColumnReader
+
+import com.google.common.io.BaseEncoding
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+object ParquetFilters {
+ val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
+ // set this to false if pushdown should be disabled
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
+
+ def createRecordFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
+ val filters: Seq[CatalystFilter] = filterExpressions.collect {
+ case (expression: Expression) if createFilter(expression).isDefined =>
+ createFilter(expression).get
+ }
+ if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null
+ }
+
+ def createFilter(expression: Expression): Option[CatalystFilter] = {
+ def createEqualityFilter(
+ name: String,
+ literal: Literal,
+ predicate: CatalystPredicate) = literal.dataType match {
+ case BooleanType =>
+ ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean], predicate)
+ case IntegerType =>
+ ComparisonFilter.createIntFilter(
+ name,
+ (x: Int) => x == literal.value.asInstanceOf[Int],
+ predicate)
+ case LongType =>
+ ComparisonFilter.createLongFilter(
+ name,
+ (x: Long) => x == literal.value.asInstanceOf[Long],
+ predicate)
+ case DoubleType =>
+ ComparisonFilter.createDoubleFilter(
+ name,
+ (x: Double) => x == literal.value.asInstanceOf[Double],
+ predicate)
+ case FloatType =>
+ ComparisonFilter.createFloatFilter(
+ name,
+ (x: Float) => x == literal.value.asInstanceOf[Float],
+ predicate)
+ case StringType =>
+ ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String], predicate)
+ }
+ def createLessThanFilter(
+ name: String,
+ literal: Literal,
+ predicate: CatalystPredicate) = literal.dataType match {
+ case IntegerType =>
+ ComparisonFilter.createIntFilter(
+ name,
+ (x: Int) => x < literal.value.asInstanceOf[Int],
+ predicate)
+ case LongType =>
+ ComparisonFilter.createLongFilter(
+ name,
+ (x: Long) => x < literal.value.asInstanceOf[Long],
+ predicate)
+ case DoubleType =>
+ ComparisonFilter.createDoubleFilter(
+ name,
+ (x: Double) => x < literal.value.asInstanceOf[Double],
+ predicate)
+ case FloatType =>
+ ComparisonFilter.createFloatFilter(
+ name,
+ (x: Float) => x < literal.value.asInstanceOf[Float],
+ predicate)
+ }
+ def createLessThanOrEqualFilter(
+ name: String,
+ literal: Literal,
+ predicate: CatalystPredicate) = literal.dataType match {
+ case IntegerType =>
+ ComparisonFilter.createIntFilter(
+ name,
+ (x: Int) => x <= literal.value.asInstanceOf[Int],
+ predicate)
+ case LongType =>
+ ComparisonFilter.createLongFilter(
+ name,
+ (x: Long) => x <= literal.value.asInstanceOf[Long],
+ predicate)
+ case DoubleType =>
+ ComparisonFilter.createDoubleFilter(
+ name,
+ (x: Double) => x <= literal.value.asInstanceOf[Double],
+ predicate)
+ case FloatType =>
+ ComparisonFilter.createFloatFilter(
+ name,
+ (x: Float) => x <= literal.value.asInstanceOf[Float],
+ predicate)
+ }
+ // TODO: combine these two types somehow?
+ def createGreaterThanFilter(
+ name: String,
+ literal: Literal,
+ predicate: CatalystPredicate) = literal.dataType match {
+ case IntegerType =>
+ ComparisonFilter.createIntFilter(
+ name,
+ (x: Int) => x > literal.value.asInstanceOf[Int],
+ predicate)
+ case LongType =>
+ ComparisonFilter.createLongFilter(
+ name,
+ (x: Long) => x > literal.value.asInstanceOf[Long],
+ predicate)
+ case DoubleType =>
+ ComparisonFilter.createDoubleFilter(
+ name,
+ (x: Double) => x > literal.value.asInstanceOf[Double],
+ predicate)
+ case FloatType =>
+ ComparisonFilter.createFloatFilter(
+ name,
+ (x: Float) => x > literal.value.asInstanceOf[Float],
+ predicate)
+ }
+ def createGreaterThanOrEqualFilter(
+ name: String,
+ literal: Literal,
+ predicate: CatalystPredicate) = literal.dataType match {
+ case IntegerType =>
+ ComparisonFilter.createIntFilter(
+ name, (x: Int) => x >= literal.value.asInstanceOf[Int],
+ predicate)
+ case LongType =>
+ ComparisonFilter.createLongFilter(
+ name,
+ (x: Long) => x >= literal.value.asInstanceOf[Long],
+ predicate)
+ case DoubleType =>
+ ComparisonFilter.createDoubleFilter(
+ name,
+ (x: Double) => x >= literal.value.asInstanceOf[Double],
+ predicate)
+ case FloatType =>
+ ComparisonFilter.createFloatFilter(
+ name,
+ (x: Float) => x >= literal.value.asInstanceOf[Float],
+ predicate)
+ }
+
+ /**
+ * TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until
+ * https://github.com/Parquet/parquet-mr/issues/371
+ * has been resolved.
+ */
+ expression match {
+ case p @ Or(left: Expression, right: Expression)
+ if createFilter(left).isDefined && createFilter(right).isDefined => {
+ // If either side of this Or-predicate is empty then this means
+ // it contains a more complex comparison than between attribute and literal
+ // (e.g., it contained a CAST). The only safe thing to do is then to disregard
+ // this disjunction, which could be contained in a conjunction. If it stands
+ // alone then it is also safe to drop it, since a Null return value of this
+ // function is interpreted as having no filters at all.
+ val leftFilter = createFilter(left).get
+ val rightFilter = createFilter(right).get
+ Some(new OrFilter(leftFilter, rightFilter))
+ }
+ case p @ And(left: Expression, right: Expression) => {
+ // This treats nested conjunctions; since either side of the conjunction
+ // may contain more complex filter expressions we may actually generate
+ // strictly weaker filter predicates in the process.
+ val leftFilter = createFilter(left)
+ val rightFilter = createFilter(right)
+ (leftFilter, rightFilter) match {
+ case (None, Some(filter)) => Some(filter)
+ case (Some(filter), None) => Some(filter)
+ case (_, _) =>
+ Some(new AndFilter(leftFilter.get, rightFilter.get))
+ }
+ }
+ case p @ Equals(left: Literal, right: NamedExpression) if !right.nullable =>
+ Some(createEqualityFilter(right.name, left, p))
+ case p @ Equals(left: NamedExpression, right: Literal) if !left.nullable =>
+ Some(createEqualityFilter(left.name, right, p))
+ case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
+ Some(createLessThanFilter(right.name, left, p))
+ case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
+ Some(createLessThanFilter(left.name, right, p))
+ case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
+ Some(createLessThanOrEqualFilter(right.name, left, p))
+ case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
+ Some(createLessThanOrEqualFilter(left.name, right, p))
+ case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
+ Some(createGreaterThanFilter(right.name, left, p))
+ case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
+ Some(createGreaterThanFilter(left.name, right, p))
+ case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
+ Some(createGreaterThanOrEqualFilter(right.name, left, p))
+ case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
+ Some(createGreaterThanOrEqualFilter(left.name, right, p))
+ case _ => None
+ }
+ }
+
+ /**
+ * Note: Inside the Hadoop API we only have access to `Configuration`, not to
+ * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
+ * the actual filter predicate.
+ */
+ def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
+ if (filters.length > 0) {
+ val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+ val encoded: String = BaseEncoding.base64().encode(serialized)
+ conf.set(PARQUET_FILTER_DATA, encoded)
+ }
+ }
+
+ /**
+ * Note: Inside the Hadoop API we only have access to `Configuration`, not to
+ * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
+ * the actual filter predicate.
+ */
+ def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
+ val data = conf.get(PARQUET_FILTER_DATA)
+ if (data != null) {
+ val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
+ SparkSqlSerializer.deserialize(decoded)
+ } else {
+ Seq()
+ }
+ }
+
+ /**
+ * Try to find the given expression in the tree of filters in order to
+ * determine whether it is safe to remove it from the higher level filters. Note
+ * that strictly speaking we could stop the search whenever an expression is found
+ * that contains this expression as subexpression (e.g., when searching for "a"
+ * and "(a or c)" is found) but we don't care about optimizations here since the
+ * filter tree is assumed to be small.
+ *
+ * @param filter The [[org.apache.spark.sql.parquet.CatalystFilter]] to expand
+ * and search
+ * @param expression The expression to look for
+ * @return An optional [[org.apache.spark.sql.parquet.CatalystFilter]] that
+ * contains the expression.
+ */
+ def findExpression(
+ filter: CatalystFilter,
+ expression: Expression): Option[CatalystFilter] = filter match {
+ case f @ OrFilter(_, leftFilter, rightFilter, _) =>
+ if (f.predicate == expression) {
+ Some(f)
+ } else {
+ val left = findExpression(leftFilter, expression)
+ if (left.isDefined) left else findExpression(rightFilter, expression)
+ }
+ case f @ AndFilter(_, leftFilter, rightFilter, _) =>
+ if (f.predicate == expression) {
+ Some(f)
+ } else {
+ val left = findExpression(leftFilter, expression)
+ if (left.isDefined) left else findExpression(rightFilter, expression)
+ }
+ case f @ ComparisonFilter(_, _, predicate) =>
+ if (predicate == expression) Some(f) else None
+ case _ => None
+ }
+}
+
+abstract private[parquet] class CatalystFilter(
+ @transient val predicate: CatalystPredicate) extends UnboundRecordFilter
+
+private[parquet] case class ComparisonFilter(
+ val columnName: String,
+ private var filter: UnboundRecordFilter,
+ @transient override val predicate: CatalystPredicate)
+ extends CatalystFilter(predicate) {
+ override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
+ filter.bind(readers)
+ }
+}
+
+private[parquet] case class OrFilter(
+ private var filter: UnboundRecordFilter,
+ @transient val left: CatalystFilter,
+ @transient val right: CatalystFilter,
+ @transient override val predicate: Or)
+ extends CatalystFilter(predicate) {
+ def this(l: CatalystFilter, r: CatalystFilter) =
+ this(
+ OrRecordFilter.or(l, r),
+ l,
+ r,
+ Or(l.predicate, r.predicate))
+
+ override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
+ filter.bind(readers)
+ }
+}
+
+private[parquet] case class AndFilter(
+ private var filter: UnboundRecordFilter,
+ @transient val left: CatalystFilter,
+ @transient val right: CatalystFilter,
+ @transient override val predicate: And)
+ extends CatalystFilter(predicate) {
+ def this(l: CatalystFilter, r: CatalystFilter) =
+ this(
+ AndRecordFilter.and(l, r),
+ l,
+ r,
+ And(l.predicate, r.predicate))
+
+ override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
+ filter.bind(readers)
+ }
+}
+
+private[parquet] object ComparisonFilter {
+ def createBooleanFilter(
+ columnName: String,
+ value: Boolean,
+ predicate: CatalystPredicate): CatalystFilter =
+ new ComparisonFilter(
+ columnName,
+ ColumnRecordFilter.column(
+ columnName,
+ ColumnPredicates.applyFunctionToBoolean(
+ new BooleanPredicateFunction {
+ def functionToApply(input: Boolean): Boolean = input == value
+ }
+ )),
+ predicate)
+
+ def createStringFilter(
+ columnName: String,
+ value: String,
+ predicate: CatalystPredicate): CatalystFilter =
+ new ComparisonFilter(
+ columnName,
+ ColumnRecordFilter.column(
+ columnName,
+ ColumnPredicates.applyFunctionToString (
+ new ColumnPredicates.PredicateFunction[String] {
+ def functionToApply(input: String): Boolean = input == value
+ }
+ )),
+ predicate)
+
+ def createIntFilter(
+ columnName: String,
+ func: Int => Boolean,
+ predicate: CatalystPredicate): CatalystFilter =
+ new ComparisonFilter(
+ columnName,
+ ColumnRecordFilter.column(
+ columnName,
+ ColumnPredicates.applyFunctionToInteger(
+ new IntegerPredicateFunction {
+ def functionToApply(input: Int) = func(input)
+ }
+ )),
+ predicate)
+
+ def createLongFilter(
+ columnName: String,
+ func: Long => Boolean,
+ predicate: CatalystPredicate): CatalystFilter =
+ new ComparisonFilter(
+ columnName,
+ ColumnRecordFilter.column(
+ columnName,
+ ColumnPredicates.applyFunctionToLong(
+ new LongPredicateFunction {
+ def functionToApply(input: Long) = func(input)
+ }
+ )),
+ predicate)
+
+ def createDoubleFilter(
+ columnName: String,
+ func: Double => Boolean,
+ predicate: CatalystPredicate): CatalystFilter =
+ new ComparisonFilter(
+ columnName,
+ ColumnRecordFilter.column(
+ columnName,
+ ColumnPredicates.applyFunctionToDouble(
+ new DoublePredicateFunction {
+ def functionToApply(input: Double) = func(input)
+ }
+ )),
+ predicate)
+
+ def createFloatFilter(
+ columnName: String,
+ func: Float => Boolean,
+ predicate: CatalystPredicate): CatalystFilter =
+ new ComparisonFilter(
+ columnName,
+ ColumnRecordFilter.column(
+ columnName,
+ ColumnPredicates.applyFunctionToFloat(
+ new FloatPredicateFunction {
+ def functionToApply(input: Float) = func(input)
+ }
+ )),
+ predicate)
+}
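
A hedged sketch of the Configuration round trip defined above; since only a Hadoop Configuration, not a SparkContext, is visible inside the input format, the filters travel as Kryo-serialized, base64-encoded bytes rather than as a broadcast:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType
    import org.apache.spark.sql.parquet.ParquetFilters

    val conf = new Configuration()
    val pred: Seq[Expression] = Seq(
      GreaterThan(AttributeReference("myint", IntegerType, nullable = false)(), Literal(5)))

    // Driver side: serialize and base64-encode into the job Configuration.
    ParquetFilters.serializeFilterExpressions(pred, conf)

    // Task side: decode back into expressions and build the record filter.
    val restored = ParquetFilters.deserializeFilterExpressions(conf)
    val recordFilter = ParquetFilters.createRecordFilter(restored)
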
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index f825ca3..65ba124 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -27,26 +27,27 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
-import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}
+import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat}
+import parquet.hadoop.api.ReadSupport
import parquet.hadoop.util.ContextUtil
import parquet.io.InvalidRecordException
import parquet.schema.MessageType
-import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
/**
* Parquet table scan operator. Imports the file that backs the given
- * [[ParquetRelation]] as a RDD[Row].
+ * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
case class ParquetTableScan(
// note: output cannot be transient, see
// https://issues.apache.org/jira/browse/SPARK-1367
output: Seq[Attribute],
relation: ParquetRelation,
- columnPruningPred: Option[Expression])(
+ columnPruningPred: Seq[Expression])(
@transient val sc: SparkContext)
extends LeafNode {
@@ -62,18 +63,30 @@ case class ParquetTableScan(
for (path <- fileList if !path.getName.startsWith("_")) {
NewFileInputFormat.addInputPath(job, path)
}
+
+ // Store Parquet schema in `Configuration`
conf.set(
RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertFromAttributes(output).toString)
- // TODO: think about adding record filters
- /* Comments regarding record filters: it would be nice to push down as much filtering
- to Parquet as possible. However, currently it seems we cannot pass enough information
- to materialize an (arbitrary) Catalyst [[Predicate]] inside Parquet's
- ``FilteredRecordReader`` (via Configuration, for example). Simple
- filter-rows-by-column-values however should be supported.
- */
- sc.newAPIHadoopRDD(conf, classOf[ParquetInputFormat[Row]], classOf[Void], classOf[Row])
- .map(_._2)
+
+ // Store record filtering predicate in `Configuration`
+ // Note 1: the input format ignores all predicates that cannot be expressed
+ // as simple column predicate filters in Parquet. Here we just record
+ // the whole pruning predicate.
+ // Note 2: you can disable filter predicate pushdown by setting
+ // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
+ if (columnPruningPred.length > 0 &&
+ sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+ ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
+ }
+
+ sc.newAPIHadoopRDD(
+ conf,
+ classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat],
+ classOf[Void],
+ classOf[Row])
+ .map(_._2)
+ .filter(_ != null) // Parquet's record filters may produce null values
}
override def otherCopyArgs = sc :: Nil
@@ -184,10 +197,19 @@ case class InsertIntoParquetTable(
override def otherCopyArgs = sc :: Nil
- // based on ``saveAsNewAPIHadoopFile`` in [[PairRDDFunctions]]
- // TODO: Maybe PairRDDFunctions should use Product2 instead of Tuple2?
- // .. then we could use the default one and could use [[MutablePair]]
- // instead of ``Tuple2``
+ /**
+ * Stores the given Row RDD as a Hadoop file.
+ *
+ * Note: We cannot use ``saveAsNewAPIHadoopFile`` from [[org.apache.spark.rdd.PairRDDFunctions]]
+ * together with [[org.apache.spark.util.MutablePair]] because ``PairRDDFunctions`` uses
+ * ``Tuple2`` and not ``Product2``. Also, we want to allow appending files to an existing
+ * directory and need to determine which was the largest written file index before starting to
+ * write.
+ *
+ * @param rdd The [[org.apache.spark.rdd.RDD]] to write.
+ * @param path The directory to write to.
+ * @param conf A [[org.apache.hadoop.conf.Configuration]].
+ */
private def saveAsHadoopFile(
rdd: RDD[Row],
path: String,
@@ -244,8 +266,10 @@ case class InsertIntoParquetTable(
}
}
-// TODO: this will be able to append to directories it created itself, not necessarily
-// to imported ones
+/**
+ * TODO: this will be able to append to directories it created itself, not necessarily
+ * to imported ones.
+ */
private[parquet] class AppendingParquetOutputFormat(offset: Int)
extends parquet.hadoop.ParquetOutputFormat[Row] {
// override to accept existing directories as valid output directory
@@ -262,6 +286,30 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
}
}
+/**
+ * We extend ParquetInputFormat in order to have more control over which
+ * RecordFilter we want to use.
+ */
+private[parquet] class FilteringParquetRowInputFormat
+ extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
+ override def createRecordReader(
+ inputSplit: InputSplit,
+ taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {
+ val readSupport: ReadSupport[Row] = new RowReadSupport()
+
+ val filterExpressions =
+ ParquetFilters.deserializeFilterExpressions(ContextUtil.getConfiguration(taskAttemptContext))
+ if (filterExpressions.length > 0) {
+ logInfo(s"Pushing down predicates for RecordFilter: ${filterExpressions.mkString(", ")}")
+ new ParquetRecordReader[Row](
+ readSupport,
+ ParquetFilters.createRecordFilter(filterExpressions))
+ } else {
+ new ParquetRecordReader[Row](readSupport)
+ }
+ }
+}
+
private[parquet] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
@@ -278,7 +326,9 @@ private[parquet] object FileSystemHelper {
fs.listStatus(path).map(_.getPath)
}
- // finds the maximum taskid in the output file names at the given path
+ /**
+ * Finds the maximum taskid in the output file names at the given path.
+ */
def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
val files = FileSystemHelper.listFiles(pathStr, conf)
// filename pattern is part-r-<int>.parquet
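
As Note 2 above says, the pushdown can be switched off per application. A hedged example of the SparkConf setting (key as defined in ParquetFilters):

    import org.apache.spark.{SparkConf, SparkContext}

    // Disable Parquet filter pushdown; ParquetTableScan then skips
    // serializeFilterExpressions and all filtering stays in Spark.
    val conf = new SparkConf()
      .setAppName("parquet-no-pushdown")
      .set("spark.sql.hints.parquetFilterPushdown", "false")
    val sc = new SparkContext(conf)
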
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index f37976f..46c7172 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -19,15 +19,34 @@ package org.apache.spark.sql.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.Job
+import parquet.example.data.{GroupWriter, Group}
+import parquet.example.data.simple.SimpleGroup
import parquet.hadoop.ParquetWriter
-import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.api.WriteSupport
+import parquet.hadoop.api.WriteSupport.WriteContext
+import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.util.Utils
+// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
+// with an empty configuration (it is after all not intended to be used in this way?)
+// and members are private so we need to make our own in order to pass the schema
+// to the writer.
+private class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
+ var groupWriter: GroupWriter = null
+ override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+ groupWriter = new GroupWriter(recordConsumer, schema)
+ }
+ override def init(configuration: Configuration): WriteContext = {
+ new WriteContext(schema, new java.util.HashMap[String, String]())
+ }
+ override def write(record: Group) {
+ groupWriter.write(record)
+ }
+}
+
private[sql] object ParquetTestData {
val testSchema =
@@ -43,7 +62,7 @@ private[sql] object ParquetTestData {
// field names for test assertion error messages
val testSchemaFieldNames = Seq(
"myboolean:Boolean",
- "mtint:Int",
+ "myint:Int",
"mystring:String",
"mylong:Long",
"myfloat:Float",
@@ -58,6 +77,18 @@ private[sql] object ParquetTestData {
|}
""".stripMargin
+ val testFilterSchema =
+ """
+ |message myrecord {
+ |required boolean myboolean;
+ |required int32 myint;
+ |required binary mystring;
+ |required int64 mylong;
+ |required float myfloat;
+ |required double mydouble;
+ |}
+ """.stripMargin
+
// field names for test assertion error messages
val subTestSchemaFieldNames = Seq(
"myboolean:Boolean",
@@ -65,36 +96,57 @@ private[sql] object ParquetTestData {
)
val testDir = Utils.createTempDir()
+ val testFilterDir = Utils.createTempDir()
lazy val testData = new ParquetRelation(testDir.toURI.toString)
def writeFile() = {
testDir.delete
val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet"))
- val job = new Job()
- val configuration: Configuration = ContextUtil.getConfiguration(job)
val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
- val writeSupport = new RowWriteSupport()
- writeSupport.setSchema(schema, configuration)
- val writer = new ParquetWriter(path, writeSupport)
for(i <- 0 until 15) {
- val data = new Array[Any](6)
+ val record = new SimpleGroup(schema)
if (i % 3 == 0) {
- data.update(0, true)
+ record.add(0, true)
} else {
- data.update(0, false)
+ record.add(0, false)
}
if (i % 5 == 0) {
- data.update(1, 5)
+ record.add(1, 5)
+ }
+ record.add(2, "abc")
+ record.add(3, i.toLong << 33)
+ record.add(4, 2.5F)
+ record.add(5, 4.5D)
+ writer.write(record)
+ }
+ writer.close()
+ }
+
+ def writeFilterFile(records: Int = 200) = {
+ // for microbenchmark use: records = 300000000
+ testFilterDir.delete
+ val path: Path = new Path(new Path(testFilterDir.toURI), new Path("part-r-0.parquet"))
+ val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema)
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+
+ for(i <- 0 to records) {
+ val record = new SimpleGroup(schema)
+ if (i % 4 == 0) {
+ record.add(0, true)
} else {
- data.update(1, null) // optional
+ record.add(0, false)
}
- data.update(2, "abc")
- data.update(3, i.toLong << 33)
- data.update(4, 2.5F)
- data.update(5, 4.5D)
- writer.write(new GenericRow(data.toArray))
+ record.add(1, i)
+ record.add(2, i.toString)
+ record.add(3, i.toLong)
+ record.add(4, i.toFloat + 0.5f)
+ record.add(5, i.toDouble + 0.5d)
+ writer.write(record)
}
writer.close()
}
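For readers unfamiliar with the parquet-mr API: the comment above TestGroupWriteSupport alludes to the WriteSupport lifecycle that ParquetWriter drives. A minimal sketch of that sequence, using only names that appear in this file:

    val support = new TestGroupWriteSupport(schema)
    val writer = new ParquetWriter[Group](path, support)
    // 1. the writer calls support.init(conf), which returns a WriteContext
    //    carrying our schema (the part GroupWriteSupport cannot do for us,
    //    since its members are private)
    // 2. the writer calls support.prepareForWrite(recordConsumer), which
    //    creates the GroupWriter
    val record = new SimpleGroup(schema)
    record.add(0, true)  // (field index, value)
    writer.write(record) // 3. delegates to groupWriter.write(record)
    writer.close()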
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c1fc99f..e9360b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -141,7 +141,7 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"),
Seq((2147483645.0,1),(2.0,2)))
}
-
+
test("count") {
checkAnswer(
sql("SELECT COUNT(*) FROM testData2"),
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
(3, "C"),
(4, "D")))
}
-
+
test("system function upper()") {
checkAnswer(
sql("SELECT n,UPPER(l) FROM lowerCaseData"),
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest {
(2, "ABC"),
(3, null)))
}
-
+
test("system function lower()") {
checkAnswer(
sql("SELECT N,LOWER(L) FROM upperCaseData"),
@@ -382,25 +382,25 @@ class SQLQuerySuite extends QueryTest {
sql(s"SET $testKey=$testVal")
checkAnswer(
sql("SET"),
- Seq(Seq(s"$testKey=$testVal"))
+ Seq(Seq(testKey, testVal))
)
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
checkAnswer(
sql("set"),
Seq(
- Seq(s"$testKey=$testVal"),
- Seq(s"${testKey + testKey}=${testVal + testVal}"))
+ Seq(testKey, testVal),
+ Seq(testKey + testKey, testVal + testVal))
)
// "set key"
checkAnswer(
sql(s"SET $testKey"),
- Seq(Seq(s"$testKey=$testVal"))
+ Seq(Seq(testKey, testVal))
)
checkAnswer(
sql(s"SET $nonexistentKey"),
- Seq(Seq(s"$nonexistentKey is undefined"))
+ Seq(Seq(nonexistentKey, "<undefined>"))
)
clear()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 64aacab..9810520 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,25 +17,25 @@
package org.apache.spark.sql.parquet
-import java.io.File
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.mapreduce.Job
import parquet.hadoop.ParquetFileWriter
-import parquet.schema.MessageTypeParser
import parquet.hadoop.util.ContextUtil
+import parquet.schema.MessageTypeParser
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.getTempFilePath
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.TestData
+import org.apache.spark.sql.SchemaRDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.Equals
+import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.util.Utils
-import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
-import org.apache.spark.sql.{parquet, SchemaRDD}
// Implicits
import org.apache.spark.sql.test.TestSQLContext._
@@ -56,7 +56,7 @@ case class OptionalReflectData(
doubleField: Option[Double],
booleanField: Option[Boolean])
-class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
+class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
import TestData._
TestData // Load test data tables.
@@ -64,12 +64,16 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
override def beforeAll() {
ParquetTestData.writeFile()
+ ParquetTestData.writeFilterFile()
testRDD = parquetFile(ParquetTestData.testDir.toString)
testRDD.registerAsTable("testsource")
+ parquetFile(ParquetTestData.testFilterDir.toString)
+ .registerAsTable("testfiltersource")
}
override def afterAll() {
Utils.deleteRecursively(ParquetTestData.testDir)
+ Utils.deleteRecursively(ParquetTestData.testFilterDir)
// TODO: should we also unregister the table here?
}
@@ -120,7 +124,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
val scanner = new ParquetTableScan(
ParquetTestData.testData.output,
ParquetTestData.testData,
- None)(TestSQLContext.sparkContext)
+ Seq())(TestSQLContext.sparkContext)
val projected = scanner.pruneColumns(ParquetTypesConverter
.convertToAttributes(MessageTypeParser
.parseMessageType(ParquetTestData.subTestSchema)))
@@ -196,7 +200,6 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
assert(true)
}
-
test("insert (appending) to same table via Scala API") {
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
@@ -239,5 +242,125 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
Utils.deleteRecursively(file)
assert(true)
}
-}
+ test("create RecordFilter for simple predicates") {
+ val attribute1 = new AttributeReference("first", IntegerType, false)()
+ val predicate1 = new Equals(attribute1, new Literal(1, IntegerType))
+ val filter1 = ParquetFilters.createFilter(predicate1)
+ assert(filter1.isDefined)
+ assert(filter1.get.predicate == predicate1, "predicates do not match")
+ assert(filter1.get.isInstanceOf[ComparisonFilter])
+ val cmpFilter1 = filter1.get.asInstanceOf[ComparisonFilter]
+ assert(cmpFilter1.columnName == "first", "column name incorrect")
+
+ val predicate2 = new LessThan(attribute1, new Literal(4, IntegerType))
+ val filter2 = ParquetFilters.createFilter(predicate2)
+ assert(filter2.isDefined)
+ assert(filter2.get.predicate == predicate2, "predicates do not match")
+ assert(filter2.get.isInstanceOf[ComparisonFilter])
+ val cmpFilter2 = filter2.get.asInstanceOf[ComparisonFilter]
+ assert(cmpFilter2.columnName == "first", "column name incorrect")
+
+ val predicate3 = new And(predicate1, predicate2)
+ val filter3 = ParquetFilters.createFilter(predicate3)
+ assert(filter3.isDefined)
+ assert(filter3.get.predicate == predicate3, "predicates do not match")
+ assert(filter3.get.isInstanceOf[AndFilter])
+
+ val predicate4 = new Or(predicate1, predicate2)
+ val filter4 = ParquetFilters.createFilter(predicate4)
+ assert(filter4.isDefined)
+ assert(filter4.get.predicate == predicate4, "predicates do not match")
+ assert(filter4.get.isInstanceOf[OrFilter])
+
+ val attribute2 = new AttributeReference("second", IntegerType, false)()
+ val predicate5 = new GreaterThan(attribute1, attribute2)
+ val badfilter = ParquetFilters.createFilter(predicate5)
+ assert(badfilter.isDefined === false)
+ }
+
+ test("test filter by predicate pushdown") {
+ for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
+ println(s"testing field $myval")
+ val query1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100")
+ assert(
+ query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val result1 = query1.collect()
+ assert(result1.size === 50)
+ assert(result1(0)(1) === 100)
+ assert(result1(49)(1) === 149)
+ val query2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200")
+ assert(
+ query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val result2 = query2.collect()
+ assert(result2.size === 50)
+ if (myval == "myint" || myval == "mylong") {
+ assert(result2(0)(1) === 151)
+ assert(result2(49)(1) === 200)
+ } else {
+ assert(result2(0)(1) === 150)
+ assert(result2(49)(1) === 199)
+ }
+ }
+ for(myval <- Seq("myint", "mylong")) {
+ val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10")
+ assert(
+ query3.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val result3 = query3.collect()
+ assert(result3.size === 20)
+ assert(result3(0)(1) === 0)
+ assert(result3(9)(1) === 9)
+ assert(result3(10)(1) === 191)
+ assert(result3(19)(1) === 200)
+ }
+ for(myval <- Seq("mydouble", "myfloat")) {
+ val result4 =
+ if (myval == "mydouble") {
+ val query4 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10.0")
+ assert(
+ query4.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ query4.collect()
+ } else {
+ // CASTs are problematic. Here myfloat will be cast to a double, and there is
+ // currently no apparent way to specify float constants in SqlParser.
+ sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect()
+ }
+ assert(result4.size === 20)
+ assert(result4(0)(1) === 0)
+ assert(result4(9)(1) === 9)
+ assert(result4(10)(1) === 191)
+ assert(result4(19)(1) === 200)
+ }
+ val query5 = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40")
+ assert(
+ query5.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val booleanResult = query5.collect()
+ assert(booleanResult.size === 10)
+ for(i <- 0 until 10) {
+ if (!booleanResult(i).getBoolean(0)) {
+ fail(s"Boolean value in result row $i not true")
+ }
+ if (booleanResult(i).getInt(1) != i * 4) {
+ fail(s"Int value in result row $i should be ${4*i}")
+ }
+ }
+ val query6 = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"")
+ assert(
+ query6.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+ "Top operator should be ParquetTableScan after pushdown")
+ val stringResult = query6.collect()
+ assert(stringResult.size === 1)
+ assert(stringResult(0).getString(2) == "100", "string value incorrect")
+ assert(stringResult(0).getInt(1) === 100)
+ }
+
+ test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+ val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
+ assert(query.collect().size === 10)
+ }
+}
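A note on the boundary arithmetic in the pushdown test above: writeFilterFile stores, for record i, myint = mylong = i but myfloat = mydouble = i + 0.5. A predicate like `myval > 150 AND myval <= 200` therefore matches i = 151..200 on the integral columns (i = 150 fails `> 150`) but i = 150..199 on the fractional ones (150.5 > 150 and 199.5 <= 200 hold, while 200.5 > 200 fails): 50 rows either way, which is exactly the split that result2 asserts.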
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9cd13f6..96e0ec5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql
-package hive
+package org.apache.spark.sql.hive
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.util.{ArrayList => JArrayList}
@@ -32,12 +31,13 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.{Command => PhysicalCommand}
/**
* Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
@@ -71,14 +71,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/**
* Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD.
*/
- def hiveql(hqlQuery: String): SchemaRDD = {
- val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
- // We force query optimization to happen right away instead of letting it happen lazily like
- // when using the query DSL. This is so DDL commands behave as expected. This is only
- // generates the RDD lineage for DML queries, but does not perform any execution.
- result.queryExecution.toRdd
- result
- }
+ def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
/** An alias for `hiveql`. */
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)
@@ -164,7 +157,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/**
* Runs the specified SQL query using Hive.
*/
- protected def runSqlHive(sql: String): Seq[String] = {
+ protected[sql] def runSqlHive(sql: String): Seq[String] = {
val maxResults = 100000
val results = runHive(sql, 100000)
// It is very confusing when you only get back some of the results...
@@ -228,6 +221,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override val strategies: Seq[Strategy] = Seq(
CommandStrategy(self),
+ HiveCommandStrategy(self),
TakeOrdered,
ParquetOperations,
InMemoryScans,
@@ -252,25 +246,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override lazy val optimizedPlan =
optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
- override lazy val toRdd: RDD[Row] = {
- def processCmd(cmd: String): RDD[Row] = {
- val output = runSqlHive(cmd)
- if (output.size == 0) {
- emptyResult
- } else {
- val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
- sparkContext.parallelize(asRows, 1)
- }
- }
-
- logical match {
- case s: SetCommand => eagerlyProcess(s)
- case _ => analyzed match {
- case NativeCommand(cmd) => processCmd(cmd)
- case _ => executedPlan.execute().map(_.copy())
- }
- }
- }
+ override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
protected val primitiveTypes =
Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
@@ -298,7 +274,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
struct.zip(fields).map {
case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
}.mkString("{", ",", "}")
- case (seq: Seq[_], ArrayType(typ))=>
+ case (seq: Seq[_], ArrayType(typ)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
case (map: Map[_,_], MapType(kType, vType)) =>
map.map {
@@ -314,10 +290,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* Returns the result as a hive compatible sequence of strings. For native commands, the
* execution is simply passed back to Hive.
*/
- def stringResult(): Seq[String] = analyzed match {
- case NativeCommand(cmd) => runSqlHive(cmd)
- case ExplainCommand(plan) => executePlan(plan).toString.split("\n")
- case query =>
+ def stringResult(): Seq[String] = executedPlan match {
+ case command: PhysicalCommand =>
+ command.sideEffectResult.map(_.toString)
+
+ case other =>
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
@@ -328,8 +305,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def simpleString: String =
logical match {
- case _: NativeCommand => "<Executed by Hive>"
- case _: SetCommand => "<Set Command: Executed by Hive, and noted by SQLContext>"
+ case _: NativeCommand => "<Native command: executed by Hive>"
+ case _: SetCommand => "<SET command: executed by Hive, and noted by SQLContext>"
case _ => executedPlan.toString
}
}
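With the eager `result.queryExecution.toRdd` call gone, hiveql is now fully lazy: native commands execute on the first action, through the physical command operators, and because sideEffectResult is a lazy val a command runs at most once. A sketch of the observable behavior, assuming a HiveContext named `hive` and that CREATE TABLE parses to a NativeCommand:

    val q = hive.hql("CREATE TABLE t (key INT)") // only parses; nothing runs yet
    q.collect() // first action: NativeCommand.sideEffectResult invokes runSqlHive
    q.collect() // lazy val: the CREATE TABLE is not executed a second time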
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d1aa8c8..0ac0ee9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -70,9 +70,18 @@ private[hive] trait HiveStrategies {
pruneFilterProject(
projectList,
otherPredicates,
+ identity[Seq[Expression]],
HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
case _ =>
Nil
}
}
+
+ case class HiveCommandStrategy(context: HiveContext) extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case logical.NativeCommand(sql) =>
+ NativeCommand(sql, plan.output)(context) :: Nil
+ case _ => Nil
+ }
+ }
}
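For context on the ordering: Catalyst's planner tries the strategies in the order they are listed and effectively takes the first physical plan produced, so registering HiveCommandStrategy right after CommandStrategy in HiveContext's strategies sequence (see the HiveContext diff above) ensures NativeCommand plans are claimed before any of the scan strategies see them.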
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index d199097..9386008 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -58,7 +58,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
// without restarting the JVM.
- System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
override lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
index 29b4b9b..a839231 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
@@ -32,14 +32,15 @@ import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
+import org.apache.spark
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
-import org.apache.spark.{TaskContext, SparkException}
import org.apache.spark.util.MutablePair
+import org.apache.spark.{TaskContext, SparkException}
/* Implicits */
import scala.collection.JavaConversions._
@@ -57,7 +58,7 @@ case class HiveTableScan(
attributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Option[Expression])(
- @transient val sc: HiveContext)
+ @transient val context: HiveContext)
extends LeafNode
with HiveInspectors {
@@ -75,7 +76,7 @@ case class HiveTableScan(
}
@transient
- val hadoopReader = new HadoopTableReader(relation.tableDesc, sc)
+ val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
/**
* The hive object inspector for this table, which can be used to extract values from the
@@ -156,7 +157,7 @@ case class HiveTableScan(
hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
}
- addColumnMetadataToConf(sc.hiveconf)
+ addColumnMetadataToConf(context.hiveconf)
@transient
def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
@@ -428,3 +429,26 @@ case class InsertIntoHiveTable(
sc.sparkContext.makeRDD(Nil, 1)
}
}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class NativeCommand(
+ sql: String, output: Seq[Attribute])(
+ @transient context: HiveContext)
+ extends LeafNode with Command {
+
+ override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
+
+ override def execute(): RDD[spark.sql.Row] = {
+ if (sideEffectResult.size == 0) {
+ context.emptyResult
+ } else {
+ val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
+ context.sparkContext.parallelize(rows, 1)
+ }
+ }
+
+ override def otherCopyArgs = context :: Nil
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7e3e9afd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 357c7e6..24c929f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
import org.apache.spark.sql.Logging
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand => LogicalNativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.test.TestHive
@@ -141,7 +142,7 @@ abstract class HiveComparisonTest
// Hack: Hive simply prints the result of a SET command to screen,
// and does not return it as a query answer.
case _: SetCommand => Seq("0")
- case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
+ case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
case plan => if (isSorted(plan)) answer else answer.sorted
}