Posted to commits@spark.apache.org by ma...@apache.org on 2016/04/05 04:04:02 UTC

spark git commit: [SPARK-14287] isStreaming method for Dataset

Repository: spark
Updated Branches:
  refs/heads/master 7201f033c -> ba24d1ee9


[SPARK-14287] isStreaming method for Dataset

With the addition of StreamExecution (ContinuousQuery) to Datasets, data can become unbounded. On unbounded data, some methods and operations no longer make sense, e.g. `Dataset.count()`.

A simple API is required to check whether the data in a Dataset is bounded or unbounded, so that users can tell whether their Dataset is in streaming mode. An ML algorithm, for example, may check whether the data is unbounded and throw an exception, as sketched below.
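
For illustration, a minimal sketch (not part of this patch) of the kind of guard an ML algorithm might add; the `requireBounded` helper is hypothetical, and the choice of exception is arbitrary:

import org.apache.spark.sql.Dataset

// Hypothetical guard: reject unbounded input before a bounded-only
// operation such as count() or collect().
def requireBounded[T](data: Dataset[T], op: String): Unit = {
  if (data.isStreaming) {
    throw new UnsupportedOperationException(
      s"'$op' is not supported on a streaming Dataset; " +
        "run it as a ContinuousQuery instead.")
  }
}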

The implementation of this method is simple; naming it is the challenge. Some possible names for this method are:
 - isStreaming
 - isContinuous
 - isBounded
 - isUnbounded

I've gone with `isStreaming` for now. We can change it before Spark 2.0 if we decide on a different name. For that reason I've marked it as `Experimental`.

Author: Burak Yavuz <br...@gmail.com>

Closes #12080 from brkyvz/is-streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba24d1ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba24d1ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba24d1ee

Branch: refs/heads/master
Commit: ba24d1ee9a1d97ca82282f3b811ec011c4285b99
Parents: 7201f03
Author: Burak Yavuz <br...@gmail.com>
Authored: Mon Apr 4 19:04:09 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Apr 4 19:04:09 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/Dataset.scala | 15 +++++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 18 ++++++++++++++++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba24d1ee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 8dfe8ff..db2134b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
+import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -450,6 +451,20 @@ class Dataset[T] private[sql](
   def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
 
   /**
+   * Returns true if this [[Dataset]] contains one or more sources that continuously
+   * return data as it arrives. A [[Dataset]] that reads data from a streaming source
+   * must be executed as a [[ContinuousQuery]] using the `startStream()` method in
+   * [[DataFrameWriter]].  Methods that return a single answer (e.g., `count()` or
+   * `collect()`) will throw an [[AnalysisException]] when there is a streaming
+   * source present.
+   *
+   * @group basic
+   * @since 2.0.0
+   */
+  @Experimental
+  def isStreaming: Boolean = logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined
+
+  /**
    * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
    * and all cells will be aligned right. For example:
    * {{{

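For context, the one-liner above relies on Catalyst's `TreeNode.find`, which recursively searches the logical plan and returns the first node matching the predicate. A simplified, self-contained sketch of the same pattern (a toy tree, not Spark's actual TreeNode):

// Toy tree illustrating the find-then-isDefined pattern used by isStreaming.
case class Node(name: String, children: Seq[Node] = Nil) {
  // Depth-first search: return the first node satisfying f, if any.
  def find(f: Node => Boolean): Option[Node] =
    if (f(this)) Some(this)
    else children.foldLeft(Option.empty[Node])((found, c) => found.orElse(c.find(f)))
}

val plan = Node("Join", Seq(Node("LocalRelation"), Node("StreamingRelation")))
// Analogous to logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined
val streaming = plan.find(_.name == "StreamingRelation").isDefined  // true
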
http://git-wip-us.apache.org/repos/asf/spark/blob/ba24d1ee/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2aa9056..e8e8010 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -23,6 +23,7 @@ import java.sql.{Date, Timestamp}
 import scala.language.postfixOps
 
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
@@ -602,6 +603,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       TupleClass(1, "a")
     )
   }
+
+  test("isStreaming returns false for static Dataset") {
+    val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
+    assert(!data.isStreaming, "static Dataset returned true for 'isStreaming'.")
+  }
+
+  test("isStreaming returns true for streaming Dataset") {
+    val data = MemoryStream[Int].toDS()
+    assert(data.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
+  }
+
+  test("isStreaming returns true after static and streaming Dataset join") {
+    val static = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("a", "b")
+    val streaming = MemoryStream[Int].toDS().toDF("b")
+    val df = streaming.join(static, Seq("b"))
+    assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
+  }
 }
 
 case class OtherTuple(_1: String, _2: Int)

