You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/21 17:51:19 UTC
spark git commit: [SPARK-21147][SS] Throws an analysis exception when
a user-specified schema is given in socket/rate sources
Repository: spark
Updated Branches:
refs/heads/master ad459cfb1 -> 7a00c658d
[SPARK-21147][SS] Throws an analysis exception when a user-specified schema is given in socket/rate sources
## What changes were proposed in this pull request?
This PR proposes to throw an exception if a schema is provided by the user to the socket and rate sources, as below:
**socket source**
```scala
import org.apache.spark.sql.types._
val userSpecifiedSchema = StructType(
StructField("name", StringType) ::
StructField("area", StringType) :: Nil)
val df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).schema(userSpecifiedSchema).load
df.printSchema
```
Before
```
root
|-- value: string (nullable = true)
```
After
```
org.apache.spark.sql.AnalysisException: The socket source does not support a user-specified schema.;
at org.apache.spark.sql.execution.streaming.TextSocketSourceProvider.sourceSchema(socket.scala:199)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
... 50 elided
```
**rate source**
```scala
spark.readStream.format("rate").schema(spark.range(1).schema).load().printSchema()
```
Before
```
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
```
After
```
org.apache.spark.sql.AnalysisException: The rate source does not support a user-specified schema.;
at org.apache.spark.sql.execution.streaming.RateSourceProvider.sourceSchema(RateSourceProvider.scala:57)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
... 48 elided
```
## How was this patch tested?
Unit test in `TextSocketStreamSuite` and `RateSourceSuite`.
Author: hyukjinkwon <gu...@gmail.com>
Closes #18365 from HyukjinKwon/SPARK-21147.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a00c658
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a00c658
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a00c658
Branch: refs/heads/master
Commit: 7a00c658d44139d950b7d3ecd670d79f76e2e747
Parents: ad459cf
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Jun 21 10:51:17 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jun 21 10:51:17 2017 -0700
----------------------------------------------------------------------
.../sql/execution/streaming/RateSourceProvider.scala | 9 +++++++--
.../spark/sql/execution/streaming/socket.scala | 8 ++++++--
.../sql/execution/streaming/RateSourceSuite.scala | 12 ++++++++++++
.../execution/streaming/TextSocketStreamSuite.scala | 15 +++++++++++++++
4 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index e61a8eb..e76d4dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -25,7 +25,7 @@ import org.apache.commons.io.IOUtils
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
@@ -52,8 +52,13 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
- parameters: Map[String, String]): (String, StructType) =
+ parameters: Map[String, String]): (String, StructType) = {
+ if (schema.nonEmpty) {
+ throw new AnalysisException("The rate source does not support a user-specified schema.")
+ }
+
(shortName(), RateSourceProvider.SCHEMA)
+ }
override def createSource(
sqlContext: SQLContext,
http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 58bff27..8e63207 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -195,13 +195,17 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
if (!parameters.contains("port")) {
throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
}
- val schema =
+ if (schema.nonEmpty) {
+ throw new AnalysisException("The socket source does not support a user-specified schema.")
+ }
+
+ val sourceSchema =
if (parseIncludeTimestamp(parameters)) {
TextSocketSource.SCHEMA_TIMESTAMP
} else {
TextSocketSource.SCHEMA_REGULAR
}
- ("textSocket", schema)
+ ("textSocket", sourceSchema)
}
override def createSource(
http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
index bdba536..03d0f63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.TimeUnit
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.util.ManualClock
@@ -179,4 +180,15 @@ class RateSourceSuite extends StreamTest {
testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive"))
testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive"))
}
+
+ test("user-specified schema given") {
+ val exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("rate")
+ .schema(spark.range(1).schema)
+ .load()
+ }
+ assert(exception.getMessage.contains(
+ "rate source does not support a user-specified schema"))
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
index 5174a04..9ebf4d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
@@ -148,6 +148,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
}
}
+ test("user-specified schema given") {
+ val provider = new TextSocketSourceProvider
+ val userSpecifiedSchema = StructType(
+ StructField("name", StringType) ::
+ StructField("area", StringType) :: Nil)
+ val exception = intercept[AnalysisException] {
+ provider.sourceSchema(
+ sqlContext, Some(userSpecifiedSchema),
+ "",
+ Map("host" -> "localhost", "port" -> "1234"))
+ }
+ assert(exception.getMessage.contains(
+ "socket source does not support a user-specified schema"))
+ }
+
test("no server up") {
val provider = new TextSocketSourceProvider
val parameters = Map("host" -> "localhost", "port" -> "0")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org