You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/08/23 02:07:48 UTC

spark git commit: [SPARK-21765] Set isStreaming on leaf nodes for streaming plans.

Repository: spark
Updated Branches:
  refs/heads/master 41bb1ddc6 -> 3c0c2d09c


[SPARK-21765] Set isStreaming on leaf nodes for streaming plans.

## What changes were proposed in this pull request?
All streaming logical plans will now have isStreaming set on their leaf nodes. This involved adding isStreaming as a case class argument to some leaf nodes, since whether a node is logically streaming can depend on where it came from.

## How was this patch tested?

Existing unit tests - no functional change is intended in this PR.

Author: Jose Torres <jo...@databricks.com>
Author: Tathagata Das <ta...@gmail.com>

Closes #18973 from joseph-torres/SPARK-21765.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0c2d09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0c2d09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0c2d09

Branch: refs/heads/master
Commit: 3c0c2d09ca89c6b6247137823169db17847dfae3
Parents: 41bb1dd
Author: Jose Torres <jo...@databricks.com>
Authored: Tue Aug 22 19:07:43 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Aug 22 19:07:43 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../sql/catalyst/optimizer/Optimizer.scala      | 10 +++----
 .../catalyst/plans/logical/LocalRelation.scala  |  5 +++-
 .../catalyst/plans/logical/LogicalPlan.scala    |  2 +-
 .../plans/logical/basicLogicalOperators.scala   | 11 +++----
 .../analysis/ResolveInlineTablesSuite.scala     |  2 +-
 .../analysis/UnsupportedOperationsSuite.scala   |  6 ++--
 .../optimizer/ReplaceOperatorSuite.scala        |  6 ++--
 .../sql/catalyst/plans/LogicalPlanSuite.scala   |  6 ++--
 .../org/apache/spark/sql/DataFrameReader.scala  |  4 +--
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +--
 .../scala/org/apache/spark/sql/Dataset.scala    |  7 +++--
 .../scala/org/apache/spark/sql/SQLContext.scala |  7 +++--
 .../org/apache/spark/sql/SparkSession.scala     |  8 +++--
 .../spark/sql/execution/ExistingRDD.scala       |  8 +++--
 .../execution/OptimizeMetadataOnlyQuery.scala   |  6 ++--
 .../spark/sql/execution/SparkStrategies.scala   |  8 +++--
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../datasources/DataSourceStrategy.scala        | 15 +++++-----
 .../datasources/FileSourceStrategy.scala        |  2 +-
 .../execution/datasources/LogicalRelation.scala | 12 ++++----
 .../datasources/PruneFileSourcePartitions.scala |  1 +
 .../spark/sql/execution/datasources/rules.scala | 10 +++----
 .../execution/streaming/FileStreamSource.scala  |  2 +-
 .../streaming/RateSourceProvider.scala          |  5 ++--
 .../execution/streaming/StreamExecution.scala   |  3 ++
 .../spark/sql/execution/streaming/memory.scala  | 31 ++++++++++++++++----
 .../OptimizeMetadataOnlyQuerySuite.scala        |  4 +--
 .../spark/sql/execution/SparkPlannerSuite.scala |  2 +-
 .../datasources/FileSourceStrategySuite.scala   |  2 +-
 .../parquet/ParquetFilterSuite.scala            |  3 +-
 .../ParquetPartitionDiscoverySuite.scala        |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  2 +-
 .../spark/sql/sources/FilteredScanSuite.scala   |  2 +-
 .../spark/sql/sources/PathOptionSuite.scala     |  2 +-
 .../sql/streaming/FileStreamSinkSuite.scala     |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |  5 +++-
 .../spark/sql/streaming/StreamSuite.scala       | 12 +++++++-
 .../streaming/StreamingAggregationSuite.scala   | 29 ++++++++++++++++--
 .../sql/streaming/StreamingQuerySuite.scala     |  5 +++-
 .../test/DataStreamReaderWriterSuite.scala      |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  2 +-
 .../spark/sql/hive/orc/OrcFilterSuite.scala     |  4 +--
 .../apache/spark/sql/hive/parquetSuites.scala   |  8 ++---
 46 files changed, 180 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 7ac1837..e9cff04 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -310,7 +310,7 @@ private[kafka010] class KafkaSource(
       currentPartitionOffsets = Some(untilPartitionOffsets)
     }
 
-    sqlContext.internalCreateDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
   /** Stop this source and free any resources it has allocated. */

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e2d7164..75d83bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1175,14 +1175,14 @@ object DecimalAggregates extends Rule[LogicalPlan] {
  */
 object ConvertToLocalRelation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Project(projectList, LocalRelation(output, data))
+    case Project(projectList, LocalRelation(output, data, isStreaming))
         if !projectList.exists(hasUnevaluableExpr) =>
       val projection = new InterpretedProjection(projectList, output)
       projection.initialize(0)
-      LocalRelation(projectList.map(_.toAttribute), data.map(projection))
+      LocalRelation(projectList.map(_.toAttribute), data.map(projection), isStreaming)
 
-    case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
-      LocalRelation(output, data.take(limit))
+    case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
+      LocalRelation(output, data.take(limit), isStreaming)
   }
 
   private def hasUnevaluableExpr(expr: Expression): Boolean = {
@@ -1207,7 +1207,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
  */
 object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Deduplicate(keys, child, streaming) if !streaming =>
+    case Deduplicate(keys, child) if !child.isStreaming =>
       val keyExprIds = keys.map(_.exprId)
       val aggCols = child.output.map { attr =>
         if (keyExprIds.contains(attr.exprId)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 1c986fb..7a21183 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -43,7 +43,10 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
+case class LocalRelation(output: Seq[Attribute],
+                         data: Seq[InternalRow] = Nil,
+                         // Indicates whether this relation has data from a streaming source.
+                         override val isStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {
 
   // A local relation must have resolved output.

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 9b440cd..d893b39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -47,7 +47,7 @@ abstract class LogicalPlan
    */
   def analyzed: Boolean = _analyzed
 
-  /** Returns true if this subtree contains any streaming data sources. */
+  /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming == true)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 303014e..4b3054d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -429,9 +429,10 @@ case class Sort(
 
 /** Factory for constructing new `Range` nodes. */
 object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = {
+  def apply(start: Long, end: Long, step: Long,
+            numSlices: Option[Int], isStreaming: Boolean = false): Range = {
     val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
-    new Range(start, end, step, numSlices, output)
+    new Range(start, end, step, numSlices, output, isStreaming)
   }
   def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
     Range(start, end, step, Some(numSlices))
@@ -443,7 +444,8 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Option[Int],
-    output: Seq[Attribute])
+    output: Seq[Attribute],
+    override val isStreaming: Boolean)
   extends LeafNode with MultiInstanceRelation {
 
   require(step != 0, s"step ($step) cannot be 0")
@@ -784,8 +786,7 @@ case class OneRowRelation() extends LeafNode {
 /** A logical plan for `dropDuplicates`. */
 case class Deduplicate(
     keys: Seq[Attribute],
-    child: LogicalPlan,
-    streaming: Boolean) extends UnaryNode {
+    child: LogicalPlan) extends UnaryNode {
 
   override def output: Seq[Attribute] = child.output
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index d0fe815..9e99c8e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -93,7 +93,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
     val table = UnresolvedInlineTable(Seq("c1"),
       Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
     val withTimeZone = ResolveTimeZone(conf).apply(table)
-    val LocalRelation(output, data) = ResolveInlineTables(conf).apply(withTimeZone)
+    val LocalRelation(output, data, _) = ResolveInlineTables(conf).apply(withTimeZone)
     val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
       .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
     assert(output.map(_.dataType) == Seq(TimestampType))

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index f68d930..4de7586 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -368,18 +368,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     Aggregate(
       Seq(attributeWithWatermark),
       aggExprs("c"),
-      Deduplicate(Seq(att), streamRelation, streaming = true)),
+      Deduplicate(Seq(att), streamRelation)),
     outputMode = Append)
 
   assertNotSupportedInStreamingPlan(
     "Deduplicate - Deduplicate on streaming relation after aggregation",
-    Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation), streaming = true),
+    Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation)),
     outputMode = Complete,
     expectedMsgs = Seq("dropDuplicates"))
 
   assertSupportedInStreamingPlan(
     "Deduplicate - Deduplicate on batch relation inside a streaming query",
-    Deduplicate(Seq(att), batchRelation, streaming = false),
+    Deduplicate(Seq(att), batchRelation),
     outputMode = Append
   )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index e68423f..85988d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest {
     val input = LocalRelation('a.int, 'b.int)
     val attrA = input.output(0)
     val attrB = input.output(1)
-    val query = Deduplicate(Seq(attrA), input, streaming = false) // dropDuplicates("a")
+    val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a")
     val optimized = Optimize.execute(query.analyze)
 
     val correctAnswer =
@@ -95,9 +95,9 @@ class ReplaceOperatorSuite extends PlanTest {
   }
 
   test("don't replace streaming Deduplicate") {
-    val input = LocalRelation('a.int, 'b.int)
+    val input = LocalRelation(Seq('a.int, 'b.int), isStreaming = true)
     val attrA = input.output(0)
-    val query = Deduplicate(Seq(attrA), input, streaming = true) // dropDuplicates("a")
+    val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a")
     val optimized = Optimize.execute(query.analyze)
 
     comparePlans(optimized, query)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index cc86f1f..cdf912d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -73,10 +73,8 @@ class LogicalPlanSuite extends SparkFunSuite {
 
   test("isStreaming") {
     val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
-    val incrementalRelation = new LocalRelation(
-      Seq(AttributeReference("a", IntegerType, nullable = true)())) {
-      override def isStreaming(): Boolean = true
-    }
+    val incrementalRelation = LocalRelation(
+      Seq(AttributeReference("a", IntegerType, nullable = true)()), isStreaming = true)
 
     case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
       override def output: Seq[Attribute] = left.output ++ right.output

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 10b28ce..41cb019 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -410,7 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     Dataset.ofRows(
       sparkSession,
-      LogicalRDD(schema.toAttributes, parsed)(sparkSession))
+      LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
   }
 
   /**
@@ -473,7 +473,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     Dataset.ofRows(
       sparkSession,
-      LogicalRDD(schema.toAttributes, parsed)(sparkSession))
+      LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 877051a..cca9352 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -371,14 +371,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       case (true, SaveMode.Overwrite) =>
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
-          case LogicalRelation(src: BaseRelation, _, _) => src
+          case LogicalRelation(src: BaseRelation, _, _, _) => src
           case relation: HiveTableRelation => relation.tableMeta.identifier
         }
 
         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
         EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a BaseRelation).
-          case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
+          case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 615686c..c670739 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -569,7 +569,8 @@ class Dataset[T] private[sql](
         logicalPlan.output,
         internalRdd,
         outputPartitioning,
-        physicalPlan.outputOrdering
+        physicalPlan.outputOrdering,
+        isStreaming
       )(sparkSession)).as[T]
   }
 
@@ -2233,7 +2234,7 @@ class Dataset[T] private[sql](
       }
       cols
     }
-    Deduplicate(groupCols, logicalPlan, isStreaming)
+    Deduplicate(groupCols, logicalPlan)
   }
 
   /**
@@ -2993,7 +2994,7 @@ class Dataset[T] private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = queryExecution.optimizedPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 7fde6e9..af60184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -420,8 +420,11 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * converted to Catalyst rows.
    */
   private[sql]
-  def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
-    sparkSession.internalCreateDataFrame(catalystRows, schema)
+  def internalCreateDataFrame(
+      catalystRows: RDD[InternalRow],
+      schema: StructType,
+      isStreaming: Boolean = false) = {
+    sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 6dfe8a6..863c316 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -564,10 +564,14 @@ class SparkSession private(
    */
   private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
-      schema: StructType): DataFrame = {
+      schema: StructType,
+      isStreaming: Boolean = false): DataFrame = {
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
     // schema differs from the existing schema on any field data type.
-    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+    val logicalPlan = LogicalRDD(
+      schema.toAttributes,
+      catalystRows,
+      isStreaming = isStreaming)(self)
     Dataset.ofRows(self, logicalPlan)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index dcb918e..f355550 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -125,7 +125,8 @@ case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     outputPartitioning: Partitioning = UnknownPartitioning(0),
-    outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
+    outputOrdering: Seq[SortOrder] = Nil,
+    override val isStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -150,11 +151,12 @@ case class LogicalRDD(
       output.map(rewrite),
       rdd,
       rewrittenPartitioning,
-      rewrittenOrdering
+      rewrittenOrdering,
+      isStreaming
     )(session).asInstanceOf[this.type]
   }
 
-  override protected def stringArgs: Iterator[Any] = Iterator(output)
+  override protected def stringArgs: Iterator[Any] = Iterator(output, isStreaming)
 
   override def computeStats(): Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 301c4f0..18f6f69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -94,10 +94,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
     child transform {
       case plan if plan eq relation =>
         relation match {
-          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
             val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
             val partitionData = fsRelation.location.listFiles(Nil, Nil)
-            LocalRelation(partAttrs, partitionData.map(_.values))
+            LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)
 
           case relation: HiveTableRelation =>
             val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
@@ -130,7 +130,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
   object PartitionedRelation {
 
     def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
+      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
         if fsRelation.partitionSchema.nonEmpty =>
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some((AttributeSet(partAttrs), l))

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c115cb6..6b16408 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -221,12 +221,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   }
 
   /**
-   * Used to plan aggregation queries that are computed incrementally as part of a
+   * Used to plan streaming aggregation queries that are computed incrementally as part of a
    * [[StreamingQuery]]. Currently this rule is injected into the planner
    * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]]
    */
   object StatefulAggregationStrategy extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case _ if !plan.isStreaming => Nil
+
       case EventTimeWatermark(columnName, delay, child) =>
         EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil
 
@@ -248,7 +250,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object StreamingDeduplicationStrategy extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case Deduplicate(keys, child, true) =>
+      case Deduplicate(keys, child) if child.isStreaming =>
         StreamingDeduplicateExec(keys, planLater(child)) :: Nil
 
       case _ => Nil
@@ -410,7 +412,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
-      case logical.LocalRelation(output, data) =>
+      case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 567ff49..b9502a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -455,7 +455,7 @@ case class DataSource(
 
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+        case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
       }.head
     }
     // For partitioned relation r, r.schema's column ordering can be different from the column

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2370177..0deac19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -136,12 +136,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
 
-    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
+    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
     case i @ InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
+        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
       // to include those constant column values in the query result.
@@ -177,7 +177,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
       val outputPath = t.location.rootPaths.head
       val inputPaths = actualQuery.collect {
-        case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
+        case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
       }.flatten
 
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -268,7 +268,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
   import DataSourceStrategy._
 
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
@@ -276,21 +276,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         (requestedColumns, allPredicates, _) =>
           toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
+    case PhysicalOperation(projects, filters,
+                           l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
 
-    case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
       RowDataSourceScanExec(
         l.output,
         l.output.indices,

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 17f7e0e..16b2271 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -52,7 +52,7 @@ import org.apache.spark.sql.execution.SparkPlan
 object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters,
-      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
+      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 699f1ba..17a6107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -30,12 +30,14 @@ import org.apache.spark.util.Utils
 case class LogicalRelation(
     relation: BaseRelation,
     output: Seq[AttributeReference],
-    catalogTable: Option[CatalogTable])
+    catalogTable: Option[CatalogTable],
+    override val isStreaming: Boolean)
   extends LeafNode with MultiInstanceRelation {
 
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
+    case l @ LogicalRelation(otherRelation, _, _, otherIsStreaming) =>
+      relation == otherRelation && output == l.output && isStreaming == otherIsStreaming
     case _ => false
   }
 
@@ -76,9 +78,9 @@ case class LogicalRelation(
 }
 
 object LogicalRelation {
-  def apply(relation: BaseRelation): LogicalRelation =
-    LogicalRelation(relation, relation.schema.toAttributes, None)
+  def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming)
 
   def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
-    LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+    LogicalRelation(relation, relation.schema.toAttributes, Some(table), false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index f5df184..3b830ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -36,6 +36,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
               _,
               _),
             _,
+            _,
             _))
         if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
       // The attribute name of predicate could be different than the one in schema in case of

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 84acca2..7a2c85e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -385,10 +385,10 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
-        case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+        case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
           preprocess(i, tblName, h.partitionSchema.map(_.name))
-        case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+        case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
           preprocess(i, tblName, Nil)
         case _ => i
@@ -428,7 +428,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
       case _: HiveTableRelation => 1
-      case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
+      case _ @ LogicalRelation(_: HadoopFsRelation, _, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
       case u: Union =>
@@ -454,10 +454,10 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) =>
+      case InsertIntoTable(l @ LogicalRelation(relation, _, _, _), partition, query, _, _) =>
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src, _, _) => src
+          case LogicalRelation(src, _, _, _) => src
         }
         if (srcRelations.contains(relation)) {
           failAnalysis("Cannot insert into table that is also being read from.")

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 4b1b252..f174173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -171,7 +171,7 @@ class FileStreamSource(
         className = fileFormatClassName,
         options = optionsWithPartitionBasePath)
     Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
-      checkFilesExist = false)))
+      checkFilesExist = false), isStreaming = true))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index e76d4dc..077a477 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -200,7 +200,8 @@ class RateStreamSource(
       s"rangeStart: $rangeStart, rangeEnd: $rangeEnd")
 
     if (rangeStart == rangeEnd) {
-      return sqlContext.internalCreateDataFrame(sqlContext.sparkContext.emptyRDD, schema)
+      return sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
     }
 
     val localStartTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
@@ -211,7 +212,7 @@ class RateStreamSource(
       val relative = math.round((v - rangeStart) * relativeMsPerValue)
       InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), v)
     }
-    sqlContext.internalCreateDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
   override def stop(): Unit = {}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9bc114f..432b2d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -609,6 +609,9 @@ class StreamExecution(
           if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
           val current = committedOffsets.get(source)
           val batch = source.getBatch(current, available)
+          assert(batch.isStreaming,
+            s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
+              s"${batch.queryExecution.logical}")
           logDebug(s"Retrieving data from $source: $current -> $available")
           Some(source -> batch)
         case _ => None

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 587ae2b..c9784c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
@@ -27,13 +29,14 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
+
 object MemoryStream {
   protected val currentBlockId = new AtomicInteger(0)
   protected val memoryStreamId = new AtomicInteger(0)
@@ -44,7 +47,7 @@ object MemoryStream {
 
 /**
  * A [[Source]] that produces value stored in memory as they are added by the user.  This [[Source]]
- * is primarily intended for use in unit tests as it can only replay data when the object is still
+ * is intended for use in unit tests as it can only replay data when the object is still
  * available.
  */
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
@@ -85,8 +88,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   def addData(data: TraversableOnce[A]): Offset = {
-    import sqlContext.implicits._
-    val ds = data.toVector.toDS()
+    val encoded = data.toVector.map(d => encoder.toRow(d).copy())
+    val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
+    val ds = Dataset[A](sqlContext.sparkSession, plan)
     logDebug(s"Adding ds: $ds")
     this.synchronized {
       currentOffset = currentOffset + 1
@@ -118,8 +122,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       batches.slice(sliceStart, sliceEnd)
     }
 
-    logDebug(
-      s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
+    logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))
+
     newBlocks
       .map(_.toDF())
       .reduceOption(_ union _)
@@ -128,6 +132,21 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       }
   }
 
+  private def generateDebugString(
+      blocks: TraversableOnce[Dataset[A]],
+      startOrdinal: Int,
+      endOrdinal: Int): String = {
+    val originalUnsupportedCheck =
+      sqlContext.getConf("spark.sql.streaming.unsupportedOperationCheck")
+    try {
+      sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "false")
+      s"MemoryBatch [$startOrdinal, $endOrdinal]: " +
+          s"${blocks.flatMap(_.collect()).mkString(", ")}"
+    } finally {
+      sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", originalUnsupportedCheck)
+    }
+  }
+
   override def commit(end: Offset): Unit = synchronized {
     def check(newOffset: LongOffset): Unit = {
       val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index 58c3105..223c3d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -42,14 +42,14 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
 
   private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
     val localRelations = df.queryExecution.optimizedPlan.collect {
-      case l @ LocalRelation(_, _) => l
+      case l @ LocalRelation(_, _, _) => l
     }
     assert(localRelations.size == 1)
   }
 
   private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
     val localRelations = df.queryExecution.optimizedPlan.collect {
-      case l @ LocalRelation(_, _) => l
+      case l @ LocalRelation(_, _, _) => l
     }
     assert(localRelations.size == 0)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
index aecfd30..5828f97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
@@ -40,7 +40,7 @@ class SparkPlannerSuite extends SharedSQLContext {
         case Union(children) =>
           planned += 1
           UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil
-        case LocalRelation(output, data) =>
+        case LocalRelation(output, data, _) =>
           planned += 1
           LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil
         case NeverPlanned =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index d77f0c2..c1d61b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -556,7 +556,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
 
     if (buckets > 0) {
       val bucketed = df.queryExecution.analyzed transform {
-        case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
+        case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
           l.copy(relation =
             r.copy(bucketSpec =
               Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index c43c1ec..28e8521 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
         var maybeRelation: Option[HadoopFsRelation] = None
         val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) =>
+          case PhysicalOperation(_, filters,
+                                 LogicalRelation(relation: HadoopFsRelation, _, _, _)) =>
             maybeRelation = Some(relation)
             filters
         }.flatten.reduceLeftOption(_ && _)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 2f5fd84..837a087 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -651,7 +651,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
         case LogicalRelation(
-            HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) =>
+            HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _, _) =>
           assert(location.partitionSpec() === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 8dc11d8..f951b46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -247,7 +247,7 @@ class JDBCSuite extends SparkFunSuite
   // Check whether the tables are fetched in the expected degree of parallelism
   def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
     val jdbcRelations = df.queryExecution.analyzed.collect {
-      case LogicalRelation(r: JDBCRelation, _, _) => r
+      case LogicalRelation(r: JDBCRelation, _, _, _) => r
     }
     assert(jdbcRelations.length == 1)
     assert(jdbcRelations.head.parts.length == expectedNumPartitions,

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index fe9469b..c45b507 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -327,7 +327,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
 
         val table = spark.table("oneToTenFiltered")
         val relation = table.queryExecution.logical.collectFirst {
-          case LogicalRelation(r, _, _) => r
+          case LogicalRelation(r, _, _, _) => r
         }.get
 
         assert(

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 3fd7a5b..85da3f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -135,7 +135,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
 
   private def getPathOption(tableName: String): Option[String] = {
     spark.table(tableName).queryExecution.analyzed.collect {
-      case LogicalRelation(r: TestOptionsRelation, _, _) => r.pathOption
+      case LogicalRelation(r: TestOptionsRelation, _, _, _) => r.pathOption
     }.head
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index a5cf40c..08db06b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -127,7 +127,7 @@ class FileStreamSinkSuite extends StreamTest {
       // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
       // been inferred
       val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-        case LogicalRelation(baseRelation: HadoopFsRelation, _, _) => baseRelation
+        case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
       }
       assert(hadoopdFsRelations.size === 1)
       assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index e2ec690..b6baaed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1105,7 +1105,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
             def verify(startId: Option[Int], endId: Int, expected: String*): Unit = {
               val start = startId.map(new FileStreamSourceOffset(_))
               val end = FileStreamSourceOffset(endId)
-              assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected)
+
+              withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
+                assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected)
+              }
             }
 
             verify(startId = None, endId = 2, "keep1", "keep2", "keep3")

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6f7b9d3..012cccf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
@@ -728,7 +729,16 @@ class FakeDefaultSource extends FakeSource {
 
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
         val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
-        spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
+        val ds = new Dataset[java.lang.Long](
+          spark.sparkSession,
+          Range(
+            startOffset,
+            end.asInstanceOf[LongOffset].offset + 1,
+            1,
+            Some(spark.sparkSession.sparkContext.defaultParallelism),
+            isStreaming = true),
+          Encoders.LONG)
+        ds.toDF("a")
       }
 
       override def stop() {}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index b6e82b6..e0979ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.streaming
 
 import java.util.{Locale, TimeZone}
 
+import org.scalatest.Assertions
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming._
@@ -31,12 +33,14 @@ import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.StructType
 
 object FailureSinglton {
   var firstTime = true
 }
 
-class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
+class StreamingAggregationSuite extends StateStoreMetricsTest
+    with BeforeAndAfterAll with Assertions {
 
   override def afterAll(): Unit = {
     super.afterAll()
@@ -356,4 +360,25 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
       CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
     )
   }
+
+  test("SPARK-19690: do not convert batch aggregation in streaming query to streaming") {
+    val streamInput = MemoryStream[Int]
+    // Batch side of the join: number of values in 1..5 per parity (value % 2).
+    val batchDF = Seq(1, 2, 3, 4, 5)
+        .toDF("value")
+        .withColumn("parity", 'value % 2)
+        .groupBy('parity)
+        .agg(count("*") as 'joinValue)
+    val joinDF = streamInput.toDF().join(batchDF, 'value === 'parity)
+
+    // Make sure we're planning an aggregate in the first place. A type test is used instead of
+    // a single-case `match`, which would throw a MatchError rather than fail the assertion.
+    assert(batchDF.queryExecution.optimizedPlan.isInstanceOf[Aggregate])
+
+    testStream(joinDF, Append)(
+      AddData(streamInput, 0, 1, 2, 3),
+      CheckLastBatch((0, 0, 2), (1, 1, 3)),
+      AddData(streamInput, 0, 1, 2, 3),
+      CheckLastBatch((0, 0, 2), (1, 1, 3)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 27ea690..969f594 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -647,7 +647,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     val source = new Source() {
       override def schema: StructType = triggerDF.schema
       override def getOffset: Option[Offset] = Some(LongOffset(0))
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = triggerDF
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        sqlContext.internalCreateDataFrame(
+          triggerDF.queryExecution.toRdd, triggerDF.schema, isStreaming = true)
+      }
       override def stop(): Unit = {}
     }
     StreamingExecutionRelation(source)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index e8a6202..aa163d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -88,7 +88,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
         import spark.implicits._
 
-        Seq[Int]().toDS().toDF()
+        spark.internalCreateDataFrame(spark.sparkContext.emptyRDD, schema, isStreaming = true)
       }
 
       override def stop() {}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8bab059..f0f2c49 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -73,7 +73,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
     catalogProxy.getCachedTable(tableIdentifier) match {
       case null => None // Cache miss
-      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
+      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e01198d..83cee5d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -583,7 +583,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             Row(3) :: Row(4) :: Nil)
 
           table("test_parquet_ctas").queryExecution.optimizedPlan match {
-            case LogicalRelation(p: HadoopFsRelation, _, _) => // OK
+            case LogicalRelation(p: HadoopFsRelation, _, _, _) => // OK
             case _ =>
               fail(s"test_parquet_ctas should have be converted to ${classOf[HadoopFsRelation]}")
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 02cfa02..d2a6ef7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -411,7 +411,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
-      case LogicalRelation(r: HadoopFsRelation, _, _) =>
+      case LogicalRelation(r: HadoopFsRelation, _, _, _) =>
         if (!isDataSourceTable) {
           fail(
             s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " +

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 222c249..de6f0d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -45,7 +45,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
 
     var maybeRelation: Option[HadoopFsRelation] = None
     val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) =>
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
         maybeRelation = Some(orcRelation)
         filters
     }.flatten.reduceLeftOption(_ && _)
@@ -89,7 +89,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
 
     var maybeRelation: Option[HadoopFsRelation] = None
     val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) =>
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
         maybeRelation = Some(orcRelation)
         filters
     }.flatten.reduceLeftOption(_ && _)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c0c2d09/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 303884d..740e083 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -285,7 +285,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       )
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(_: HadoopFsRelation, _, _) => // OK
+        case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
         case _ => fail(
           "test_parquet_ctas should be converted to " +
               s"${classOf[HadoopFsRelation ].getCanonicalName }")
@@ -370,7 +370,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       assertResult(2) {
         analyzed.collect {
-          case r @ LogicalRelation(_: HadoopFsRelation, _, _) => r
+          case r @ LogicalRelation(_: HadoopFsRelation, _, _, _) => r
         }.size
       }
     }
@@ -379,7 +379,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   def collectHadoopFsRelation(df: DataFrame): HadoopFsRelation = {
     val plan = df.queryExecution.analyzed
     plan.collectFirst {
-      case LogicalRelation(r: HadoopFsRelation, _, _) => r
+      case LogicalRelation(r: HadoopFsRelation, _, _, _) => r
     }.getOrElse {
       fail(s"Expecting a HadoopFsRelation 2, but got:\n$plan")
     }
@@ -459,7 +459,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       // Converted test_parquet should be cached.
       getCachedDataSourceTable(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
-        case LogicalRelation(_: HadoopFsRelation, _, _) => // OK
+        case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
         case other =>
           fail(
             "The cached test_parquet should be a Parquet Relation. " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org