You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "koert kuipers (JIRA)" <ji...@apache.org> on 2016/04/16 00:36:25 UTC

[jira] [Created] (SPARK-14675) ClassFormatError in codegen when using Aggregator

koert kuipers created SPARK-14675:
-------------------------------------

             Summary: ClassFormatError in codegen when using Aggregator
                 Key: SPARK-14675
                 URL: https://issues.apache.org/jira/browse/SPARK-14675
             Project: Spark
          Issue Type: Bug
          Components: SQL
         Environment: spark 2.0.0-SNAPSHOT
            Reporter: koert kuipers


Code to reproduce:
{noformat}
  val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
    def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
    def finish(reduction: Seq[Int]): Seq[Int] = reduction
    def merge(b1: Seq[Int],b2: Seq[Int]): Seq[Int] = b1 ++ b2
    def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
    def reduce(b: Seq[Int],a: (String, Int)): Seq[Int] = b :+ a._2
    def zero: Seq[Int] = Seq.empty[Int]
  }

  val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
  val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
  ds2.show
{noformat}
This fails at runtime with:
{noformat}
16/04/15 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, localhost): java.lang.ClassFormatError: Duplicate field name&signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificMutableProjection
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
	at org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:66)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass.generate(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:140)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:139)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:178)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:197)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:80)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:71)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:72)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

When I run:
{noformat}
 ds2.queryExecution.debug.codegen()
{noformat}
I get:
{noformat}
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
WholeStageCodegen
:  +- Sort [value#6 ASC], false, 0
:     +- INPUT
+- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
   +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */ 
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [value#6 ASC], false, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */   
/* 022 */   public GeneratedIterator(Object[] references) {
/* 023 */     this.references = references;
/* 024 */   }
/* 025 */   
/* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 027 */     partitionIndex = index;
/* 028 */     sort_needToSort = true;
/* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 030 */     sort_sorter = sort_plan.createSorter();
/* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 032 */     
/* 033 */     inputadapter_input = inputs[0];
/* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 038 */   }
/* 039 */   
/* 040 */   private void sort_addToSorter() throws java.io.IOException {
/* 041 */     /*** PRODUCE: INPUT */
/* 042 */     
/* 043 */     while (inputadapter_input.hasNext()) {
/* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
/* 046 */       
/* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 048 */       if (shouldStop()) return;
/* 049 */     }
/* 050 */     
/* 051 */   }
/* 052 */   
/* 053 */   protected void processNext() throws java.io.IOException {
/* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
/* 055 */     if (sort_needToSort) {
/* 056 */       sort_addToSorter();
/* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 058 */       sort_sortedIter = sort_sorter.sort();
/* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 062 */       sort_needToSort = false;
/* 063 */     }
/* 064 */     
/* 065 */     while (sort_sortedIter.hasNext()) {
/* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 067 */       
/* 068 */       /*** CONSUME: WholeStageCodegen */
/* 069 */       
/* 070 */       append(sort_outputRow);
/* 071 */       
/* 072 */       if (shouldStop()) return;
/* 073 */     }
/* 074 */   }
/* 075 */ }

== Subtree 2 / 2 ==
WholeStageCodegen
:  +- Sort [value#6 ASC], false, 0
:     +- INPUT
+- Exchange hashpartitioning(value#6, 4), None
   +- SortBasedAggregate(key=[value#6], functions=[(anon$1(scala.Tuple2),mode=Partial,isDistinct=false)], output=[value#6,value#15])
      +- WholeStageCodegen
         :  +- Sort [value#6 ASC], false, 0
         :     +- INPUT
         +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
            +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */ 
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [value#6 ASC], false, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */   
/* 022 */   public GeneratedIterator(Object[] references) {
/* 023 */     this.references = references;
/* 024 */   }
/* 025 */   
/* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 027 */     partitionIndex = index;
/* 028 */     sort_needToSort = true;
/* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 030 */     sort_sorter = sort_plan.createSorter();
/* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 032 */     
/* 033 */     inputadapter_input = inputs[0];
/* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 038 */   }
/* 039 */   
/* 040 */   private void sort_addToSorter() throws java.io.IOException {
/* 041 */     /*** PRODUCE: INPUT */
/* 042 */     
/* 043 */     while (inputadapter_input.hasNext()) {
/* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
/* 046 */       
/* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 048 */       if (shouldStop()) return;
/* 049 */     }
/* 050 */     
/* 051 */   }
/* 052 */   
/* 053 */   protected void processNext() throws java.io.IOException {
/* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
/* 055 */     if (sort_needToSort) {
/* 056 */       sort_addToSorter();
/* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 058 */       sort_sortedIter = sort_sorter.sort();
/* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 062 */       sort_needToSort = false;
/* 063 */     }
/* 064 */     
/* 065 */     while (sort_sortedIter.hasNext()) {
/* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 067 */       
/* 068 */       /*** CONSUME: WholeStageCodegen */
/* 069 */       
/* 070 */       append(sort_outputRow);
/* 071 */       
/* 072 */       if (shouldStop()) return;
/* 073 */     }
/* 074 */   }
/* 075 */ }
{noformat}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org