You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/28 17:45:48 UTC

spark git commit: [SPARK-21216][SS] Hive strategies missed in Structured Streaming IncrementalExecution

Repository: spark
Updated Branches:
  refs/heads/master 838effb98 -> e68aed70f


[SPARK-21216][SS] Hive strategies missed in Structured Streaming IncrementalExecution

## What changes were proposed in this pull request?

If someone creates a HiveSession, the planner in `IncrementalExecution` doesn't take into account the Hive scan strategies. This causes joins of Streaming DataFrame's with Hive tables to fail.

## How was this patch tested?

Regression test

Author: Burak Yavuz <br...@gmail.com>

Closes #18426 from brkyvz/hive-join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e68aed70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e68aed70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e68aed70

Branch: refs/heads/master
Commit: e68aed70fbf1cfa59ba51df70287d718d737a193
Parents: 838effb
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Jun 28 10:45:45 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jun 28 10:45:45 2017 -0700

----------------------------------------------------------------------
 .../streaming/IncrementalExecution.scala        |  4 ++
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 41 +++++++++++++++++++-
 2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e68aed70/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index ab89dc6..dbe652b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -47,6 +47,10 @@ class IncrementalExecution(
       sparkSession.sparkContext,
       sparkSession.sessionState.conf,
       sparkSession.sessionState.experimentalMethods) {
+    override def strategies: Seq[Strategy] =
+      extraPlanningStrategies ++
+      sparkSession.sessionState.planner.strategies
+
     override def extraPlanningStrategies: Seq[Strategy] =
       StatefulAggregationStrategy ::
       FlatMapGroupsWithStateStrategy ::

http://git-wip-us.apache.org/repos/asf/spark/blob/e68aed70/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index aca9649..31fa3d2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -160,7 +160,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   test("drop table") {
     testDropTable(isDatasourceTable = false)
   }
-
 }
 
 class HiveDDLSuite
@@ -1956,4 +1955,44 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("SPARK-21216: join with a streaming DataFrame") {
+    import org.apache.spark.sql.execution.streaming.MemoryStream
+    import testImplicits._
+
+    implicit val _sqlContext = spark.sqlContext
+
+    Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word").createOrReplaceTempView("t1")
+    // Make a table and ensure it will be broadcast.
+    sql("""CREATE TABLE smallTable(word string, number int)
+          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+          |STORED AS TEXTFILE
+        """.stripMargin)
+
+    sql(
+      """INSERT INTO smallTable
+        |SELECT word, number from t1
+      """.stripMargin)
+
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDS().toDF()
+      .join(spark.table("smallTable"), $"value" === $"number")
+
+    val sq = joined.writeStream
+      .format("memory")
+      .queryName("t2")
+      .start()
+    try {
+      inputData.addData(1, 2)
+
+      sq.processAllAvailable()
+
+      checkAnswer(
+        spark.table("t2"),
+        Seq(Row(1, "one", 1), Row(2, "two", 2))
+      )
+    } finally {
+      sq.stop()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org