You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sun Rui (JIRA)" <ji...@apache.org> on 2016/04/21 15:01:25 UTC
[jira] [Created] (SPARK-14803) A bug in EliminateSerialization rule
in Catalyst Optimizer
Sun Rui created SPARK-14803:
-------------------------------
Summary: A bug in EliminateSerialization rule in Catalyst Optimizer
Key: SPARK-14803
URL: https://issues.apache.org/jira/browse/SPARK-14803
Project: Spark
Issue Type: Bug
Components: Optimizer, SQL
Reporter: Sun Rui
When I rebased my PR https://github.com/apache/spark/pull/12493 onto master, I found a bug in the EliminateSerialization rule of the Catalyst Optimizer, which was introduced by PR https://github.com/apache/spark/pull/12260.
The related code is:
{code}
object EliminateSerialization extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case d @ DeserializeToObject(_, _, s: SerializeFromObject)
if (d.outputObjectType == s.inputObjectType) =>
// Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId)
Project(objAttr :: Nil, s.child)
{code}
In my PR, when there are multiple successive calls to dapply(), the SerializeFromObject and DeserializeToObject logical operators are eliminated and replaced with a Project operator. However, the object involved is a Row, and UnsafeRowWriter has no support for Row, so the generated projection code fails to compile.
Detailed error message:
{panel}
1. Error: dapply() on a DataFrame ----------------------------------------------
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1156.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1156.0 (TID 9648, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 31, Column 29: No applicable constructor/method found for actual parameters "int, org.apache.spark.sql.Row"; candidates are: "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[])", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.UTF8String)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.sql.types.Decimal, int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, double)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[], int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.CalendarInterval)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, boolean)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, short)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, long)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, float)"
/* 001 */
/* 002 */ public java.lang.Object generate(Object[] references) {
/* 003 */ return new SpecificUnsafeProjection(references);
/* 004 */ }
/* 005 */
/* 006 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 007 */
/* 008 */ private Object[] references;
/* 009 */ private UnsafeRow result;
/* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
/* 012 */
/* 013 */
/* 014 */ public SpecificUnsafeProjection(Object[] references) {
/* 015 */ this.references = references;
/* 016 */ result = new UnsafeRow(1);
/* 017 */ this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
/* 018 */ this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
/* 019 */ }
/* 020 */
/* 021 */ // Scala.Function1 need this
/* 022 */ public java.lang.Object apply(java.lang.Object row) {
/* 023 */ return apply((InternalRow) row);
/* 024 */ }
/* 025 */
/* 026 */ public UnsafeRow apply(InternalRow i) {
/* 027 */ holder.reset();
/* 028 */
/* 029 */ /* input[0, org.apache.spark.sql.Row] */
/* 030 */ org.apache.spark.sql.Row value = (org.apache.spark.sql.Row)i.get(0, null);
/* 031 */ rowWriter.write(0, value);
/* 032 */ result.setTotalSize(holder.totalSize());
/* 033 */ return result;
/* 034 */ }
/* 035 */ }
/* 036 */
at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:636)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:395)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:352)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151)
at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:67)
at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{panel}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org