Posted to user@spark.apache.org by Andy Grove <an...@agildata.com> on 2016/08/05 04:25:13 UTC

Regression in Java RDD sortBy() in Spark 2.0

Hi,

I have some working Java code with Spark 1.6 that I am upgrading to Spark 2.0.

I have this valid RDD:

JavaRDD<JPopulationSummary> popSummary

I want to sort using a function I provide for performing comparisons:

        popSummary
            .sortBy((Function<JPopulationSummary, Object>) p ->
                p.getMale() * 1.0f / p.getFemale(), true, 1)

The code fails at runtime with the following error.

Caused by: java.lang.ClassCastException: JPopulationSummary cannot be cast to java.lang.Comparable
at org.spark_project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at scala.math.Ordering$$anon$4.compare(Ordering.scala:111)
at org.apache.spark.util.collection.Utils$$anon$1.compare(Utils.scala:35)
at org.spark_project.guava.collect.Ordering.max(Ordering.java:551)
at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:667)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1374)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1371)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Even if the POJO did implement Comparable, Spark shouldn't care since I
provided the comparator I want to sort by.

Am I doing something wrong or is this a regression?

Thanks,

Andy.

--

Andy Grove
Chief Architect
www.agildata.com

Re: Regression in Java RDD sortBy() in Spark 2.0

Posted by Andy Grove <an...@agildata.com>.
Moments after sending this, I tracked the issue down to a subsequent
.top(10) transformation. It ran without error in Spark 1.6 (though who
knows how it was sorting, since the POJO doesn't implement Comparable),
whereas in Spark 2.0 it fails if the POJO is not Comparable.

The new behavior is better for sure.
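For anyone hitting the same thing: the root cause can be reproduced in plain Java, since top() with no comparator argument falls back to the element type's natural ordering, which requires Comparable. A minimal sketch, using a hypothetical PopulationSummary POJO standing in for JPopulationSummary:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TopWithComparator {

    // Hypothetical stand-in for JPopulationSummary: note it does NOT
    // implement Comparable.
    static class PopulationSummary {
        private final long male;
        private final long female;
        PopulationSummary(long male, long female) {
            this.male = male;
            this.female = female;
        }
        long getMale() { return male; }
        long getFemale() { return female; }
    }

    public static void main(String[] args) {
        List<PopulationSummary> rows = new ArrayList<>(Arrays.asList(
                new PopulationSummary(30, 20),
                new PopulationSummary(10, 40)));

        // Natural ordering (what a bare top(10) relies on) casts each
        // element to Comparable and throws ClassCastException here:
        boolean threw = false;
        try {
            rows.sort(null); // null comparator => natural ordering
        } catch (ClassCastException e) {
            threw = true;
        }
        System.out.println("natural ordering failed: " + threw);

        // An explicit comparator, like the one passed to sortBy(),
        // works without the POJO being Comparable:
        rows.sort(Comparator.comparingDouble(
                p -> p.getMale() * 1.0 / p.getFemale()));
        System.out.println("smallest ratio first, male = "
                + rows.get(0).getMale());
    }
}
```

The corresponding fix in the Spark job is to pass the comparator explicitly, e.g. the top(num, comparator) overload on JavaRDD (with a comparator that is serializable, since it ships to executors), rather than relying on the element type implementing Comparable.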

Thanks,

Andy.

--

Andy Grove
Chief Architect
AgilData - Simple Streaming SQL that Scales
www.agildata.com

