You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/22 18:19:03 UTC
spark git commit: [SPARK-20599][SS] ConsoleSink should work with
(batch)
Repository: spark
Updated Branches:
refs/heads/master 19331b8e4 -> e55a105ae
[SPARK-20599][SS] ConsoleSink should work with (batch)
## What changes were proposed in this pull request?
Currently, if we read a batch and want to display it on the console sink, it leads to a runtime exception.
Changes:
- In this PR, we add a match rule to check whether the provider is a ConsoleSinkProvider; if it is, we display
the Dataset when the console format is used.
## How was this patch tested?
spark.read.schema().json(path).write.format("console").save
Author: Lubo Zhang <lu...@intel.com>
Author: lubozhan <lu...@intel.com>
Closes #18347 from lubozhan/dev.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e55a105a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e55a105a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e55a105a
Branch: refs/heads/master
Commit: e55a105ae04f1d1c35ee8f02005a3ab71d789124
Parents: 19331b8
Author: Lubo Zhang <lu...@intel.com>
Authored: Thu Jun 22 11:18:58 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Jun 22 11:18:58 2017 -0700
----------------------------------------------------------------------
.../spark/sql/execution/streaming/console.scala | 28 ++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e55a105a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 38c6319..9e889ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.StructType
class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
// Number of rows to display, by default 20 rows
@@ -51,7 +53,14 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
}
}
-class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
+case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
+ extends BaseRelation {
+ override def schema: StructType = data.schema
+}
+
+class ConsoleSinkProvider extends StreamSinkProvider
+ with DataSourceRegister
+ with CreatableRelationProvider {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
@@ -60,5 +69,20 @@ class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
new ConsoleSink(parameters)
}
+ def createRelation(
+ sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ // Number of rows to display, by default 20 rows
+ val numRowsToShow = parameters.get("numRows").map(_.toInt).getOrElse(20)
+
+ // Truncate the displayed data if it is too long, by default it is true
+ val isTruncated = parameters.get("truncate").map(_.toBoolean).getOrElse(true)
+ data.showInternal(numRowsToShow, isTruncated)
+
+ ConsoleRelation(sqlContext, data)
+ }
+
def shortName(): String = "console"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org