Posted to issues@spark.apache.org by "Dilip Biswal (JIRA)" <ji...@apache.org> on 2018/08/31 17:25:00 UTC

[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

    [ https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599048#comment-16599048 ] 

Dilip Biswal commented on SPARK-25279:
--------------------------------------

Hello,

I tried this against the latest trunk, and it seems to work fine.
{code:java}

 scala> import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.Aggregator

scala> import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.Encoder

scala> import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.Encoders

scala> import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSession

scala>

scala> case class Employee(name: String, salary: Long)
 defined class Employee

scala> case class Average(var sum: Long, var count: Long)
 defined class Average

scala>

scala> object MyAverage extends Aggregator[Employee, Average, Double] {
     |   // A zero value for this aggregation. Should satisfy the property that any b + zero = b
     |   def zero: Average = Average(0L, 0L)
     |   // Combine two values to produce a new value. For performance, the function may modify `buffer`
     |   // and return it instead of constructing a new object
     |   def reduce(buffer: Average, employee: Employee): Average = {
     |     buffer.sum += employee.salary
     |     buffer.count += 1
     |     buffer
     |   }
     |   // Merge two intermediate values
     |   def merge(b1: Average, b2: Average): Average = {
     |     b1.sum += b2.sum
     |     b1.count += b2.count
     |     b1
     |   }
     |   // Transform the output of the reduction
     |   def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
     |   // Specifies the Encoder for the intermediate value type
     |   def bufferEncoder: Encoder[Average] = Encoders.product
     |   // Specifies the Encoder for the final output value type
     |   def outputEncoder: Encoder[Double] = Encoders.scalaDouble
     | }
defined object MyAverage

scala>

scala> val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
 ds: org.apache.spark.sql.Dataset[Employee] = [name: string, salary: bigint]

scala> ds.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

scala> // +-------+------+

scala> // |   name|salary|

scala> // +-------+------+

scala> // |Michael|  3000|

scala> // |   Andy|  4500|

scala> // | Justin|  3500|

scala> // |  Berta|  4000|

scala> // +-------+------+

scala>

scala> // Convert the function to a `TypedColumn` and give it a name

scala> val averageSalary = MyAverage.toColumn.name("average_salary")
 averageSalary: org.apache.spark.sql.TypedColumn[Employee,Double] = myaverage() AS `average_salary`

scala> val result = ds.select(averageSalary)
 result: org.apache.spark.sql.Dataset[Double] = [average_salary: double]

scala> result.show()
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
 {code}
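
For anyone still hitting this on a 2.2.x shell: the serialization stack in the report points at the REPL wrapper (the $outer field of MyAverage$ referencing a $iw that holds the TypedColumn), so one workaround worth trying, sketched below but not verified against 2.2.1, is to compile the aggregator as a genuine top-level object with :paste -raw so there is no wrapper object for it to capture. The "example" package name below is just an illustration; any package works. The other option is what the transcript above already shows: a newer build runs the example as-is.
{code:java}
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

// Illustrative package name, used only so the definitions can be imported
// back into the session afterwards.
package example

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

// Same aggregator as the docs example, but compiled as a real top-level
// object: no REPL $outer pointer, so nothing non-serializable is dragged in.
object MyAverage extends Aggregator[Employee, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(b: Average, e: Employee): Average = { b.sum += e.salary; b.count += 1; b }
  def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  def finish(r: Average): Double = r.sum.toDouble / r.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Exiting paste mode, now interpreting.

scala> import example._

scala> val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]

scala> ds.select(MyAverage.toColumn.name("average_salary")).show()
{code}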

> Throw exception: zzcclp   java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25279
>                 URL: https://issues.apache.org/jira/browse/SPARK-25279
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.1
>            Reporter: Zhichao  Zhang
>            Priority: Minor
>
> Hi dev:
>   I am using Spark-Shell to run the example in the section
> http://spark.apache.org/docs/2.2.2/sql-programming-guide.html#type-safe-user-defined-aggregate-functions,
> and there is an error:
> {code:java}
> Caused by: java.io.NotSerializableException: 
> org.apache.spark.sql.TypedColumn 
> Serialization stack: 
>         - object not serializable (class: org.apache.spark.sql.TypedColumn, value: 
> myaverage() AS `average_salary`) 
>         - field (class: $iw, name: averageSalary, type: class 
> org.apache.spark.sql.TypedColumn) 
>         - object (class $iw, $iw@4b2f8ae9) 
>         - field (class: MyAverage$, name: $outer, type: class $iw) 
>         - object (class MyAverage$, MyAverage$@2be41d90) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, 
> name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, 
> MyAverage(Employee)) 
>         - field (class: 
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, 
> name: aggregateFunction, type: class 
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction) 
>         - object (class 
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, 
> partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)), 
> Some(class Employee), Some(StructType(StructField(name,StringType,true), 
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, 
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, 
> Average, true])).count AS count#26L, newInstance(class Average), input[0, 
> double, false] AS value#24, DoubleType, false, 0, 0)) 
>         - writeObject data (class: 
> scala.collection.immutable.List$SerializationProxy) 
>         - object (class scala.collection.immutable.List$SerializationProxy, 
> scala.collection.immutable.List$SerializationProxy@5e92c46f) 
>         - writeReplace data (class: 
> scala.collection.immutable.List$SerializationProxy) 
>         - object (class scala.collection.immutable.$colon$colon, 
> List(partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class 
> Employee)), Some(class Employee), 
> Some(StructType(StructField(name,StringType,true), 
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, 
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, 
> Average, true])).count AS count#26L, newInstance(class Average), input[0, 
> double, false] AS value#24, DoubleType, false, 0, 0))) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name: 
> aggregateExpressions, type: interface scala.collection.Seq) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, 
> ObjectHashAggregate(keys=[], 
> functions=[partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class 
> Employee)), Some(class Employee), 
> Some(StructType(StructField(name,StringType,true), 
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, 
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, 
> Average, true])).count AS count#26L, newInstance(class Average), input[0, 
> double, false] AS value#24, DoubleType, false, 0, 0)], output=[buf#37]) 
> +- *FileScan json [name#8,salary#9L] Batched: false, Format: JSON, Location: 
> InMemoryFileIndex[file:/opt/spark2/examples/src/main/resources/employees.json], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<name:string,salary:bigint> 
> ) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1, 
> name: $outer, type: class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1, 
> <function0>) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2, 
> name: $outer, type: class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2, 
> <function1>) 
>         - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, 
> name: f$23, type: interface scala.Function1) 
>         - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, 
> <function0>) 
>         - field (class: 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25, 
> name: $outer, type: class 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) 
>         - object (class 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25, 
> <function3>) 
>         - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: 
> interface scala.Function3) 
>         - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] 
> at show at <console>:62) 
>         - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class 
> org.apache.spark.rdd.RDD) 
>         - object (class org.apache.spark.OneToOneDependency, 
> org.apache.spark.OneToOneDependency@5bb7895) 
>         - writeObject data (class: 
> scala.collection.immutable.List$SerializationProxy) 
>         - object (class scala.collection.immutable.List$SerializationProxy, 
> scala.collection.immutable.List$SerializationProxy@6e81dca3) 
>         - writeReplace data (class: 
> scala.collection.immutable.List$SerializationProxy) 
>         - object (class scala.collection.immutable.$colon$colon, 
> List(org.apache.spark.OneToOneDependency@5bb7895)) 
>         - field (class: org.apache.spark.rdd.RDD, name: 
> org$apache$spark$rdd$RDD$$dependencies_, type: interface 
> scala.collection.Seq) 
>         - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] 
> at show at <console>:62) 
>         - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
>         - object (class scala.Tuple2, (MapPartitionsRDD[10] at show at 
> <console>:62,org.apache.spark.ShuffleDependency@421cd28)) 
>   at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) {code}
>  
>   But if I use IDEA to run the example directly, it works. What is the
> difference between them? How can I run the example successfully in Spark-Shell?
>   Thanks.


