You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yin Huai (JIRA)" <ji...@apache.org> on 2016/04/19 19:53:25 UTC
[jira] [Resolved] (SPARK-14675) ClassFormatError in codegen when using Aggregator
[ https://issues.apache.org/jira/browse/SPARK-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yin Huai resolved SPARK-14675.
------------------------------
Resolution: Fixed
Assignee: Wenchen Fan
Fix Version/s: 2.0.0
This issue has been resolved by https://github.com/apache/spark/pull/12468.
> ClassFormatError in codegen when using Aggregator
> -------------------------------------------------
>
> Key: SPARK-14675
> URL: https://issues.apache.org/jira/browse/SPARK-14675
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
> Reporter: koert kuipers
> Assignee: Wenchen Fan
> Fix For: 2.0.0
>
>
> code:
> {noformat}
> val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
> def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
> def finish(reduction: Seq[Int]): Seq[Int] = reduction
> def merge(b1: Seq[Int],b2: Seq[Int]): Seq[Int] = b1 ++ b2
> def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
> def reduce(b: Seq[Int],a: (String, Int)): Seq[Int] = b :+ a._2
> def zero: Seq[Int] = Seq.empty[Int]
> }
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
> ds2.show
> {noformat}
> This gives me:
> {noformat}
> 16/04/15 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, localhost): java.lang.ClassFormatError: Duplicate field name&signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificMutableProjection
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> at org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:66)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass.generate(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:140)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:139)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:178)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:197)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:80)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:71)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:72)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> When I do:
> {noformat}
> ds2.queryExecution.debug.codegen()
> {noformat}
> I get:
> {noformat}
> Found 2 WholeStageCodegen subtrees.
> == Subtree 1 / 2 ==
> WholeStageCodegen
> : +- Sort [value#6 ASC], false, 0
> : +- INPUT
> +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
> +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
> Generated code:
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */ return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * Sort [value#6 ASC], false, 0
> /* 007 */ +- INPUT
> /* 008 */ */
> /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 010 */ private Object[] references;
> /* 011 */ private boolean sort_needToSort;
> /* 012 */ private org.apache.spark.sql.execution.Sort sort_plan;
> /* 013 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
> /* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
> /* 015 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
> /* 016 */ private scala.collection.Iterator inputadapter_input;
> /* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
> /* 018 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
> /* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
> /* 020 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
> /* 021 */
> /* 022 */ public GeneratedIterator(Object[] references) {
> /* 023 */ this.references = references;
> /* 024 */ }
> /* 025 */
> /* 026 */ public void init(int index, scala.collection.Iterator inputs[]) {
> /* 027 */ partitionIndex = index;
> /* 028 */ sort_needToSort = true;
> /* 029 */ this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
> /* 030 */ sort_sorter = sort_plan.createSorter();
> /* 031 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
> /* 032 */
> /* 033 */ inputadapter_input = inputs[0];
> /* 034 */ this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
> /* 035 */ sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
> /* 036 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
> /* 037 */ sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
> /* 038 */ }
> /* 039 */
> /* 040 */ private void sort_addToSorter() throws java.io.IOException {
> /* 041 */ /*** PRODUCE: INPUT */
> /* 042 */
> /* 043 */ while (inputadapter_input.hasNext()) {
> /* 044 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
> /* 045 */ /*** CONSUME: Sort [value#6 ASC], false, 0 */
> /* 046 */
> /* 047 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
> /* 048 */ if (shouldStop()) return;
> /* 049 */ }
> /* 050 */
> /* 051 */ }
> /* 052 */
> /* 053 */ protected void processNext() throws java.io.IOException {
> /* 054 */ /*** PRODUCE: Sort [value#6 ASC], false, 0 */
> /* 055 */ if (sort_needToSort) {
> /* 056 */ sort_addToSorter();
> /* 057 */ Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
> /* 058 */ sort_sortedIter = sort_sorter.sort();
> /* 059 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
> /* 060 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
> /* 061 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
> /* 062 */ sort_needToSort = false;
> /* 063 */ }
> /* 064 */
> /* 065 */ while (sort_sortedIter.hasNext()) {
> /* 066 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
> /* 067 */
> /* 068 */ /*** CONSUME: WholeStageCodegen */
> /* 069 */
> /* 070 */ append(sort_outputRow);
> /* 071 */
> /* 072 */ if (shouldStop()) return;
> /* 073 */ }
> /* 074 */ }
> /* 075 */ }
> == Subtree 2 / 2 ==
> WholeStageCodegen
> : +- Sort [value#6 ASC], false, 0
> : +- INPUT
> +- Exchange hashpartitioning(value#6, 4), None
> +- SortBasedAggregate(key=[value#6], functions=[(anon$1(scala.Tuple2),mode=Partial,isDistinct=false)], output=[value#6,value#15])
> +- WholeStageCodegen
> : +- Sort [value#6 ASC], false, 0
> : +- INPUT
> +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
> +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
> Generated code:
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */ return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * Sort [value#6 ASC], false, 0
> /* 007 */ +- INPUT
> /* 008 */ */
> /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 010 */ private Object[] references;
> /* 011 */ private boolean sort_needToSort;
> /* 012 */ private org.apache.spark.sql.execution.Sort sort_plan;
> /* 013 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
> /* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
> /* 015 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
> /* 016 */ private scala.collection.Iterator inputadapter_input;
> /* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
> /* 018 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
> /* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
> /* 020 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
> /* 021 */
> /* 022 */ public GeneratedIterator(Object[] references) {
> /* 023 */ this.references = references;
> /* 024 */ }
> /* 025 */
> /* 026 */ public void init(int index, scala.collection.Iterator inputs[]) {
> /* 027 */ partitionIndex = index;
> /* 028 */ sort_needToSort = true;
> /* 029 */ this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
> /* 030 */ sort_sorter = sort_plan.createSorter();
> /* 031 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
> /* 032 */
> /* 033 */ inputadapter_input = inputs[0];
> /* 034 */ this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
> /* 035 */ sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
> /* 036 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
> /* 037 */ sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
> /* 038 */ }
> /* 039 */
> /* 040 */ private void sort_addToSorter() throws java.io.IOException {
> /* 041 */ /*** PRODUCE: INPUT */
> /* 042 */
> /* 043 */ while (inputadapter_input.hasNext()) {
> /* 044 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
> /* 045 */ /*** CONSUME: Sort [value#6 ASC], false, 0 */
> /* 046 */
> /* 047 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
> /* 048 */ if (shouldStop()) return;
> /* 049 */ }
> /* 050 */
> /* 051 */ }
> /* 052 */
> /* 053 */ protected void processNext() throws java.io.IOException {
> /* 054 */ /*** PRODUCE: Sort [value#6 ASC], false, 0 */
> /* 055 */ if (sort_needToSort) {
> /* 056 */ sort_addToSorter();
> /* 057 */ Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
> /* 058 */ sort_sortedIter = sort_sorter.sort();
> /* 059 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
> /* 060 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
> /* 061 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
> /* 062 */ sort_needToSort = false;
> /* 063 */ }
> /* 064 */
> /* 065 */ while (sort_sortedIter.hasNext()) {
> /* 066 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
> /* 067 */
> /* 068 */ /*** CONSUME: WholeStageCodegen */
> /* 069 */
> /* 070 */ append(sort_outputRow);
> /* 071 */
> /* 072 */ if (shouldStop()) return;
> /* 073 */ }
> /* 074 */ }
> /* 075 */ }
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org