Posted to issues@spark.apache.org by "Dilip Biswal (JIRA)" <ji...@apache.org> on 2018/08/31 17:25:00 UTC
[jira] [Commented] (SPARK-25279) Throw exception: zzcclp
java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in
Spark-shell when run example of doc
[ https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599048#comment-16599048 ]
Dilip Biswal commented on SPARK-25279:
--------------------------------------
Hello,
Tried against the latest trunk. Seems to work fine.
{code:java}
scala> import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.Aggregator
scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder
scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala>
scala> case class Employee(name: String, salary: Long)
defined class Employee
scala> case class Average(var sum: Long, var count: Long)
defined class Average
scala>
scala> object MyAverage extends Aggregator[Employee, Average, Double] {
     |   // A zero value for this aggregation. Should satisfy the property that any b + zero = b
     |   def zero: Average = Average(0L, 0L)
     |   // Combine two values to produce a new value. For performance, the function may modify `buffer`
     |   // and return it instead of constructing a new object
     |   def reduce(buffer: Average, employee: Employee): Average = {
     |     buffer.sum += employee.salary
     |     buffer.count += 1
     |     buffer
     |   }
     |   // Merge two intermediate values
     |   def merge(b1: Average, b2: Average): Average = {
     |     b1.sum += b2.sum
     |     b1.count += b2.count
     |     b1
     |   }
     |   // Transform the output of the reduction
     |   def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
     |   // Specifies the Encoder for the intermediate value type
     |   def bufferEncoder: Encoder[Average] = Encoders.product
     |   // Specifies the Encoder for the final output value type
     |   def outputEncoder: Encoder[Double] = Encoders.scalaDouble
     | }
defined object MyAverage
scala>
scala> val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds: org.apache.spark.sql.Dataset[Employee] = [name: string, salary: bigint]
scala> ds.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+
scala> // +-------+------+
scala> // |   name|salary|
scala> // +-------+------+
scala> // |Michael|  3000|
scala> // |   Andy|  4500|
scala> // | Justin|  3500|
scala> // |  Berta|  4000|
scala> // +-------+------+
scala>
scala> // Convert the function to a `TypedColumn` and give it a name
scala> val averageSalary = MyAverage.toColumn.name("average_salary")
averageSalary: org.apache.spark.sql.TypedColumn[Employee,Double] = myaverage() AS `average_salary`
scala> val result = ds.select(averageSalary)
result: org.apache.spark.sql.Dataset[Double] = [average_salary: double]
scala> result.show()
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
{code}
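If the exception still shows up on 2.2.x, one possible workaround (just a sketch, assuming the failure comes from the REPL wrapping every top-level definition in a serializable $iw object, which drags the non-serializable TypedColumn val into the aggregator's closure, as the "$outer" field in the serialization stack below suggests) is to compile the aggregator as a real top-level object with :paste -raw. The "myexample" package name below is made up for illustration:
{code:java}
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

// Hypothetical package name, used only so the definitions are compiled as
// genuine top-level classes/objects instead of REPL-wrapped ones.
package myexample

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(b: Average, e: Employee): Average = { b.sum += e.salary; b.count += 1; b }
  def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  def finish(r: Average): Double = r.sum.toDouble / r.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Exiting paste mode, now interpreting.

scala> import myexample._
scala> val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
scala> ds.select(MyAverage.toColumn.name("average_salary")).show()
{code}
Compiled this way, MyAverage has no $outer field pointing back at the shell's wrapper object, so closure serialization should not try to pull in the TypedColumn.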
> Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc
> -----------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-25279
> URL: https://issues.apache.org/jira/browse/SPARK-25279
> Project: Spark
> Issue Type: Bug
> Components: Spark Shell, SQL
> Affects Versions: 2.2.1
> Reporter: Zhichao Zhang
> Priority: Minor
>
> Hi dev,
> I am using spark-shell to run the example in the section
> [http://spark.apache.org/docs/2.2.2/sql-programming-guide.html#type-safe-user-defined-aggregate-functions],
> and I get this error:
> {code:java}
> Caused by: java.io.NotSerializableException:
> org.apache.spark.sql.TypedColumn
> Serialization stack:
> - object not serializable (class: org.apache.spark.sql.TypedColumn, value:
> myaverage() AS `average_salary`)
> - field (class: $iw, name: averageSalary, type: class
> org.apache.spark.sql.TypedColumn)
> - object (class $iw, $iw@4b2f8ae9)
> - field (class: MyAverage$, name: $outer, type: class $iw)
> - object (class MyAverage$, MyAverage$@2be41d90)
> - field (class:
> org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression,
> name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator)
> - object (class
> org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression,
> MyAverage(Employee))
> - field (class:
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression,
> name: aggregateFunction, type: class
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
> - object (class
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression,
> partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)),
> Some(class Employee), Some(StructType(StructField(name,StringType,true),
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
> Average, true])).count AS count#26L, newInstance(class Average), input[0,
> double, false] AS value#24, DoubleType, false, 0, 0))
> - writeObject data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@5e92c46f)
> - writeReplace data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon,
> List(partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class
> Employee)), Some(class Employee),
> Some(StructType(StructField(name,StringType,true),
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
> Average, true])).count AS count#26L, newInstance(class Average), input[0,
> double, false] AS value#24, DoubleType, false, 0, 0)))
> - field (class:
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name:
> aggregateExpressions, type: interface scala.collection.Seq)
> - object (class
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec,
> ObjectHashAggregate(keys=[],
> functions=[partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class
> Employee)), Some(class Employee),
> Some(StructType(StructField(name,StringType,true),
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
> Average, true])).count AS count#26L, newInstance(class Average), input[0,
> double, false] AS value#24, DoubleType, false, 0, 0)], output=[buf#37])
> +- *FileScan json [name#8,salary#9L] Batched: false, Format: JSON, Location:
> InMemoryFileIndex[file:/opt/spark2/examples/src/main/resources/employees.json],
> PartitionFilters: [], PushedFilters: [], ReadSchema:
> struct<name:string,salary:bigint>
> )
> - field (class:
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
> name: $outer, type: class
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec)
> - object (class
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
> <function0>)
> - field (class:
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
> name: $outer, type: class
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1)
> - object (class
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
> <function1>)
> - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
> name: f$23, type: interface scala.Function1)
> - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
> <function0>)
> - field (class:
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25,
> name: $outer, type: class
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
> - object (class
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25,
> <function3>)
> - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type:
> interface scala.Function3)
> - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9]
> at show at <console>:62)
> - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class
> org.apache.spark.rdd.RDD)
> - object (class org.apache.spark.OneToOneDependency,
> org.apache.spark.OneToOneDependency@5bb7895)
> - writeObject data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@6e81dca3)
> - writeReplace data (class:
> scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon,
> List(org.apache.spark.OneToOneDependency@5bb7895))
> - field (class: org.apache.spark.rdd.RDD, name:
> org$apache$spark$rdd$RDD$$dependencies_, type: interface
> scala.collection.Seq)
> - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10]
> at show at <console>:62)
> - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
> - object (class scala.Tuple2, (MapPartitionsRDD[10] at show at
> <console>:62,org.apache.spark.ShuffleDependency@421cd28))
> at
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) {code}
>
> But if I run the example directly from IDEA, it works. What is the
> difference between the two? How can I run the example successfully in spark-shell?
> Thanks.