You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/04/06 19:05:05 UTC
spark git commit: [SPARK-14288][SQL] Memory Sink for streaming
Repository: spark
Updated Branches:
refs/heads/master 5e64dab86 -> 59236e5c5
[SPARK-14288][SQL] Memory Sink for streaming
This PR exposes the internal testing `MemorySink` through the data source API. This will allow users to easily test streaming applications in the Spark shell or other local tests.
Usage:
```scala
inputStream.write
.format("memory")
.queryName("memStream")
.startStream()
// Now you can query the result of the stream here.
sqlContext.table("memStream")
```
The most complicated part of the logic is choosing the checkpoint directory. There are a few requirements we are attempting to satisfy here:
- when working in the shell locally, it should just work with no extra configuration.
- when working on a cluster you should be able to make it easily create the checkpoint on a distributed file system so you can test aggregation (state checkpoints are also stored in this directory and must be accessible from workers).
- it should be clear that you can't resume since the data is just in memory.
The chosen algorithm proceeds as follows:
- if the user gives a checkpoint directory, use it
- if the conf has a checkpoint location, use `$location/$queryName`
- if neither, create a local directory
- always check to make sure there are no offsets written to the directory
Author: Michael Armbrust <mi...@databricks.com>
Closes #12119 from marmbrus/memorySink.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59236e5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59236e5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59236e5c
Branch: refs/heads/master
Commit: 59236e5c5b9d24f90fcf8d09b23ae8b06355657e
Parents: 5e64dab
Author: Michael Armbrust <mi...@databricks.com>
Authored: Wed Apr 6 10:05:02 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Apr 6 10:05:02 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameWriter.scala | 79 ++++++++++++++-----
.../spark/sql/execution/SparkStrategies.scala | 6 ++
.../spark/sql/execution/streaming/memory.scala | 8 ++
.../scala/org/apache/spark/sql/QueryTest.scala | 2 +
.../spark/sql/streaming/MemorySinkSuite.scala | 82 ++++++++++++++++++++
5 files changed, 159 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3332a99..54d2508 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelation
+import org.apache.spark.util.Utils
/**
* :: Experimental ::
@@ -275,23 +277,64 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
def startStream(): ContinuousQuery = {
- val dataSource =
- DataSource(
- df.sqlContext,
- className = source,
- options = extraOptions.toMap,
- partitionColumns = normalizedParCols.getOrElse(Nil))
-
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
- val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
- new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
- })
- df.sqlContext.sessionState.continuousQueryManager.startQuery(
- queryName,
- checkpointLocation,
- df,
- dataSource.createSink(),
- trigger)
+ if (source == "memory") {
+ val queryName =
+ extraOptions.getOrElse(
+ "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
+ val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
+ new Path(userSpecified).toUri.toString
+ }.orElse {
+ val checkpointConfig: Option[String] =
+ df.sqlContext.conf.getConf(
+ SQLConf.CHECKPOINT_LOCATION,
+ None)
+
+ checkpointConfig.map { location =>
+ new Path(location, queryName).toUri.toString
+ }
+ }.getOrElse {
+ Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
+ }
+
+ // If offsets have already been created, we are trying to resume a query.
+ val checkpointPath = new Path(checkpointLocation, "offsets")
+ val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+ if (fs.exists(checkpointPath)) {
+ throw new AnalysisException(
+ s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
+ } else {
+ checkpointPath.toUri.toString
+ }
+
+ val sink = new MemorySink(df.schema)
+ val resultDf = Dataset.ofRows(df.sqlContext, new MemoryPlan(sink))
+ resultDf.registerTempTable(queryName)
+ val continuousQuery = df.sqlContext.sessionState.continuousQueryManager.startQuery(
+ queryName,
+ checkpointLocation,
+ df,
+ sink,
+ trigger)
+ continuousQuery
+ } else {
+ val dataSource =
+ DataSource(
+ df.sqlContext,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
+
+ val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+ val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+ new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+ })
+ df.sqlContext.sessionState.continuousQueryManager.startQuery(
+ queryName,
+ checkpointLocation,
+ df,
+ dataSource.createSink(),
+ trigger)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5f3128d..d77aba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeCommand => RunnableDescri
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
+import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -332,6 +334,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
+ case MemoryPlan(sink, output) =>
+ val encoder = RowEncoder(sink.schema)
+ LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
+
case logical.Distinct(child) =>
throw new IllegalStateException(
"logical distinct operator should have been replaced by aggregate in the optimizer")
http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b652530..351ef40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType
object MemoryStream {
@@ -136,3 +138,9 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
}
}
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
+ def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 4e62fac..48a077d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.MemoryPlan
abstract class QueryTest extends PlanTest {
@@ -200,6 +201,7 @@ abstract class QueryTest extends PlanTest {
logicalPlan.transform {
case _: ObjectOperator => return
case _: LogicalRelation => return
+ case _: MemoryPlan => return
}.transformAllExpressions {
case a: ImperativeAggregate => return
}
http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
new file mode 100644
index 0000000..5249aa2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Row, StreamTest}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class MemorySinkSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("registering as a table") {
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .startStream()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDataset(
+ sqlContext.table("memStream").as[Int],
+ 1, 2, 3)
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDataset(
+ sqlContext.table("memStream").as[Int],
+ 1, 2, 3, 4, 5, 6)
+
+ query.stop()
+ }
+
+ test("error when no name is specified") {
+ val error = intercept[AnalysisException] {
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .startStream()
+ }
+
+ assert(error.message contains "queryName must be specified")
+ }
+
+ test("error if attempting to resume specific checkpoint") {
+ val location = Utils.createTempDir("steaming.checkpoint").getCanonicalPath
+
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .option("checkpointLocation", location)
+ .startStream()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+ query.stop()
+
+ intercept[AnalysisException] {
+ input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .option("checkpointLocation", location)
+ .startStream()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org