You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/10/17 09:58:52 UTC
spark git commit: [SPARK-22224][SQL] Override toString of
KeyValue/Relational-GroupedDataset
Repository: spark
Updated Branches:
refs/heads/master 8148f19ca -> 99e32f8ba
[SPARK-22224][SQL] Override toString of KeyValue/Relational-GroupedDataset
## What changes were proposed in this pull request?
#### before
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = org.apache.spark.sql.KeyValueGroupedDataset@65214862
```
#### after
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = [key: [value: string], value: [value: string]]
```
## How was this patch tested?
existing unit tests
cc gatorsmile cloud-fan
Author: Kent Yao <ya...@hotmail.com>
Closes #19363 from yaooqinn/minor-dataset-tostring.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99e32f8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99e32f8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99e32f8b
Branch: refs/heads/master
Commit: 99e32f8ba5d908d5408e9857fd96ac1d7d7e5876
Parents: 8148f19
Author: Kent Yao <ya...@hotmail.com>
Authored: Tue Oct 17 17:58:45 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Oct 17 17:58:45 2017 +0800
----------------------------------------------------------------------
.../spark/sql/KeyValueGroupedDataset.scala | 22 ++++++-
.../spark/sql/RelationalGroupedDataset.scala | 19 +++++-
.../org/apache/spark/sql/DatasetSuite.scala | 61 ++++++++++++++++++++
.../scala/org/apache/spark/sql/QueryTest.scala | 12 +---
4 files changed, 100 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index cb42e9e..6bab21d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.function._
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateStruct}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.expressions.ReduceAggregator
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
@@ -564,4 +563,25 @@ class KeyValueGroupedDataset[K, V] private[sql](
encoder: Encoder[R]): Dataset[R] = {
cogroup(other)((key, left, right) => f.call(key, left.asJava, right.asJava).asScala)(encoder)
}
+
+ override def toString: String = {
+ val builder = new StringBuilder
+ val kFields = kExprEnc.schema.map {
+ case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+ }
+ val vFields = vExprEnc.schema.map {
+ case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+ }
+ builder.append("KeyValueGroupedDataset: [key: [")
+ builder.append(kFields.take(2).mkString(", "))
+ if (kFields.length > 2) {
+ builder.append(" ... " + (kFields.length - 2) + " more field(s)")
+ }
+ builder.append("], value: [")
+ builder.append(vFields.take(2).mkString(", "))
+ if (vFields.length > 2) {
+ builder.append(" ... " + (vFields.length - 2) + " more field(s)")
+ }
+ builder.append("]]").toString()
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index cd0ac1f..33ec3a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.python.PythonUDF
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{NumericType, StructField, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType}
/**
* A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]],
@@ -465,6 +465,19 @@ class RelationalGroupedDataset protected[sql](
Dataset.ofRows(df.sparkSession, plan)
}
+
+ override def toString: String = {
+ val builder = new StringBuilder
+ builder.append("RelationalGroupedDataset: [grouping expressions: [")
+ val kFields = groupingExprs.map(_.asInstanceOf[NamedExpression]).map {
+ case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+ }
+ builder.append(kFields.take(2).mkString(", "))
+ if (kFields.length > 2) {
+ builder.append(" ... " + (kFields.length - 2) + " more field(s)")
+ }
+ builder.append(s"], value: ${df.toString}, type: $groupType]").toString()
+ }
}
private[sql] object RelationalGroupedDataset {
@@ -479,7 +492,9 @@ private[sql] object RelationalGroupedDataset {
/**
* The Grouping Type
*/
- private[sql] trait GroupType
+ private[sql] trait GroupType {
+ override def toString: String = getClass.getSimpleName.stripSuffix("$").stripSuffix("Type")
+ }
/**
* To indicate it's the GroupBy
http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index dace682..1537ce3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1341,8 +1341,69 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
Seq(1).toDS().map(_ => ("", TestForTypeAlias.seqOfTupleTypeAlias)),
("", Seq((1, 1), (2, 2))))
}
+
+ test("Check RelationalGroupedDataset toString: Single data") {
+ val kvDataset = (1 to 3).toDF("id").groupBy("id")
+ val expected = "RelationalGroupedDataset: [" +
+ "grouping expressions: [id: int], value: [id: int], type: GroupBy]"
+ val actual = kvDataset.toString
+ assert(expected === actual)
+ }
+
+ test("Check RelationalGroupedDataset toString: over length schema ") {
+ val kvDataset = (1 to 3).map( x => (x, x.toString, x.toLong))
+ .toDF("id", "val1", "val2").groupBy("id")
+ val expected = "RelationalGroupedDataset:" +
+ " [grouping expressions: [id: int]," +
+ " value: [id: int, val1: string ... 1 more field]," +
+ " type: GroupBy]"
+ val actual = kvDataset.toString
+ assert(expected === actual)
+ }
+
+
+ test("Check KeyValueGroupedDataset toString: Single data") {
+ val kvDataset = (1 to 3).toDF("id").as[SingleData].groupByKey(identity)
+ val expected = "KeyValueGroupedDataset: [key: [id: int], value: [id: int]]"
+ val actual = kvDataset.toString
+ assert(expected === actual)
+ }
+
+ test("Check KeyValueGroupedDataset toString: Unnamed KV-pair") {
+ val kvDataset = (1 to 3).map(x => (x, x.toString))
+ .toDF("id", "val1").as[DoubleData].groupByKey(x => (x.id, x.val1))
+ val expected = "KeyValueGroupedDataset:" +
+ " [key: [_1: int, _2: string]," +
+ " value: [id: int, val1: string]]"
+ val actual = kvDataset.toString
+ assert(expected === actual)
+ }
+
+ test("Check KeyValueGroupedDataset toString: Named KV-pair") {
+ val kvDataset = (1 to 3).map( x => (x, x.toString))
+ .toDF("id", "val1").as[DoubleData].groupByKey(x => DoubleData(x.id, x.val1))
+ val expected = "KeyValueGroupedDataset:" +
+ " [key: [id: int, val1: string]," +
+ " value: [id: int, val1: string]]"
+ val actual = kvDataset.toString
+ assert(expected === actual)
+ }
+
+ test("Check KeyValueGroupedDataset toString: over length schema ") {
+ val kvDataset = (1 to 3).map( x => (x, x.toString, x.toLong))
+ .toDF("id", "val1", "val2").as[TripleData].groupByKey(identity)
+ val expected = "KeyValueGroupedDataset:" +
+ " [key: [id: int, val1: string ... 1 more field(s)]," +
+ " value: [id: int, val1: string ... 1 more field(s)]]"
+ val actual = kvDataset.toString
+ assert(expected === actual)
+ }
}
+case class SingleData(id: Int)
+case class DoubleData(id: Int, val1: String)
+case class TripleData(id: Int, val1: String, val2: Long)
+
case class WithImmutableMap(id: String, map_test: scala.collection.immutable.Map[Long, String])
case class WithMap(id: String, map_test: scala.collection.Map[Long, String])
case class WithMapInOption(m: Option[scala.collection.Map[Int, Int]])
http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index f980883..fcaca3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -17,23 +17,13 @@
package org.apache.spark.sql
-import java.util.{ArrayDeque, Locale, TimeZone}
+import java.util.{Locale, TimeZone}
import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.streaming.MemoryPlan
-import org.apache.spark.sql.types.{Metadata, ObjectType}
abstract class QueryTest extends PlanTest {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org