You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/28 17:45:48 UTC
spark git commit: [SPARK-21216][SS] Hive strategies missed in
Structured Streaming IncrementalExecution
Repository: spark
Updated Branches:
refs/heads/master 838effb98 -> e68aed70f
[SPARK-21216][SS] Hive strategies missed in Structured Streaming IncrementalExecution
## What changes were proposed in this pull request?
If someone creates a HiveSession, the planner in `IncrementalExecution` doesn't take into account the Hive scan strategies. This causes joins of streaming DataFrames with Hive tables to fail.
## How was this patch tested?
Regression test
Author: Burak Yavuz <br...@gmail.com>
Closes #18426 from brkyvz/hive-join.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e68aed70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e68aed70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e68aed70
Branch: refs/heads/master
Commit: e68aed70fbf1cfa59ba51df70287d718d737a193
Parents: 838effb
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Jun 28 10:45:45 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jun 28 10:45:45 2017 -0700
----------------------------------------------------------------------
.../streaming/IncrementalExecution.scala | 4 ++
.../spark/sql/hive/execution/HiveDDLSuite.scala | 41 +++++++++++++++++++-
2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e68aed70/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index ab89dc6..dbe652b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -47,6 +47,10 @@ class IncrementalExecution(
sparkSession.sparkContext,
sparkSession.sessionState.conf,
sparkSession.sessionState.experimentalMethods) {
+ override def strategies: Seq[Strategy] =
+ extraPlanningStrategies ++
+ sparkSession.sessionState.planner.strategies
+
override def extraPlanningStrategies: Seq[Strategy] =
StatefulAggregationStrategy ::
FlatMapGroupsWithStateStrategy ::
http://git-wip-us.apache.org/repos/asf/spark/blob/e68aed70/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index aca9649..31fa3d2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -160,7 +160,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
test("drop table") {
testDropTable(isDatasourceTable = false)
}
-
}
class HiveDDLSuite
@@ -1956,4 +1955,44 @@ class HiveDDLSuite
}
}
}
+
+ test("SPARK-21216: join with a streaming DataFrame") {
+ import org.apache.spark.sql.execution.streaming.MemoryStream
+ import testImplicits._
+
+ implicit val _sqlContext = spark.sqlContext
+
+ Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word").createOrReplaceTempView("t1")
+ // Make a table and ensure it will be broadcast.
+ sql("""CREATE TABLE smallTable(word string, number int)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |STORED AS TEXTFILE
+ """.stripMargin)
+
+ sql(
+ """INSERT INTO smallTable
+ |SELECT word, number from t1
+ """.stripMargin)
+
+ val inputData = MemoryStream[Int]
+ val joined = inputData.toDS().toDF()
+ .join(spark.table("smallTable"), $"value" === $"number")
+
+ val sq = joined.writeStream
+ .format("memory")
+ .queryName("t2")
+ .start()
+ try {
+ inputData.addData(1, 2)
+
+ sq.processAllAvailable()
+
+ checkAnswer(
+ spark.table("t2"),
+ Seq(Row(1, "one", 1), Row(2, "two", 2))
+ )
+ } finally {
+ sq.stop()
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org