Posted to commits@spark.apache.org by zs...@apache.org on 2016/02/05 22:44:40 UTC
spark git commit: [SPARK-13166][SQL] Rename DataStreamReaderWriterSuite to DataFrameReaderWriterSuite
Repository: spark
Updated Branches:
refs/heads/master 82d84ff2d -> 7b73f1719
[SPARK-13166][SQL] Rename DataStreamReaderWriterSuite to DataFrameReaderWriterSuite
A follow-up PR for #11062, which did not rename the test suite.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #11096 from zsxwing/rename.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b73f171
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b73f171
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b73f171
Branch: refs/heads/master
Commit: 7b73f1719cff233645c7850a5dbc8ed2dc9c9a58
Parents: 82d84ff
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Feb 5 13:44:34 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Feb 5 13:44:34 2016 -0800
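
To inspect this change locally, standard git commands against a clone of the Spark repository reproduce the listing below (the hash is the one recorded above):

    git fetch origin
    git show --stat 7b73f1719cff233645c7850a5dbc8ed2dc9c9a58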
----------------------------------------------------------------------
.../streaming/DataFrameReaderWriterSuite.scala | 190 +++++++++++++++++++
.../sql/streaming/DataStreamReaderSuite.scala | 190 -------------------
2 files changed, 190 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
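Note: this diff was generated without rename detection, so the move appears as a full 190-line addition plus a matching 190-line deletion. Run locally with rename detection enabled, git collapses the pair into a single rename; a minimal sketch, using the branch hashes listed above:

    git diff -M --stat 82d84ff2d 7b73f1719
    # with -M, git reports the pair as one rename, along the lines of:
    # .../{DataStreamReaderSuite.scala => DataFrameReaderWriterSuite.scala}
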
http://git-wip-us.apache.org/repos/asf/spark/blob/7b73f171/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
new file mode 100644
index 0000000..36212e4
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import org.apache.spark.sql.{AnalysisException, SQLContext, StreamTest}
+import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+object LastOptions {
+ var parameters: Map[String, String] = null
+ var schema: Option[StructType] = null
+ var partitionColumns: Seq[String] = Nil
+}
+
+/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
+class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+ override def createSource(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: Option[StructType]): Source = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = schema
+ new Source {
+ override def getNextBatch(start: Option[Offset]): Option[Batch] = None
+ override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+ }
+ }
+
+ override def createSink(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String]): Sink = {
+ LastOptions.parameters = parameters
+ LastOptions.partitionColumns = partitionColumns
+ new Sink {
+ override def addBatch(batch: Batch): Unit = {}
+ override def currentOffset: Option[Offset] = None
+ }
+ }
+}
+
+class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("resolve default source") {
+ sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+ .write
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+ .stop()
+ }
+
+ test("resolve full class") {
+ sqlContext.read
+ .format("org.apache.spark.sql.streaming.test.DefaultSource")
+ .stream()
+ .write
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+ .stop()
+ }
+
+ test("options") {
+ val map = new java.util.HashMap[String, String]
+ map.put("opt3", "3")
+
+ val df = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .option("opt1", "1")
+ .options(Map("opt2" -> "2"))
+ .options(map)
+ .stream()
+
+ assert(LastOptions.parameters("opt1") == "1")
+ assert(LastOptions.parameters("opt2") == "2")
+ assert(LastOptions.parameters("opt3") == "3")
+
+ LastOptions.parameters = null
+
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .option("opt1", "1")
+ .options(Map("opt2" -> "2"))
+ .options(map)
+ .stream()
+ .stop()
+
+ assert(LastOptions.parameters("opt1") == "1")
+ assert(LastOptions.parameters("opt2") == "2")
+ assert(LastOptions.parameters("opt3") == "3")
+ }
+
+ test("partitioning") {
+ val df = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+ .stop()
+ assert(LastOptions.partitionColumns == Nil)
+
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .partitionBy("a")
+ .stream()
+ .stop()
+ assert(LastOptions.partitionColumns == Seq("a"))
+
+ withSQLConf("spark.sql.caseSensitive" -> "false") {
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .partitionBy("A")
+ .stream()
+ .stop()
+ assert(LastOptions.partitionColumns == Seq("a"))
+ }
+
+ intercept[AnalysisException] {
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .partitionBy("b")
+ .stream()
+ .stop()
+ }
+ }
+
+ test("stream paths") {
+ val df = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream("/test")
+
+ assert(LastOptions.parameters("path") == "/test")
+
+ LastOptions.parameters = null
+
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .stream("/test")
+ .stop()
+
+ assert(LastOptions.parameters("path") == "/test")
+ }
+
+ test("test different data types for options") {
+ val df = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .option("intOpt", 56)
+ .option("boolOpt", false)
+ .option("doubleOpt", 6.7)
+ .stream("/test")
+
+ assert(LastOptions.parameters("intOpt") == "56")
+ assert(LastOptions.parameters("boolOpt") == "false")
+ assert(LastOptions.parameters("doubleOpt") == "6.7")
+
+ LastOptions.parameters = null
+ df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .option("intOpt", 56)
+ .option("boolOpt", false)
+ .option("doubleOpt", 6.7)
+ .stream("/test")
+ .stop()
+
+ assert(LastOptions.parameters("intOpt") == "56")
+ assert(LastOptions.parameters("boolOpt") == "false")
+ assert(LastOptions.parameters("doubleOpt") == "6.7")
+ }
+}
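
As a quick check of the rename, the new suite can be run on its own; a minimal sketch, assuming the standard Spark sbt build of that era (the sbt project name for sql/core is assumed to be "sql"):

    build/sbt "sql/test-only *DataFrameReaderWriterSuite"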
http://git-wip-us.apache.org/repos/asf/spark/blob/7b73f171/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
deleted file mode 100644
index 95a17f3..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming.test
-
-import org.apache.spark.sql.{AnalysisException, SQLContext, StreamTest}
-import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source}
-import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-
-object LastOptions {
- var parameters: Map[String, String] = null
- var schema: Option[StructType] = null
- var partitionColumns: Seq[String] = Nil
-}
-
-/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
-class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
- override def createSource(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- schema: Option[StructType]): Source = {
- LastOptions.parameters = parameters
- LastOptions.schema = schema
- new Source {
- override def getNextBatch(start: Option[Offset]): Option[Batch] = None
- override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
- }
- }
-
- override def createSink(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- partitionColumns: Seq[String]): Sink = {
- LastOptions.parameters = parameters
- LastOptions.partitionColumns = partitionColumns
- new Sink {
- override def addBatch(batch: Batch): Unit = {}
- override def currentOffset: Option[Offset] = None
- }
- }
-}
-
-class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
- import testImplicits._
-
- test("resolve default source") {
- sqlContext.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- .write
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- .stop()
- }
-
- test("resolve full class") {
- sqlContext.read
- .format("org.apache.spark.sql.streaming.test.DefaultSource")
- .stream()
- .write
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- .stop()
- }
-
- test("options") {
- val map = new java.util.HashMap[String, String]
- map.put("opt3", "3")
-
- val df = sqlContext.read
- .format("org.apache.spark.sql.streaming.test")
- .option("opt1", "1")
- .options(Map("opt2" -> "2"))
- .options(map)
- .stream()
-
- assert(LastOptions.parameters("opt1") == "1")
- assert(LastOptions.parameters("opt2") == "2")
- assert(LastOptions.parameters("opt3") == "3")
-
- LastOptions.parameters = null
-
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .option("opt1", "1")
- .options(Map("opt2" -> "2"))
- .options(map)
- .stream()
- .stop()
-
- assert(LastOptions.parameters("opt1") == "1")
- assert(LastOptions.parameters("opt2") == "2")
- assert(LastOptions.parameters("opt3") == "3")
- }
-
- test("partitioning") {
- val df = sqlContext.read
- .format("org.apache.spark.sql.streaming.test")
- .stream()
-
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .stream()
- .stop()
- assert(LastOptions.partitionColumns == Nil)
-
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .partitionBy("a")
- .stream()
- .stop()
- assert(LastOptions.partitionColumns == Seq("a"))
-
- withSQLConf("spark.sql.caseSensitive" -> "false") {
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .partitionBy("A")
- .stream()
- .stop()
- assert(LastOptions.partitionColumns == Seq("a"))
- }
-
- intercept[AnalysisException] {
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .partitionBy("b")
- .stream()
- .stop()
- }
- }
-
- test("stream paths") {
- val df = sqlContext.read
- .format("org.apache.spark.sql.streaming.test")
- .stream("/test")
-
- assert(LastOptions.parameters("path") == "/test")
-
- LastOptions.parameters = null
-
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .stream("/test")
- .stop()
-
- assert(LastOptions.parameters("path") == "/test")
- }
-
- test("test different data types for options") {
- val df = sqlContext.read
- .format("org.apache.spark.sql.streaming.test")
- .option("intOpt", 56)
- .option("boolOpt", false)
- .option("doubleOpt", 6.7)
- .stream("/test")
-
- assert(LastOptions.parameters("intOpt") == "56")
- assert(LastOptions.parameters("boolOpt") == "false")
- assert(LastOptions.parameters("doubleOpt") == "6.7")
-
- LastOptions.parameters = null
- df.write
- .format("org.apache.spark.sql.streaming.test")
- .option("intOpt", 56)
- .option("boolOpt", false)
- .option("doubleOpt", 6.7)
- .stream("/test")
- .stop()
-
- assert(LastOptions.parameters("intOpt") == "56")
- assert(LastOptions.parameters("boolOpt") == "false")
- assert(LastOptions.parameters("doubleOpt") == "6.7")
- }
-}