Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2018/04/17 15:54:55 UTC

[GitHub] spark pull request #21086: [SPARK-24002] [SQL] Task not serializable caused ...

GitHub user gatorsmile opened a pull request:

    https://github.com/apache/spark/pull/21086

    [SPARK-24002] [SQL] Task not serializable caused by org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes

    ## What changes were proposed in this pull request?
    ```
    Py4JJavaError: An error occurred while calling o153.sql.
    : org.apache.spark.SparkException: Job aborted.
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
    	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
    	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
    	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
    	at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
    	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
    	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
    	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
    	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
    	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
    	at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    	at py4j.Gateway.invoke(Gateway.java:293)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:226)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Exception thrown in Future.get: 
    	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
    	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
    	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
    	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
    	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
    	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
    	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
    	...
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
    	... 23 more
    Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
    	... 276 more
    Caused by: org.apache.spark.SparkException: Task not serializable
    	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
    	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
    	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    	at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    	at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
    	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
    	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
    	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
    	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
    	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
    	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
    	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
    	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
    	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
    	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
    	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
    	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
    	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
    	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
    	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
    	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
    	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
    	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
    	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	... 1 more
    Caused by: java.nio.BufferUnderflowException
    	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
    	at java.nio.ByteBuffer.get(ByteBuffer.java:715)
    	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
    	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
    	at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
    	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    ```
    
    Parquet filters are serializable but not thread-safe. SparkPlan.prepare() can be called from different threads (BroadcastExchange calls it in a thread pool), so the same Parquet filter can be serialized by two threads at the same time, which produces the failure above. The issue is hard to reproduce. This PR avoids serializing the Parquet filters on the driver by moving Parquet filter generation from the driver to the executors. 
    
    ## How was this patch tested?
    Tested with two queries: one is a 1000-line SQL query and the other is a 3000-line SQL query. Reproducing the failure once takes at least one hour under a heavy write workload. 
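
A minimal Scala sketch of the race described above, under stated assumptions: `BufferBackedValue` is a hypothetical stand-in (it is not the Parquet class), and the split into `seek` and `read` is only an assumption about the general pattern of copying bytes through the mutable cursor of a shared `java.nio.ByteBuffer`. Replaying one unlucky interleaving of two serializing threads on a single thread makes the `BufferUnderflowException` from the trace deterministic:

```scala
import java.nio.{BufferUnderflowException, ByteBuffer}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a ByteBuffer-backed binary value. It is NOT the
// Parquet class; it only mimics reading through the shared buffer's mutable
// position and limit.
final class BufferBackedValue(buf: ByteBuffer, offset: Int, length: Int) {

  // Step 1 of a copy: point the shared cursor at this value's bytes.
  def seek(): Unit = {
    buf.limit(offset + length)
    buf.position(offset)
  }

  // Step 2 of a copy: a relative bulk read that advances the shared cursor.
  def read(): Array[Byte] = {
    val out = new Array[Byte](length)
    buf.get(out)
    out
  }

  // One possible alternative: read through a private view with its own
  // position and limit. This is safe as long as no caller mutates the
  // shared buffer's cursor.
  def safeCopy(): Array[Byte] = {
    val view = buf.duplicate()
    view.limit(offset + length)
    view.position(offset)
    val out = new Array[Byte](length)
    view.get(out)
    out
  }
}

object ByteBufferRaceSketch {
  // Replays one interleaving of two serializer threads, A and B, on a single
  // thread so that the outcome does not depend on scheduling.
  def replayInterleaving(): Boolean = {
    val bytes = "parquet".getBytes(StandardCharsets.UTF_8)
    val value = new BufferBackedValue(ByteBuffer.wrap(bytes), 0, bytes.length)
    value.seek() // thread A: seek
    value.seek() // thread B: seek
    value.read() // thread B: read, the cursor now sits at the limit
    try {
      value.read() // thread A: read, but no bytes remain
      false
    } catch {
      case _: BufferUnderflowException => true
    }
  }
}
```

`ByteBufferRaceSketch.replayInterleaving()` returns `true` because the second relative read finds no bytes left. `safeCopy` shows one way to avoid sharing the cursor; the PR takes a different route and avoids serializing the filters on the driver at all.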

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark taskNotSerializable

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21086
    
----
commit 3bb4824225b53f0ee7900835bfc99b9bd01f7d4f
Author: gatorsmile <ga...@...>
Date:   2018-04-17T15:46:12Z

    fix.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    @ghoto Sure. Will backport it tonight.


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    cc @cloud-fan @zsxwing 


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    Since people are hitting this issue, let's backport. cc @gatorsmile 


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    LGTM


---



[GitHub] spark pull request #21086: [SPARK-24002] [SQL] Task not serializable caused ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21086#discussion_r188504187
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
    @@ -351,12 +338,26 @@ class ParquetFileFormat
         val timestampConversion: Boolean =
           sparkSession.sessionState.conf.isParquetINT96TimestampConversion
         val capacity = sqlConf.parquetVectorizedReaderBatchSize
    +    val enableParquetFilterPushDown: Boolean =
    +      sparkSession.sessionState.conf.parquetFilterPushDown
         // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
         val returningBatch = supportBatch(sparkSession, resultSchema)
     
         (file: PartitionedFile) => {
           assert(file.partitionValues.numFields == partitionSchema.size)
     
    +      // Try to push down filters when filter push-down is enabled.
    --- End diff --
    
    The code is now inside the read function, which is executed on the executor side, so we no longer need to serialize `ParquetFilters`.
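
The effect of moving the filter construction into the read function can be illustrated without Spark. In this hedged sketch, `FakeFilter`, `readerCapturingFilter`, and `readerBuildingFilter` are hypothetical names, and `FakeFilter` is deliberately not `Serializable` (unlike the real filters, which the PR describes as serializable but not thread safe) so that capturing it is observable with plain Java serialization:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a pushed-down filter object that should not be
// serialized on the driver. It deliberately does not extend Serializable.
final class FakeFilter(val minValue: Int) {
  def keep(row: Int): Boolean = row >= minValue
}

object ClosureCaptureSketch {
  // Returns true if Java serialization of `obj` succeeds.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // Before: the filter is built eagerly, outside the returned function, so
  // the closure captures the filter object itself.
  def readerCapturingFilter(minValue: Int): Int => Boolean = {
    val filter = new FakeFilter(minValue)
    (row: Int) => filter.keep(row)
  }

  // After: the closure captures only the plain Int. The filter is built
  // inside the function, i.e. wherever the function actually runs.
  def readerBuildingFilter(minValue: Int): Int => Boolean = {
    (row: Int) => new FakeFilter(minValue).keep(row)
  }
}
```

Only the second function can be serialized, because its closure carries just the `Int`. The filter object is created where the function is invoked, which in Spark's case would be on the executors.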


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21086: [SPARK-24002] [SQL] Task not serializable caused ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21086


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by ghoto <gi...@git.apache.org>.
Github user ghoto commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    I'm hitting this issue after upgrading from 2.0.2 to 2.3.0. Please backport this PR to Spark 2.3.0


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89464/
    Test PASSed.


---



[GitHub] spark pull request #21086: [SPARK-24002] [SQL] Task not serializable caused ...

Posted by ghoto <gi...@git.apache.org>.
Github user ghoto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21086#discussion_r188473831
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
    @@ -351,12 +338,26 @@ class ParquetFileFormat
         val timestampConversion: Boolean =
           sparkSession.sessionState.conf.isParquetINT96TimestampConversion
         val capacity = sqlConf.parquetVectorizedReaderBatchSize
    +    val enableParquetFilterPushDown: Boolean =
    +      sparkSession.sessionState.conf.parquetFilterPushDown
         // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
         val returningBatch = supportBatch(sparkSession, resultSchema)
     
         (file: PartitionedFile) => {
           assert(file.partitionValues.numFields == partitionSchema.size)
     
    +      // Try to push down filters when filter push-down is enabled.
    --- End diff --
    
    So this code is the same as before. How does this fix the bug described at the top of the conversation?


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    Thanks! Merged to master/2.3


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    **[Test build #89464 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89464/testReport)** for PR 21086 at commit [`3bb4824`](https://github.com/apache/spark/commit/3bb4824225b53f0ee7900835bfc99b9bd01f7d4f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2399/
    Test PASSed.


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    **[Test build #89464 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89464/testReport)** for PR 21086 at commit [`3bb4824`](https://github.com/apache/spark/commit/3bb4824225b53f0ee7900835bfc99b9bd01f7d4f).


---



[GitHub] spark issue #21086: [SPARK-24002] [SQL] Task not serializable caused by org....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21086
  
    If the community hits this issue, we can backport it to Spark 2.3. 


---



[GitHub] spark pull request #21086: [SPARK-24002] [SQL] Task not serializable caused ...

Posted by ghoto <gi...@git.apache.org>.
Github user ghoto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21086#discussion_r188701408
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
    @@ -351,12 +338,26 @@ class ParquetFileFormat
         val timestampConversion: Boolean =
           sparkSession.sessionState.conf.isParquetINT96TimestampConversion
         val capacity = sqlConf.parquetVectorizedReaderBatchSize
    +    val enableParquetFilterPushDown: Boolean =
    +      sparkSession.sessionState.conf.parquetFilterPushDown
         // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
         val returningBatch = supportBatch(sparkSession, resultSchema)
     
         (file: PartitionedFile) => {
           assert(file.partitionValues.numFields == partitionSchema.size)
     
    +      // Try to push down filters when filter push-down is enabled.
    --- End diff --
    
    I see, thanks.


---
