Posted to commits@spark.apache.org by td...@apache.org on 2016/12/22 00:43:21 UTC
spark git commit: [SPARK-18234][SS] Made update mode public
Repository: spark
Updated Branches:
refs/heads/master afd9bc1d8 -> 83a6ace0d
[SPARK-18234][SS] Made update mode public
## What changes were proposed in this pull request?
Made update mode public. As part of that, here are the changes:
- Updated DataStreamWriter to accept "update" (see the usage sketch below)
- Changed the package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update-mode state removal with watermark to StateStoreSaveExec
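
For illustration, a minimal usage sketch of the newly public mode (a hedged sketch, not part of this commit: it assumes a local build with this patch and a socket source fed by e.g. `nc -lk 9999`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder()
  .master("local[2]").appName("update-mode-sketch").getOrCreate()
import spark.implicits._

// Streaming word counts over a socket source.
val counts = spark.readStream
  .format("socket")
  .option("host", "localhost").option("port", 9999)
  .load().as[String]
  .flatMap(_.split(" "))
  .groupBy("value").count()

// DataStreamWriter now accepts "update": only rows whose counts changed in
// the last trigger are written to the sink.
val query = counts.writeStream
  .outputMode(OutputMode.Update())  // or the string form: .outputMode("update")
  .format("console")
  .start()
```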
## How was this patch tested?
Added new tests in changed modules
Author: Tathagata Das <ta...@gmail.com>
Closes #16360 from tdas/SPARK-18234.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a6ace0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a6ace0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a6ace0
Branch: refs/heads/master
Commit: 83a6ace0d1be44f70e768348ae6688798c84343e
Parents: afd9bc1
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 21 16:43:17 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 21 16:43:17 2016 -0800
----------------------------------------------------------------------
.../apache/spark/sql/streaming/OutputMode.java | 12 +-
.../apache/spark/sql/InternalOutputModes.scala | 47 ---
.../analysis/UnsupportedOperationChecker.scala | 3 +-
.../streaming/InternalOutputModes.scala | 47 +++
.../analysis/UnsupportedOperationsSuite.scala | 2 +-
.../sql/execution/datasources/DataSource.scala | 2 +-
.../execution/streaming/StatefulAggregate.scala | 61 ++--
.../spark/sql/execution/streaming/memory.scala | 5 +-
.../spark/sql/streaming/DataStreamWriter.scala | 17 +-
.../execution/streaming/MemorySinkSuite.scala | 287 +++++++++++++++++++
.../sql/streaming/EventTimeWatermarkSuite.scala | 55 +++-
.../sql/streaming/FileStreamSinkSuite.scala | 22 +-
.../sql/streaming/FileStreamSourceSuite.scala | 2 +-
.../spark/sql/streaming/MemorySinkSuite.scala | 274 ------------------
.../spark/sql/streaming/StreamSuite.scala | 8 +-
.../streaming/StreamingAggregationSuite.scala | 2 +-
.../test/DataStreamReaderWriterSuite.scala | 38 ++-
17 files changed, 507 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index a515c1a..cf0579f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.InternalOutputModes;
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes;
/**
* :: Experimental ::
@@ -54,4 +54,14 @@ public class OutputMode {
public static OutputMode Complete() {
return InternalOutputModes.Complete$.MODULE$;
}
+
+ /**
+ * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
+ * be written to the sink every time there are some updates.
+ *
+ * @since 2.1.1
+ */
+ public static OutputMode Update() {
+ return InternalOutputModes.Update$.MODULE$;
+ }
}
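
The new factory mirrors `Append()` and `Complete()`: it just exposes the internal case object. A hedged sanity check (this mirrors the "output mode API in Scala" test further down; note `InternalOutputModes` is `private[sql]`, so this only compiles inside the `org.apache.spark.sql` package tree):

```scala
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.streaming.OutputMode

// Update() returns InternalOutputModes.Update$.MODULE$, i.e. the same
// singleton, so plain equality holds.
assert(OutputMode.Update() == InternalOutputModes.Update)
```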
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
deleted file mode 100644
index 594c41c..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.streaming.OutputMode
-
-/**
- * Internal helper class to generate objects representing various `OutputMode`s,
- */
-private[sql] object InternalOutputModes {
-
- /**
- * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
- * written to the sink. This output mode can be only be used in queries that do not
- * contain any aggregation.
- */
- case object Append extends OutputMode
-
- /**
- * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
- * to the sink every time these is some updates. This output mode can only be used in queries
- * that contain aggregations.
- */
- case object Complete extends OutputMode
-
- /**
- * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
- * written to the sink every time these is some updates. This output mode can only be used in
- * queries that contain aggregations.
- */
- case object Update extends OutputMode
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c4a78f9..60d9881 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,11 +17,12 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.streaming.OutputMode
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
new file mode 100644
index 0000000..915f4a9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+import org.apache.spark.sql.streaming.OutputMode
+
+/**
+ * Internal helper class to generate objects representing various `OutputMode`s.
+ */
+private[sql] object InternalOutputModes {
+
+ /**
+ * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
+ * written to the sink. This output mode can only be used in queries that do not
+ * contain any aggregation.
+ */
+ case object Append extends OutputMode
+
+ /**
+ * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
+ * to the sink every time there are some updates. This output mode can only be used in queries
+ * that contain aggregations.
+ */
+ case object Complete extends OutputMode
+
+ /**
+ * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
+ * written to the sink every time there are some updates. This output mode can only be used in
+ * queries that contain aggregations.
+ */
+ case object Update extends OutputMode
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 34e94c7..94a008f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -27,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.IntegerType
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5245c14..ac3f068 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -278,7 +278,7 @@ case class DataSource(
throw new IllegalArgumentException("'path' is not specified")
})
if (outputMode != OutputMode.Append) {
- throw new IllegalArgumentException(
+ throw new AnalysisException(
s"Data source $className does not support $outputMode output mode")
}
new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 7af978a..0551e4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -21,11 +21,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, Predicate}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution
-import org.apache.spark.sql.InternalOutputModes._
-import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.SparkPlan
@@ -108,6 +108,30 @@ case class StateStoreSaveExec(
"numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"))
+ /** Generate a predicate that matches data older than the watermark */
+ private lazy val watermarkPredicate: Option[Predicate] = {
+ val optionalWatermarkAttribute =
+ keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
+
+ optionalWatermarkAttribute.map { watermarkAttribute =>
+ // If we are evicting based on a window, use the end of the window. Otherwise just
+ // use the attribute itself.
+ val evictionExpression =
+ if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+ LessThanOrEqual(
+ GetStructField(watermarkAttribute, 1),
+ Literal(eventTimeWatermark.get * 1000))
+ } else {
+ LessThanOrEqual(
+ watermarkAttribute,
+ Literal(eventTimeWatermark.get * 1000))
+ }
+
+ logInfo(s"Filtering state store on: $evictionExpression")
+ newPredicate(evictionExpression, keyExpressions)
+ }
+ }
+
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
assert(outputMode.nonEmpty,
@@ -151,25 +175,8 @@ case class StateStoreSaveExec(
numUpdatedStateRows += 1
}
- val watermarkAttribute =
- keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
- // If we are evicting based on a window, use the end of the window. Otherwise just
- // use the attribute itself.
- val evictionExpression =
- if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
- LessThanOrEqual(
- GetStructField(watermarkAttribute, 1),
- Literal(eventTimeWatermark.get * 1000))
- } else {
- LessThanOrEqual(
- watermarkAttribute,
- Literal(eventTimeWatermark.get * 1000))
- }
-
- logInfo(s"Filtering state store on: $evictionExpression")
- val predicate = newPredicate(evictionExpression, keyExpressions)
- store.remove(predicate.eval)
-
+ // Assumption: Append mode can be done only when watermark has been specified
+ store.remove(watermarkPredicate.get.eval)
store.commit()
numTotalStateRows += store.numKeys()
@@ -180,11 +187,19 @@ case class StateStoreSaveExec(
// Update and output modified rows from the StateStore.
case Some(Update) =>
+
new Iterator[InternalRow] {
- private[this] val baseIterator = iter
+
+ // Filter late data using watermark if specified
+ private[this] val baseIterator = watermarkPredicate match {
+ case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+ case None => iter
+ }
override def hasNext: Boolean = {
if (!baseIterator.hasNext) {
+ // Remove old aggregates if watermark specified
+ if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval)
store.commit()
numTotalStateRows += store.numKeys()
false
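
To make the eviction rule concrete, here is a hedged, simplified model of the predicate built above (the real code compiles a `LessThanOrEqual` expression over the window-end struct field, with the watermark converted to milliseconds; `WindowKey` is a hypothetical stand-in for the grouping key):

```scala
// Simplified model of StateStoreSaveExec's watermark eviction: a windowed key
// is dropped once its window end is at or below the event-time watermark.
case class WindowKey(startMs: Long, endMs: Long)

def shouldEvict(key: WindowKey, watermarkMs: Long): Boolean =
  key.endMs <= watermarkMs  // ~ LessThanOrEqual(GetStructField(key, 1), watermark * 1000)

// With the watermark at 15 s:
assert(shouldEvict(WindowKey(10000L, 15000L), 15000L))   // closed window: evicted
assert(!shouldEvict(WindowKey(15000L, 20000L), 15000L))  // still-open window: kept
```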
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b699be2..91da6b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -193,11 +194,11 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
if (notCommitted) {
logDebug(s"Committing batch $batchId to $this")
outputMode match {
- case InternalOutputModes.Append | InternalOutputModes.Update =>
+ case Append | Update =>
val rows = AddedData(batchId, data.collect())
synchronized { batches += rows }
- case InternalOutputModes.Complete =>
+ case Complete =>
val rows = AddedData(batchId, data.collect())
synchronized {
batches.clear()
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b7fc336..6c0c5e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
@@ -65,9 +66,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
OutputMode.Append
case "complete" =>
OutputMode.Complete
+ case "update" =>
+ OutputMode.Update
case _ =>
throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
- "Accepted output modes are 'append' and 'complete'")
+ "Accepted output modes are 'append', 'complete', 'update'")
}
this
}
@@ -99,7 +102,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
this
}
-
/**
* Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
@@ -219,7 +221,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
}
-
+ val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
+ outputMode match {
+ case Append | Complete => // allowed
+ case Update =>
+ throw new AnalysisException(
+ s"Update output mode is not supported for memory sink. $supportedModes")
+ case _ =>
+ throw new AnalysisException(
+ s"$outputMode is not supported for memory sink. $supportedModes")
+ }
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
val chkpointLoc = extraOptions.get("checkpointLocation")
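
The user-visible effect of the new check, as a hedged sketch (assumes a ScalaTest context with a `MemoryStream[Int]` named `input`, as in the new MemorySinkSuite below):

```scala
import org.apache.spark.sql.AnalysisException

// Update mode on the memory sink now fails fast with an AnalysisException
// (previously the "update" mode string itself was rejected with an
// IllegalArgumentException by outputMode()).
val e = intercept[AnalysisException] {
  input.toDF().groupBy("value").count()
    .writeStream
    .format("memory")
    .queryName("memStream")
    .outputMode("update")
    .start()
}
assert(e.getMessage.contains("not supported"))
```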
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
new file mode 100644
index 0000000..ca724fc
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.language.implicitConversions
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+class MemorySinkSuite extends StreamTest with BeforeAndAfter {
+
+ import testImplicits._
+
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("directly add data in Append output mode") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, OutputMode.Append)
+
+ // Before adding data, check output
+ assert(sink.latestBatchId === None)
+ checkAnswer(sink.latestBatchData, Seq.empty)
+ checkAnswer(sink.allData, Seq.empty)
+
+ // Add batch 0 and check outputs
+ sink.addBatch(0, 1 to 3)
+ assert(sink.latestBatchId === Some(0))
+ checkAnswer(sink.latestBatchData, 1 to 3)
+ checkAnswer(sink.allData, 1 to 3)
+
+ // Add batch 1 and check outputs
+ sink.addBatch(1, 4 to 6)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
+
+ // Re-add batch 1 with different data, should not be added and outputs should not be changed
+ sink.addBatch(1, 7 to 9)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6)
+
+ // Add batch 2 and check outputs
+ sink.addBatch(2, 7 to 9)
+ assert(sink.latestBatchId === Some(2))
+ checkAnswer(sink.latestBatchData, 7 to 9)
+ checkAnswer(sink.allData, 1 to 9)
+ }
+
+ test("directly add data in Update output mode") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, OutputMode.Update)
+
+ // Before adding data, check output
+ assert(sink.latestBatchId === None)
+ checkAnswer(sink.latestBatchData, Seq.empty)
+ checkAnswer(sink.allData, Seq.empty)
+
+ // Add batch 0 and check outputs
+ sink.addBatch(0, 1 to 3)
+ assert(sink.latestBatchId === Some(0))
+ checkAnswer(sink.latestBatchData, 1 to 3)
+ checkAnswer(sink.allData, 1 to 3)
+
+ // Add batch 1 and check outputs
+ sink.addBatch(1, 4 to 6)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
+
+ // Re-add batch 1 with different data, should not be added and outputs should not be changed
+ sink.addBatch(1, 7 to 9)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 1 to 6)
+
+ // Add batch 2 and check outputs
+ sink.addBatch(2, 7 to 9)
+ assert(sink.latestBatchId === Some(2))
+ checkAnswer(sink.latestBatchData, 7 to 9)
+ checkAnswer(sink.allData, 1 to 9)
+ }
+
+ test("directly add data in Complete output mode") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, OutputMode.Complete)
+
+ // Before adding data, check output
+ assert(sink.latestBatchId === None)
+ checkAnswer(sink.latestBatchData, Seq.empty)
+ checkAnswer(sink.allData, Seq.empty)
+
+ // Add batch 0 and check outputs
+ sink.addBatch(0, 1 to 3)
+ assert(sink.latestBatchId === Some(0))
+ checkAnswer(sink.latestBatchData, 1 to 3)
+ checkAnswer(sink.allData, 1 to 3)
+
+ // Add batch 1 and check outputs
+ sink.addBatch(1, 4 to 6)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 4 to 6) // new data should replace old data
+
+ // Re-add batch 1 with different data, should not be added and outputs should not be changed
+ sink.addBatch(1, 7 to 9)
+ assert(sink.latestBatchId === Some(1))
+ checkAnswer(sink.latestBatchData, 4 to 6)
+ checkAnswer(sink.allData, 4 to 6)
+
+ // Add batch 2 and check outputs
+ sink.addBatch(2, 7 to 9)
+ assert(sink.latestBatchId === Some(2))
+ checkAnswer(sink.latestBatchData, 7 to 9)
+ checkAnswer(sink.allData, 7 to 9)
+ }
+
+
+ test("registering as a table in Append output mode - supported") {
+ val input = MemoryStream[Int]
+ val query = input.toDF().writeStream
+ .format("memory")
+ .outputMode("append")
+ .queryName("memStream")
+ .start()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDataset(
+ spark.table("memStream").as[Int],
+ 1, 2, 3)
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDataset(
+ spark.table("memStream").as[Int],
+ 1, 2, 3, 4, 5, 6)
+
+ query.stop()
+ }
+
+ test("registering as a table in Complete output mode - supported") {
+ val input = MemoryStream[Int]
+ val query = input.toDF()
+ .groupBy("value")
+ .count()
+ .writeStream
+ .format("memory")
+ .outputMode("complete")
+ .queryName("memStream")
+ .start()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDatasetUnorderly(
+ spark.table("memStream").as[(Int, Long)],
+ (1, 1L), (2, 1L), (3, 1L))
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDatasetUnorderly(
+ spark.table("memStream").as[(Int, Long)],
+ (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L))
+
+ query.stop()
+ }
+
+ test("registering as a table in Update output mode - not supported") {
+ val input = MemoryStream[Int]
+ val df = input.toDF()
+ .groupBy("value")
+ .count()
+ intercept[AnalysisException] {
+ df.writeStream
+ .format("memory")
+ .outputMode("update")
+ .queryName("memStream")
+ .start()
+ }
+ }
+
+ test("MemoryPlan statistics") {
+ implicit val schema = new StructType().add(new StructField("value", IntegerType))
+ val sink = new MemorySink(schema, OutputMode.Append)
+ val plan = new MemoryPlan(sink)
+
+ // Before adding data, check output
+ checkAnswer(sink.allData, Seq.empty)
+ assert(plan.statistics.sizeInBytes === 0)
+
+ sink.addBatch(0, 1 to 3)
+ assert(plan.statistics.sizeInBytes === 12)
+
+ sink.addBatch(1, 4 to 6)
+ assert(plan.statistics.sizeInBytes === 24)
+ }
+
+ ignore("stress test") {
+ // Ignore the stress test as it takes several minutes to run
+ (0 until 1000).foreach { _ =>
+ val input = MemoryStream[Int]
+ val query = input.toDF().writeStream
+ .format("memory")
+ .queryName("memStream")
+ .start()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDataset(
+ spark.table("memStream").as[Int],
+ 1, 2, 3)
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDataset(
+ spark.table("memStream").as[Int],
+ 1, 2, 3, 4, 5, 6)
+
+ query.stop()
+ }
+ }
+
+ test("error when no name is specified") {
+ val error = intercept[AnalysisException] {
+ val input = MemoryStream[Int]
+ val query = input.toDF().writeStream
+ .format("memory")
+ .start()
+ }
+
+ assert(error.message contains "queryName must be specified")
+ }
+
+ test("error if attempting to resume specific checkpoint") {
+ val location = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
+
+ val input = MemoryStream[Int]
+ val query = input.toDF().writeStream
+ .format("memory")
+ .queryName("memStream")
+ .option("checkpointLocation", location)
+ .start()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+ query.stop()
+
+ intercept[AnalysisException] {
+ input.toDF().writeStream
+ .format("memory")
+ .queryName("memStream")
+ .option("checkpointLocation", location)
+ .start()
+ }
+ }
+
+ private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = {
+ checkAnswer(
+ sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema),
+ intsToDF(expected)(schema))
+ }
+
+ private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
+ require(schema.fields.size === 1)
+ sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index bdfba95..23f51ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.streaming
import java.{util => ju}
import java.text.SimpleDateFormat
-import java.util.{Calendar, Date}
+import java.util.Date
import org.scalatest.BeforeAndAfter
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
-import org.apache.spark.sql.InternalOutputModes.Complete
+import org.apache.spark.sql.streaming.OutputMode._
class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
@@ -117,7 +117,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
)
}
- test("append-mode watermark aggregation") {
+ test("append mode") {
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
@@ -129,11 +129,42 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
testStream(windowedAggregation)(
AddData(inputData, 10, 11, 12, 13, 14, 15),
- CheckAnswer(),
- AddData(inputData, 25), // Advance watermark to 15 seconds
- CheckAnswer(),
- AddData(inputData, 25), // Evict items less than previous watermark.
- CheckAnswer((10, 5))
+ CheckLastBatch(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckLastBatch(),
+ assertNumStateRows(3),
+ AddData(inputData, 25), // Emit items less than watermark and drop their state
+ CheckLastBatch((10, 5)),
+ assertNumStateRows(2),
+ AddData(inputData, 10), // Should not emit anything as data is less than watermark
+ CheckLastBatch(),
+ assertNumStateRows(2)
+ )
+ }
+
+ test("update mode") {
+ val inputData = MemoryStream[Int]
+ spark.conf.set("spark.sql.shuffle.partitions", "10")
+
+ val windowedAggregation = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation, OutputMode.Update)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15),
+ CheckLastBatch((10, 5), (15, 1)),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckLastBatch((25, 1)),
+ assertNumStateRows(3),
+ AddData(inputData, 10, 25), // Ignore 10 as it's less than watermark
+ CheckLastBatch((25, 2)),
+ assertNumStateRows(2),
+ AddData(inputData, 10), // Should not emit anything as data is less than watermark
+ CheckLastBatch(),
+ assertNumStateRows(2)
)
}
@@ -271,6 +302,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
)
}
+ private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
+ val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
+ assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
+ true
+ }
+
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
AssertOnQuery { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
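
A hedged worked example of the watermark arithmetic these tests depend on (watermark = max event time seen minus the configured delay; values in seconds):

```scala
// With .withWatermark("eventTime", "10 seconds") and 5-second windows:
val delaySec = 10L
val maxEventTimeSec = 25L                      // after AddData(inputData, 25)
val watermarkSec = maxEventTimeSec - delaySec  // = 15
// The window [10, 15) has end 15 <= 15, so its state row is dropped (hence
// assertNumStateRows(2)); in append mode its final count (10, 5) is emitted,
// and later data below the watermark (the trailing 10) is ignored.
assert(watermarkSec == 15L)
```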
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 54efae3..22f59f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.streaming
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -210,6 +210,26 @@ class FileStreamSinkSuite extends StreamTest {
}
}
+ test("Update and Complete output mode not supported") {
+ val df = MemoryStream[Int].toDF().groupBy().count()
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+
+ withTempDir { dir =>
+
+ def testOutputMode(mode: String): Unit = {
+ val e = intercept[AnalysisException] {
+ df.writeStream.format("parquet").outputMode(mode).start(dir.getCanonicalPath)
+ }
+ Seq(mode, "not support").foreach { w =>
+ assert(e.getMessage.toLowerCase.contains(w))
+ }
+ }
+
+ testOutputMode("update")
+ testOutputMode("complete")
+ }
+ }
+
test("parquet") {
testFormat(None) // should not throw error as default format parquet when not specified
testFormat(Some("parquet"))
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2d218f4..55d927a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -899,7 +899,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
// This is to avoid actually running a Spark job with 10000 tasks
val df = files.filter("1 == 0").groupBy().count()
- testStream(df, InternalOutputModes.Complete)(
+ testStream(df, OutputMode.Complete)(
AddTextFileData("0", src, tmp),
CheckAnswer(0)
)
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
deleted file mode 100644
index 4e9fba9..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import scala.language.implicitConversions
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-import org.apache.spark.util.Utils
-
-class MemorySinkSuite extends StreamTest with BeforeAndAfter {
-
- import testImplicits._
-
- after {
- sqlContext.streams.active.foreach(_.stop())
- }
-
- test("directly add data in Append output mode") {
- implicit val schema = new StructType().add(new StructField("value", IntegerType))
- val sink = new MemorySink(schema, InternalOutputModes.Append)
-
- // Before adding data, check output
- assert(sink.latestBatchId === None)
- checkAnswer(sink.latestBatchData, Seq.empty)
- checkAnswer(sink.allData, Seq.empty)
-
- // Add batch 0 and check outputs
- sink.addBatch(0, 1 to 3)
- assert(sink.latestBatchId === Some(0))
- checkAnswer(sink.latestBatchData, 1 to 3)
- checkAnswer(sink.allData, 1 to 3)
-
- // Add batch 1 and check outputs
- sink.addBatch(1, 4 to 6)
- assert(sink.latestBatchId === Some(1))
- checkAnswer(sink.latestBatchData, 4 to 6)
- checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
-
- // Re-add batch 1 with different data, should not be added and outputs should not be changed
- sink.addBatch(1, 7 to 9)
- assert(sink.latestBatchId === Some(1))
- checkAnswer(sink.latestBatchData, 4 to 6)
- checkAnswer(sink.allData, 1 to 6)
-
- // Add batch 2 and check outputs
- sink.addBatch(2, 7 to 9)
- assert(sink.latestBatchId === Some(2))
- checkAnswer(sink.latestBatchData, 7 to 9)
- checkAnswer(sink.allData, 1 to 9)
- }
-
- test("directly add data in Update output mode") {
- implicit val schema = new StructType().add(new StructField("value", IntegerType))
- val sink = new MemorySink(schema, InternalOutputModes.Update)
-
- // Before adding data, check output
- assert(sink.latestBatchId === None)
- checkAnswer(sink.latestBatchData, Seq.empty)
- checkAnswer(sink.allData, Seq.empty)
-
- // Add batch 0 and check outputs
- sink.addBatch(0, 1 to 3)
- assert(sink.latestBatchId === Some(0))
- checkAnswer(sink.latestBatchData, 1 to 3)
- checkAnswer(sink.allData, 1 to 3)
-
- // Add batch 1 and check outputs
- sink.addBatch(1, 4 to 6)
- assert(sink.latestBatchId === Some(1))
- checkAnswer(sink.latestBatchData, 4 to 6)
- checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
-
- // Re-add batch 1 with different data, should not be added and outputs should not be changed
- sink.addBatch(1, 7 to 9)
- assert(sink.latestBatchId === Some(1))
- checkAnswer(sink.latestBatchData, 4 to 6)
- checkAnswer(sink.allData, 1 to 6)
-
- // Add batch 2 and check outputs
- sink.addBatch(2, 7 to 9)
- assert(sink.latestBatchId === Some(2))
- checkAnswer(sink.latestBatchData, 7 to 9)
- checkAnswer(sink.allData, 1 to 9)
- }
-
- test("directly add data in Complete output mode") {
- implicit val schema = new StructType().add(new StructField("value", IntegerType))
- val sink = new MemorySink(schema, InternalOutputModes.Complete)
-
- // Before adding data, check output
- assert(sink.latestBatchId === None)
- checkAnswer(sink.latestBatchData, Seq.empty)
- checkAnswer(sink.allData, Seq.empty)
-
- // Add batch 0 and check outputs
- sink.addBatch(0, 1 to 3)
- assert(sink.latestBatchId === Some(0))
- checkAnswer(sink.latestBatchData, 1 to 3)
- checkAnswer(sink.allData, 1 to 3)
-
- // Add batch 1 and check outputs
- sink.addBatch(1, 4 to 6)
- assert(sink.latestBatchId === Some(1))
- checkAnswer(sink.latestBatchData, 4 to 6)
- checkAnswer(sink.allData, 4 to 6) // new data should replace old data
-
- // Re-add batch 1 with different data, should not be added and outputs should not be changed
- sink.addBatch(1, 7 to 9)
- assert(sink.latestBatchId === Some(1))
- checkAnswer(sink.latestBatchData, 4 to 6)
- checkAnswer(sink.allData, 4 to 6)
-
- // Add batch 2 and check outputs
- sink.addBatch(2, 7 to 9)
- assert(sink.latestBatchId === Some(2))
- checkAnswer(sink.latestBatchData, 7 to 9)
- checkAnswer(sink.allData, 7 to 9)
- }
-
-
- test("registering as a table in Append output mode") {
- val input = MemoryStream[Int]
- val query = input.toDF().writeStream
- .format("memory")
- .outputMode("append")
- .queryName("memStream")
- .start()
- input.addData(1, 2, 3)
- query.processAllAvailable()
-
- checkDataset(
- spark.table("memStream").as[Int],
- 1, 2, 3)
-
- input.addData(4, 5, 6)
- query.processAllAvailable()
- checkDataset(
- spark.table("memStream").as[Int],
- 1, 2, 3, 4, 5, 6)
-
- query.stop()
- }
-
- test("registering as a table in Complete output mode") {
- val input = MemoryStream[Int]
- val query = input.toDF()
- .groupBy("value")
- .count()
- .writeStream
- .format("memory")
- .outputMode("complete")
- .queryName("memStream")
- .start()
- input.addData(1, 2, 3)
- query.processAllAvailable()
-
- checkDatasetUnorderly(
- spark.table("memStream").as[(Int, Long)],
- (1, 1L), (2, 1L), (3, 1L))
-
- input.addData(4, 5, 6)
- query.processAllAvailable()
- checkDatasetUnorderly(
- spark.table("memStream").as[(Int, Long)],
- (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L))
-
- query.stop()
- }
-
- test("MemoryPlan statistics") {
- implicit val schema = new StructType().add(new StructField("value", IntegerType))
- val sink = new MemorySink(schema, InternalOutputModes.Append)
- val plan = new MemoryPlan(sink)
-
- // Before adding data, check output
- checkAnswer(sink.allData, Seq.empty)
- assert(plan.statistics.sizeInBytes === 0)
-
- sink.addBatch(0, 1 to 3)
- assert(plan.statistics.sizeInBytes === 12)
-
- sink.addBatch(1, 4 to 6)
- assert(plan.statistics.sizeInBytes === 24)
- }
-
- ignore("stress test") {
- // Ignore the stress test as it takes several minutes to run
- (0 until 1000).foreach { _ =>
- val input = MemoryStream[Int]
- val query = input.toDF().writeStream
- .format("memory")
- .queryName("memStream")
- .start()
- input.addData(1, 2, 3)
- query.processAllAvailable()
-
- checkDataset(
- spark.table("memStream").as[Int],
- 1, 2, 3)
-
- input.addData(4, 5, 6)
- query.processAllAvailable()
- checkDataset(
- spark.table("memStream").as[Int],
- 1, 2, 3, 4, 5, 6)
-
- query.stop()
- }
- }
-
- test("error when no name is specified") {
- val error = intercept[AnalysisException] {
- val input = MemoryStream[Int]
- val query = input.toDF().writeStream
- .format("memory")
- .start()
- }
-
- assert(error.message contains "queryName must be specified")
- }
-
- test("error if attempting to resume specific checkpoint") {
- val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
-
- val input = MemoryStream[Int]
- val query = input.toDF().writeStream
- .format("memory")
- .queryName("memStream")
- .option("checkpointLocation", location)
- .start()
- input.addData(1, 2, 3)
- query.processAllAvailable()
- query.stop()
-
- intercept[AnalysisException] {
- input.toDF().writeStream
- .format("memory")
- .queryName("memStream")
- .option("checkpointLocation", location)
- .start()
- }
- }
-
- private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = {
- checkAnswer(
- sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema),
- intsToDF(expected)(schema))
- }
-
- private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
- require(schema.fields.size === 1)
- sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 4a64054..b8fa82d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import scala.util.control.ControlThrowable
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -265,10 +266,9 @@ class StreamSuite extends StreamTest {
}
test("output mode API in Scala") {
- val o1 = OutputMode.Append
- assert(o1 === InternalOutputModes.Append)
- val o2 = OutputMode.Complete
- assert(o2 === InternalOutputModes.Complete)
+ assert(OutputMode.Append === InternalOutputModes.Append)
+ assert(OutputMode.Complete === InternalOutputModes.Complete)
+ assert(OutputMode.Update === InternalOutputModes.Update)
}
test("explain") {
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index fbe560e..eca2647 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -23,13 +23,13 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.OutputMode._
object FailureSinglton {
var firstTime = true
http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 9de3da3..097dd6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester.PrivateMethod
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
+import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -105,7 +106,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}
}
-class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
+class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with PrivateMethodTester {
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -388,19 +389,40 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
- test("check outputMode(string) throws exception on unsupported modes") {
- def testError(outputMode: String): Unit = {
+ test("supported strings in outputMode(string)") {
+ val outputModeMethod = PrivateMethod[OutputMode]('outputMode)
+
+ def testMode(outputMode: String, expected: OutputMode): Unit = {
+ val df = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+ val w = df.writeStream
+ w.outputMode(outputMode)
+ val setOutputMode = w invokePrivate outputModeMethod()
+ assert(setOutputMode === expected)
+ }
+
+ testMode("append", OutputMode.Append)
+ testMode("Append", OutputMode.Append)
+ testMode("complete", OutputMode.Complete)
+ testMode("Complete", OutputMode.Complete)
+ testMode("update", OutputMode.Update)
+ testMode("Update", OutputMode.Update)
+ }
+
+ test("unsupported strings in outputMode(string)") {
+ def testMode(outputMode: String): Unit = {
+ val acceptedModes = Seq("append", "update", "complete")
val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load()
val w = df.writeStream
val e = intercept[IllegalArgumentException](w.outputMode(outputMode))
- Seq("output mode", "unknown", outputMode).foreach { s =>
+ (Seq("output mode", "unknown", outputMode) ++ acceptedModes).foreach { s =>
assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
}
}
- testError("Update")
- testError("Xyz")
+ testMode("Xyz")
}
test("check foreach() catches null writers") {