You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "koert kuipers (JIRA)" <ji...@apache.org> on 2016/04/16 00:36:25 UTC
[jira] [Created] (SPARK-14675) ClassFormatError in codegen when
using Aggregator
koert kuipers created SPARK-14675:
-------------------------------------
Summary: ClassFormatError in codegen when using Aggregator
Key: SPARK-14675
URL: https://issues.apache.org/jira/browse/SPARK-14675
Project: Spark
Issue Type: Bug
Components: SQL
Environment: spark 2.0.0-SNAPSHOT
Reporter: koert kuipers
code:
{noformat}
val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
def finish(reduction: Seq[Int]): Seq[Int] = reduction
def merge(b1: Seq[Int],b2: Seq[Int]): Seq[Int] = b1 ++ b2
def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
def reduce(b: Seq[Int],a: (String, Int)): Seq[Int] = b :+ a._2
def zero: Seq[Int] = Seq.empty[Int]
}
val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
ds2.show
{noformat}
This gives me:
{noformat}
16/04/15 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, localhost): java.lang.ClassFormatError: Duplicate field name&signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificMutableProjection
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:66)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass.generate(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:140)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:139)
at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:178)
at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:197)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:80)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:71)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:72)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{noformat}
When I do:
{noformat}
ds2.queryExecution.debug.codegen()
{noformat}
I get:
{noformat}
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
WholeStageCodegen
: +- Sort [value#6 ASC], false, 0
: +- INPUT
+- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
+- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [value#6 ASC], false, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */ private Object[] references;
/* 011 */ private boolean sort_needToSort;
/* 012 */ private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */ private scala.collection.Iterator inputadapter_input;
/* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */
/* 022 */ public GeneratedIterator(Object[] references) {
/* 023 */ this.references = references;
/* 024 */ }
/* 025 */
/* 026 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 027 */ partitionIndex = index;
/* 028 */ sort_needToSort = true;
/* 029 */ this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 030 */ sort_sorter = sort_plan.createSorter();
/* 031 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 032 */
/* 033 */ inputadapter_input = inputs[0];
/* 034 */ this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 035 */ sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 036 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 037 */ sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 038 */ }
/* 039 */
/* 040 */ private void sort_addToSorter() throws java.io.IOException {
/* 041 */ /*** PRODUCE: INPUT */
/* 042 */
/* 043 */ while (inputadapter_input.hasNext()) {
/* 044 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 045 */ /*** CONSUME: Sort [value#6 ASC], false, 0 */
/* 046 */
/* 047 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 048 */ if (shouldStop()) return;
/* 049 */ }
/* 050 */
/* 051 */ }
/* 052 */
/* 053 */ protected void processNext() throws java.io.IOException {
/* 054 */ /*** PRODUCE: Sort [value#6 ASC], false, 0 */
/* 055 */ if (sort_needToSort) {
/* 056 */ sort_addToSorter();
/* 057 */ Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 058 */ sort_sortedIter = sort_sorter.sort();
/* 059 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 060 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 061 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 062 */ sort_needToSort = false;
/* 063 */ }
/* 064 */
/* 065 */ while (sort_sortedIter.hasNext()) {
/* 066 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 067 */
/* 068 */ /*** CONSUME: WholeStageCodegen */
/* 069 */
/* 070 */ append(sort_outputRow);
/* 071 */
/* 072 */ if (shouldStop()) return;
/* 073 */ }
/* 074 */ }
/* 075 */ }
== Subtree 2 / 2 ==
WholeStageCodegen
: +- Sort [value#6 ASC], false, 0
: +- INPUT
+- Exchange hashpartitioning(value#6, 4), None
+- SortBasedAggregate(key=[value#6], functions=[(anon$1(scala.Tuple2),mode=Partial,isDistinct=false)], output=[value#6,value#15])
+- WholeStageCodegen
: +- Sort [value#6 ASC], false, 0
: +- INPUT
+- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
+- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [value#6 ASC], false, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */ private Object[] references;
/* 011 */ private boolean sort_needToSort;
/* 012 */ private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */ private scala.collection.Iterator inputadapter_input;
/* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */
/* 022 */ public GeneratedIterator(Object[] references) {
/* 023 */ this.references = references;
/* 024 */ }
/* 025 */
/* 026 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 027 */ partitionIndex = index;
/* 028 */ sort_needToSort = true;
/* 029 */ this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 030 */ sort_sorter = sort_plan.createSorter();
/* 031 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 032 */
/* 033 */ inputadapter_input = inputs[0];
/* 034 */ this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 035 */ sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 036 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 037 */ sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 038 */ }
/* 039 */
/* 040 */ private void sort_addToSorter() throws java.io.IOException {
/* 041 */ /*** PRODUCE: INPUT */
/* 042 */
/* 043 */ while (inputadapter_input.hasNext()) {
/* 044 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 045 */ /*** CONSUME: Sort [value#6 ASC], false, 0 */
/* 046 */
/* 047 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 048 */ if (shouldStop()) return;
/* 049 */ }
/* 050 */
/* 051 */ }
/* 052 */
/* 053 */ protected void processNext() throws java.io.IOException {
/* 054 */ /*** PRODUCE: Sort [value#6 ASC], false, 0 */
/* 055 */ if (sort_needToSort) {
/* 056 */ sort_addToSorter();
/* 057 */ Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 058 */ sort_sortedIter = sort_sorter.sort();
/* 059 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 060 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 061 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 062 */ sort_needToSort = false;
/* 063 */ }
/* 064 */
/* 065 */ while (sort_sortedIter.hasNext()) {
/* 066 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 067 */
/* 068 */ /*** CONSUME: WholeStageCodegen */
/* 069 */
/* 070 */ append(sort_outputRow);
/* 071 */
/* 072 */ if (shouldStop()) return;
/* 073 */ }
/* 074 */ }
/* 075 */ }
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org