You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/02/04 01:10:15 UTC

spark git commit: [SPARK-13166][SQL] Remove DataStreamReader/Writer

Repository: spark
Updated Branches:
  refs/heads/master 3221eddb8 -> 915a75398


[SPARK-13166][SQL] Remove DataStreamReader/Writer

They seem redundant and we can simply use DataFrameReader/Writer. The new usage looks like:

```scala
val df = sqlContext.read.stream("...")
val handle = df.write.stream("...")
handle.stop()
```

Author: Reynold Xin <rx...@databricks.com>

Closes #11062 from rxin/SPARK-13166.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/915a7539
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/915a7539
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/915a7539

Branch: refs/heads/master
Commit: 915a75398ecbccdbf9a1e07333104c857ae1ce5e
Parents: 3221edd
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 3 16:10:11 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Feb 3 16:10:11 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  |  10 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  29 +++-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  36 ++++-
 .../org/apache/spark/sql/DataStreamReader.scala | 127 ------------------
 .../org/apache/spark/sql/DataStreamWriter.scala | 134 -------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  11 +-
 .../datasources/ResolvedDataSource.scala        |   1 -
 .../sql/streaming/DataStreamReaderSuite.scala   |  53 ++++----
 8 files changed, 86 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6de17e5..84203bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1682,7 +1682,7 @@ class DataFrame private[sql](
 
   /**
    * :: Experimental ::
-   * Interface for saving the content of the [[DataFrame]] out into external storage.
+   * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
    *
    * @group output
    * @since 1.4.0
@@ -1691,14 +1691,6 @@ class DataFrame private[sql](
   def write: DataFrameWriter = new DataFrameWriter(this)
 
   /**
-   * :: Experimental ::
-   * Interface for starting a streaming query that will continually output results to the specified
-   * external sink as new data arrives.
-   */
-  @Experimental
-  def streamTo: DataStreamWriter = new DataStreamWriter(this)
-
-  /**
    * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
    * @group rdd
    * @since 1.3.0

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 2e0c6c7..a58643a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,17 +29,17 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{CatalystQl}
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.JSONRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
 
 /**
  * :: Experimental ::
  * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[SQLContext.read]] to access this.
+ * key-value stores, etc) or data streams. Use [[SQLContext.read]] to access this.
  *
  * @since 1.4.0
  */
@@ -137,6 +137,30 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
+   * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
+   * (e.g. external key-value stores).
+   *
+   * @since 2.0.0
+   */
+  def stream(): DataFrame = {
+    val resolved = ResolvedDataSource.createSource(
+      sqlContext,
+      userSpecifiedSchema = userSpecifiedSchema,
+      providerName = source,
+      options = extraOptions.toMap)
+    DataFrame(sqlContext, StreamingRelation(resolved))
+  }
+
+  /**
+   * Loads input in as a [[DataFrame]], for data streams that read from some path.
+   *
+   * @since 2.0.0
+   */
+  def stream(path: String): DataFrame = {
+    option("path", path).stream()
+  }
+
+  /**
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
    *
@@ -165,7 +189,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
    *                             should be included.
-   *
    * @since 1.4.0
    */
   def jdbc(

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 12eb239..80447fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,17 +22,18 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.sources.HadoopFsRelation
 
 /**
  * :: Experimental ::
  * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[DataFrame.write]] to access this.
+ * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
  *
  * @since 1.4.0
  */
@@ -184,6 +185,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * Starts the execution of the streaming query, which will continually output results to the given
+   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  def stream(path: String): ContinuousQuery = {
+    option("path", path).stream()
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually output results to the given
+   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  def stream(): ContinuousQuery = {
+    val sink = ResolvedDataSource.createSink(
+      df.sqlContext,
+      source,
+      extraOptions.toMap,
+      normalizedParCols.getOrElse(Nil))
+
+    new StreamExecution(df.sqlContext, df.logicalPlan, sink)
+  }
+
+  /**
    * Inserts the content of the [[DataFrame]] to the specified table. It requires that
    * the schema of the [[DataFrame]] is the same as the schema of the table.
    *
@@ -255,7 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
   /**
    * The given column name may not be equal to any of the existing column names if we were in
-   * case-insensitive context.  Normalize the given column name to the real one so that we don't
+   * case-insensitive context. Normalize the given column name to the real one so that we don't
    * need to care about case sensitivity afterwards.
    */
   private def normalize(columnName: String, columnType: String): String = {
@@ -339,7 +368,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
    *                             should be included.
-   *
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
deleted file mode 100644
index 2febc93..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.Logging
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
-import org.apache.spark.sql.execution.streaming.StreamingRelation
-import org.apache.spark.sql.types.StructType
-
-/**
- * :: Experimental ::
- * An interface to reading streaming data.  Use `sqlContext.streamFrom` to access these methods.
- *
- * {{{
- *   val df = sqlContext.streamFrom
- *    .format("...")
- *    .open()
- * }}}
- */
-@Experimental
-class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
-
-  /**
-   * Specifies the input data source format.
-   *
-   * @since 2.0.0
-   */
-  def format(source: String): DataStreamReader = {
-    this.source = source
-    this
-  }
-
-  /**
-   * Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
-   * automatically from data. By specifying the schema here, the underlying data stream can
-   * skip the schema inference step, and thus speed up data reading.
-   *
-   * @since 2.0.0
-   */
-  def schema(schema: StructType): DataStreamReader = {
-    this.userSpecifiedSchema = Option(schema)
-    this
-  }
-
-  /**
-   * Adds an input option for the underlying data stream.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: String): DataStreamReader = {
-    this.extraOptions += (key -> value)
-    this
-  }
-
-  /**
-   * (Scala-specific) Adds input options for the underlying data stream.
-   *
-   * @since 2.0.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
-    this.extraOptions ++= options
-    this
-  }
-
-  /**
-   * Adds input options for the underlying data stream.
-   *
-   * @since 2.0.0
-   */
-  def options(options: java.util.Map[String, String]): DataStreamReader = {
-    this.options(options.asScala)
-    this
-  }
-
-  /**
-   * Loads streaming input in as a [[DataFrame]], for data streams that don't require a path (e.g.
-   * external key-value stores).
-   *
-   * @since 2.0.0
-   */
-  def open(): DataFrame = {
-    val resolved = ResolvedDataSource.createSource(
-      sqlContext,
-      userSpecifiedSchema = userSpecifiedSchema,
-      providerName = source,
-      options = extraOptions.toMap)
-    DataFrame(sqlContext, StreamingRelation(resolved))
-  }
-
-  /**
-   * Loads input in as a [[DataFrame]], for data streams that read from some path.
-   *
-   * @since 2.0.0
-   */
-  def open(path: String): DataFrame = {
-    option("path", path).open()
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  ///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = sqlContext.conf.defaultDataSourceName
-
-  private var userSpecifiedSchema: Option[StructType] = None
-
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
deleted file mode 100644
index b325d48..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.datasources.ResolvedDataSource
-import org.apache.spark.sql.execution.streaming.StreamExecution
-
-/**
- * :: Experimental ::
- * Interface used to start a streaming query query execution.
- *
- * @since 2.0.0
- */
-@Experimental
-final class DataStreamWriter private[sql](df: DataFrame) {
-
-  /**
-   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
-   *
-   * @since 2.0.0
-   */
-  def format(source: String): DataStreamWriter = {
-    this.source = source
-    this
-  }
-
-  /**
-   * Adds an output option for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: String): DataStreamWriter = {
-    this.extraOptions += (key -> value)
-    this
-  }
-
-  /**
-   * (Scala-specific) Adds output options for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
-    this.extraOptions ++= options
-    this
-  }
-
-  /**
-   * Adds output options for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def options(options: java.util.Map[String, String]): DataStreamWriter = {
-    this.options(options.asScala)
-    this
-  }
-
-  /**
-   * Partitions the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme.\
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataStreamWriter = {
-    this.partitioningColumns = colNames
-    this
-  }
-
-  /**
-   * Starts the execution of the streaming query, which will continually output results to the given
-   * path as new data arrives.  The returned [[ContinuousQuery]] object can be used to interact with
-   * the stream.
-   * @since 2.0.0
-   */
-  def start(path: String): ContinuousQuery = {
-    this.extraOptions += ("path" -> path)
-    start()
-  }
-
-  /**
-   * Starts the execution of the streaming query, which will continually output results to the given
-   * path as new data arrives.  The returned [[ContinuousQuery]] object can be used to interact with
-   * the stream.
-   *
-   * @since 2.0.0
-   */
-  def start(): ContinuousQuery = {
-    val sink = ResolvedDataSource.createSink(
-      df.sqlContext,
-      source,
-      extraOptions.toMap,
-      normalizedParCols)
-
-    new StreamExecution(df.sqlContext, df.logicalPlan, sink)
-  }
-
-  private def normalizedParCols: Seq[String] = {
-    partitioningColumns.map { col =>
-      df.logicalPlan.output
-        .map(_.name)
-        .find(df.sqlContext.analyzer.resolver(_, col))
-        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
-            s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
-    }
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  ///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = df.sqlContext.conf.defaultDataSourceName
-
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
-
-  private var partitioningColumns: Seq[String] = Nil
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 13700be..1661fdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -579,10 +579,9 @@ class SQLContext private[sql](
     DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
   }
 
-
   /**
    * :: Experimental ::
-   * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
+   * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
    * {{{
    *   sqlContext.read.parquet("/path/to/file.parquet")
    *   sqlContext.read.schema(schema).json("/path/to/file.json")
@@ -594,14 +593,6 @@ class SQLContext private[sql](
   @Experimental
   def read: DataFrameReader = new DataFrameReader(this)
 
-
-  /**
-   * :: Experimental ::
-   * Returns a [[DataStreamReader]] than can be used to access data continuously as it arrives.
-   */
-  @Experimental
-  def streamFrom: DataStreamReader = new DataStreamReader(this)
-
   /**
    * :: Experimental ::
    * Creates an external table from the given path and returns the corresponding DataFrame.

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index e3065ac..7702f53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -122,7 +122,6 @@ object ResolvedDataSource extends Logging {
     provider.createSink(sqlContext, options, partitionColumns)
   }
 
-
   /** Create a [[ResolvedDataSource]] for reading data in. */
   def apply(
       sqlContext: SQLContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/915a7539/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
index 1dab6eb..b36b41c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
@@ -60,22 +60,22 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
 
   test("resolve default source") {
-    sqlContext.streamFrom
+    sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
-      .open()
-      .streamTo
+      .stream()
+      .write
       .format("org.apache.spark.sql.streaming.test")
-      .start()
+      .stream()
       .stop()
   }
 
   test("resolve full class") {
-    sqlContext.streamFrom
+    sqlContext.read
       .format("org.apache.spark.sql.streaming.test.DefaultSource")
-      .open()
-      .streamTo
+      .stream()
+      .write
       .format("org.apache.spark.sql.streaming.test")
-      .start()
+      .stream()
       .stop()
   }
 
@@ -83,12 +83,12 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
     val map = new java.util.HashMap[String, String]
     map.put("opt3", "3")
 
-    val df = sqlContext.streamFrom
+    val df = sqlContext.read
         .format("org.apache.spark.sql.streaming.test")
         .option("opt1", "1")
         .options(Map("opt2" -> "2"))
         .options(map)
-        .open()
+        .stream()
 
     assert(LastOptions.parameters("opt1") == "1")
     assert(LastOptions.parameters("opt2") == "2")
@@ -96,12 +96,12 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
 
     LastOptions.parameters = null
 
-    df.streamTo
+    df.write
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "1")
       .options(Map("opt2" -> "2"))
       .options(map)
-      .start()
+      .stream()
       .stop()
 
     assert(LastOptions.parameters("opt1") == "1")
@@ -110,54 +110,53 @@ class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
   }
 
   test("partitioning") {
-    val df = sqlContext.streamFrom
+    val df = sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
-      .open()
+      .stream()
 
-    df.streamTo
+    df.write
       .format("org.apache.spark.sql.streaming.test")
-      .start()
+      .stream()
       .stop()
     assert(LastOptions.partitionColumns == Nil)
 
-    df.streamTo
+    df.write
       .format("org.apache.spark.sql.streaming.test")
       .partitionBy("a")
-      .start()
+      .stream()
       .stop()
     assert(LastOptions.partitionColumns == Seq("a"))
 
-
     withSQLConf("spark.sql.caseSensitive" -> "false") {
-      df.streamTo
+      df.write
         .format("org.apache.spark.sql.streaming.test")
         .partitionBy("A")
-        .start()
+        .stream()
         .stop()
       assert(LastOptions.partitionColumns == Seq("a"))
     }
 
     intercept[AnalysisException] {
-      df.streamTo
+      df.write
         .format("org.apache.spark.sql.streaming.test")
         .partitionBy("b")
-        .start()
+        .stream()
         .stop()
     }
   }
 
   test("stream paths") {
-    val df = sqlContext.streamFrom
+    val df = sqlContext.read
       .format("org.apache.spark.sql.streaming.test")
-      .open("/test")
+      .stream("/test")
 
     assert(LastOptions.parameters("path") == "/test")
 
     LastOptions.parameters = null
 
-    df.streamTo
+    df.write
       .format("org.apache.spark.sql.streaming.test")
-      .start("/test")
+      .stream("/test")
       .stop()
 
     assert(LastOptions.parameters("path") == "/test")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org