Posted to commits@spark.apache.org by we...@apache.org on 2017/10/17 09:58:52 UTC

spark git commit: [SPARK-22224][SQL] Override toString of KeyValue/Relational-GroupedDataset

Repository: spark
Updated Branches:
  refs/heads/master 8148f19ca -> 99e32f8ba


[SPARK-22224][SQL] Override toString of KeyValue/Relational-GroupedDataset

## What changes were proposed in this pull request?
#### before

```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = org.apache.spark.sql.KeyValueGroupedDataset@65214862
```
#### after
```scala
scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = [key: [value: string], value: [value: string]]
```
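
`RelationalGroupedDataset` gets an analogous `toString`. As a sketch of what the REPL would now show, reconstructed from the expected strings asserted in the new `DatasetSuite` tests rather than a captured session:

```scala
scala> val grouped = (1 to 3).toDF("id").groupBy("id")
grouped: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [id: int], value: [id: int], type: GroupBy]
```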

## How was this patch tested?
Existing unit tests, plus new `toString` tests added to `DatasetSuite` in this patch.

cc gatorsmile cloud-fan

Author: Kent Yao <ya...@hotmail.com>

Closes #19363 from yaooqinn/minor-dataset-tostring.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99e32f8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99e32f8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99e32f8b

Branch: refs/heads/master
Commit: 99e32f8ba5d908d5408e9857fd96ac1d7d7e5876
Parents: 8148f19
Author: Kent Yao <ya...@hotmail.com>
Authored: Tue Oct 17 17:58:45 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Oct 17 17:58:45 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/KeyValueGroupedDataset.scala      | 22 ++++++-
 .../spark/sql/RelationalGroupedDataset.scala    | 19 +++++-
 .../org/apache/spark/sql/DatasetSuite.scala     | 61 ++++++++++++++++++++
 .../scala/org/apache/spark/sql/QueryTest.scala  | 12 +---
 4 files changed, 100 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index cb42e9e..6bab21d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateStruct}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.ReduceAggregator
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
@@ -564,4 +563,25 @@ class KeyValueGroupedDataset[K, V] private[sql](
       encoder: Encoder[R]): Dataset[R] = {
     cogroup(other)((key, left, right) => f.call(key, left.asJava, right.asJava).asScala)(encoder)
   }
+
+  override def toString: String = {
+    val builder = new StringBuilder
+    val kFields = kExprEnc.schema.map {
+      case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+    }
+    val vFields = vExprEnc.schema.map {
+      case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+    }
+    builder.append("KeyValueGroupedDataset: [key: [")
+    builder.append(kFields.take(2).mkString(", "))
+    if (kFields.length > 2) {
+      builder.append(" ... " + (kFields.length - 2) + " more field(s)")
+    }
+    builder.append("], value: [")
+    builder.append(vFields.take(2).mkString(", "))
+    if (vFields.length > 2) {
+      builder.append(" ... " + (vFields.length - 2) + " more field(s)")
+    }
+    builder.append("]]").toString()
+  }
 }
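
Both overrides render at most two schema fields per side and summarize the rest. A minimal standalone sketch of that truncation rule (the field strings are illustrative stand-ins for the `kExprEnc.schema` / `vExprEnc.schema` entries):

```scala
// Mirrors the take(2) / " ... N more field(s)" logic in the toString above.
val fields = Seq("id: int", "val1: string", "val2: bigint")
val rendered = fields.take(2).mkString(", ") +
  (if (fields.length > 2) s" ... ${fields.length - 2} more field(s)" else "")
// rendered == "id: int, val1: string ... 1 more field(s)"
```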

http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index cd0ac1f..33ec3a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.python.PythonUDF
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{NumericType, StructField, StructType}
+import org.apache.spark.sql.types.{NumericType, StructType}
 
 /**
  * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]],
@@ -465,6 +465,19 @@ class RelationalGroupedDataset protected[sql](
 
     Dataset.ofRows(df.sparkSession, plan)
   }
+
+  override def toString: String = {
+    val builder = new StringBuilder
+    builder.append("RelationalGroupedDataset: [grouping expressions: [")
+    val kFields = groupingExprs.map(_.asInstanceOf[NamedExpression]).map {
+      case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+    }
+    builder.append(kFields.take(2).mkString(", "))
+    if (kFields.length > 2) {
+      builder.append(" ... " + (kFields.length - 2) + " more field(s)")
+    }
+    builder.append(s"], value: ${df.toString}, type: $groupType]").toString()
+  }
 }
 
 private[sql] object RelationalGroupedDataset {
@@ -479,7 +492,9 @@ private[sql] object RelationalGroupedDataset {
   /**
    * The Grouping Type
    */
-  private[sql] trait GroupType
+  private[sql] trait GroupType {
+    override def toString: String = getClass.getSimpleName.stripSuffix("$").stripSuffix("Type")
+  }
 
   /**
    * To indicate it's the GroupBy
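
The group-type label in the output comes from peeling suffixes off the Scala object name. A self-contained sketch with local stand-ins for the case objects in `RelationalGroupedDataset` (the names mirror `GroupByType`, `CubeType`, `RollupType`):

```scala
trait GroupType {
  // e.g. "GroupByType$" -> "GroupByType" -> "GroupBy"
  override def toString: String =
    getClass.getSimpleName.stripSuffix("$").stripSuffix("Type")
}
object GroupByType extends GroupType
object CubeType extends GroupType
object RollupType extends GroupType

// Seq(GroupByType, CubeType, RollupType).map(_.toString)
// => List("GroupBy", "Cube", "Rollup")
```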

http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index dace682..1537ce3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1341,8 +1341,69 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       Seq(1).toDS().map(_ => ("", TestForTypeAlias.seqOfTupleTypeAlias)),
       ("", Seq((1, 1), (2, 2))))
   }
+
+  test("Check RelationalGroupedDataset toString: Single data") {
+    val kvDataset = (1 to 3).toDF("id").groupBy("id")
+    val expected = "RelationalGroupedDataset: [" +
+      "grouping expressions: [id: int], value: [id: int], type: GroupBy]"
+    val actual = kvDataset.toString
+    assert(expected === actual)
+  }
+
+  test("Check RelationalGroupedDataset toString: over length schema ") {
+    val kvDataset = (1 to 3).map( x => (x, x.toString, x.toLong))
+      .toDF("id", "val1", "val2").groupBy("id")
+    val expected = "RelationalGroupedDataset:" +
+      " [grouping expressions: [id: int]," +
+      " value: [id: int, val1: string ... 1 more field]," +
+      " type: GroupBy]"
+    val actual = kvDataset.toString
+    assert(expected === actual)
+  }
+
+
+  test("Check KeyValueGroupedDataset toString: Single data") {
+    val kvDataset = (1 to 3).toDF("id").as[SingleData].groupByKey(identity)
+    val expected = "KeyValueGroupedDataset: [key: [id: int], value: [id: int]]"
+    val actual = kvDataset.toString
+    assert(expected === actual)
+  }
+
+  test("Check KeyValueGroupedDataset toString: Unnamed KV-pair") {
+    val kvDataset = (1 to 3).map(x => (x, x.toString))
+      .toDF("id", "val1").as[DoubleData].groupByKey(x => (x.id, x.val1))
+    val expected = "KeyValueGroupedDataset:" +
+      " [key: [_1: int, _2: string]," +
+      " value: [id: int, val1: string]]"
+    val actual = kvDataset.toString
+    assert(expected === actual)
+  }
+
+  test("Check KeyValueGroupedDataset toString: Named KV-pair") {
+    val kvDataset = (1 to 3).map( x => (x, x.toString))
+      .toDF("id", "val1").as[DoubleData].groupByKey(x => DoubleData(x.id, x.val1))
+    val expected = "KeyValueGroupedDataset:" +
+      " [key: [id: int, val1: string]," +
+      " value: [id: int, val1: string]]"
+    val actual = kvDataset.toString
+    assert(expected === actual)
+  }
+
+  test("Check KeyValueGroupedDataset toString: over length schema ") {
+    val kvDataset = (1 to 3).map( x => (x, x.toString, x.toLong))
+      .toDF("id", "val1", "val2").as[TripleData].groupByKey(identity)
+    val expected = "KeyValueGroupedDataset:" +
+      " [key: [id: int, val1: string ... 1 more field(s)]," +
+      " value: [id: int, val1: string ... 1 more field(s)]]"
+    val actual = kvDataset.toString
+    assert(expected === actual)
+  }
 }
 
+case class SingleData(id: Int)
+case class DoubleData(id: Int, val1: String)
+case class TripleData(id: Int, val1: String, val2: Long)
+
 case class WithImmutableMap(id: String, map_test: scala.collection.immutable.Map[Long, String])
 case class WithMap(id: String, map_test: scala.collection.Map[Long, String])
 case class WithMapInOption(m: Option[scala.collection.Map[Int, Int]])

http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index f980883..fcaca3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -17,23 +17,13 @@
 
 package org.apache.spark.sql
 
-import java.util.{ArrayDeque, Locale, TimeZone}
+import java.util.{Locale, TimeZone}
 
 import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
 
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.streaming.MemoryPlan
-import org.apache.spark.sql.types.{Metadata, ObjectType}
 
 
 abstract class QueryTest extends PlanTest {

