You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/06/22 18:19:03 UTC

spark git commit: [SPARK-20599][SS] ConsoleSink should work with (batch)

Repository: spark
Updated Branches:
  refs/heads/master 19331b8e4 -> e55a105ae


[SPARK-20599][SS] ConsoleSink should work with (batch)

## What changes were proposed in this pull request?

Currently, if we read a batch and want to display it on the console sink, it will lead a runtime exception.

Changes:

- In this PR, we add a match rule to check whether it is a ConsoleSinkProvider, we will display the Dataset
 if using console format.

## How was this patch tested?

spark.read.schema().json(path).write.format("console").save

Author: Lubo Zhang <lu...@intel.com>
Author: lubozhan <lu...@intel.com>

Closes #18347 from lubozhan/dev.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e55a105a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e55a105a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e55a105a

Branch: refs/heads/master
Commit: e55a105ae04f1d1c35ee8f02005a3ab71d789124
Parents: 19331b8
Author: Lubo Zhang <lu...@intel.com>
Authored: Thu Jun 22 11:18:58 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Jun 22 11:18:58 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/console.scala | 28 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e55a105a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 38c6319..9e889ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.StructType
 
 class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   // Number of rows to display, by default 20 rows
@@ -51,7 +53,14 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   }
 }
 
-class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
+case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
+  extends BaseRelation {
+  override def schema: StructType = data.schema
+}
+
+class ConsoleSinkProvider extends StreamSinkProvider
+  with DataSourceRegister
+  with CreatableRelationProvider {
   def createSink(
       sqlContext: SQLContext,
       parameters: Map[String, String],
@@ -60,5 +69,20 @@ class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
     new ConsoleSink(parameters)
   }
 
+  def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    // Number of rows to display, by default 20 rows
+    val numRowsToShow = parameters.get("numRows").map(_.toInt).getOrElse(20)
+
+    // Truncate the displayed data if it is too long, by default it is true
+    val isTruncated = parameters.get("truncate").map(_.toBoolean).getOrElse(true)
+    data.showInternal(numRowsToShow, isTruncated)
+
+    ConsoleRelation(sqlContext, data)
+  }
+
   def shortName(): String = "console"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org