Posted to issues@spark.apache.org by "kondziolka9ld (Jira)" <ji...@apache.org> on 2022/05/20 08:19:00 UTC
[jira] [Updated] (SPARK-39239) Parquet written by spark in yarn mode can not be read by spark in local[2+] mode
[ https://issues.apache.org/jira/browse/SPARK-39239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kondziolka9ld updated SPARK-39239:
----------------------------------
Attachment: threaddump_spark_shell
> Parquet written by spark in yarn mode can not be read by spark in local[2+] mode
> --------------------------------------------------------------------------------
>
> Key: SPARK-39239
> URL: https://issues.apache.org/jira/browse/SPARK-39239
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.2
> Reporter: kondziolka9ld
> Priority: Minor
> Attachments: threaddump_spark_shell
>
>
> Hi,
> I came across a strange issue: Parquet data written by Spark in yarn mode cannot be read by Spark in local[2+] mode. By "cannot be read" I mean that the read operation hangs forever. Strangely enough, local[1] is able to read the same Parquet data. Additionally, repartitioning the data before writing works around the problem. I attached a thread dump; the blocked thread is indeed waiting on a latch.
> I am not sure whether this is a bug, a misconfiguration, or a misunderstanding on my part.
> ----
> h4. Reproduction scenario:
> h4. Writer console log:
> {code:java}
> user@host [] /tmp $ spark-shell --master yarn
> [...]
> scala> (1 to 1000).toDF.write.parquet("hdfs:///tmp/sample_1")
> scala> (1 to 1000).toDF.repartition(42).write.parquet("hdfs:///tmp/sample_2"){code}
> h4. Reader console log:
> {code:java}
> user@host [] /tmp $ spark-shell --master local[2]
> [...]
> scala> spark.read.parquet("hdfs:///tmp/sample_2").count
> res2: Long = 1000
> scala> spark.read.parquet("hdfs:///tmp/sample_1").count
> [Stage 5:=============================> (1 + 0) / 2]
> user@host [] /tmp $ spark-shell --master local[1]
> [...]
> scala> spark.read.parquet("hdfs:///tmp/sample_1").count
> res0: Long = 1000
> {code}
> ----
> h4. Thread dump of locked thread
> {code:java}
> "main" #1 prio=5 os_prio=0 tid=0x00007f93b8054000 nid=0x6dce waiting on condition [0x00007f93c0658000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000eb65eab8> (a scala.concurrent.impl.Promise$CompletionLatch)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
> at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:859)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
> at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
> at org.apache.spark.rdd.RDD$$Lambda$2193/1084000875.apply(Unknown Source)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
> at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
> at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3006)
> at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3005)
> at org.apache.spark.sql.Dataset$$Lambda$2847/937335652.apply(Unknown Source)
> at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
> at org.apache.spark.sql.Dataset$$Lambda$2848/1831604445.apply(Unknown Source)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2853/2038636888.apply(Unknown Source)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2849/1622269832.apply(Unknown Source)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:3005)
> at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
> at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
> at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:30)
> at $line19.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:32)
> at $line19.$read$$iw$$iw$$iw$$iw.<init>(<console>:34)
> at $line19.$read$$iw$$iw$$iw.<init>(<console>:36)
> at $line19.$read$$iw$$iw.<init>(<console>:38)
> at $line19.$read$$iw.<init>(<console>:40)
> at $line19.$read.<init>(<console>:42)
> at $line19.$read$.<init>(<console>:46)
> at $line19.$read$.<clinit>(<console>)
> at $line19.$eval$.$print$lzycompute(<console>:7)
> - locked <0x00000000f67afed8> (a $line19.$eval$)
> at $line19.$eval$.$print(<console>:6)
> at $line19.$eval.$print(<console>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
> at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
> at scala.tools.nsc.interpreter.IMain$$Lambda$1216/588503940.apply(Unknown Source)
> at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
> at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
> at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:894)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:762)
> at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:464)
> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:485)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:239)
> at org.apache.spark.repl.Main$.doMain(Main.scala:78)
> at org.apache.spark.repl.Main$.main(Main.scala:58)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
> Dumps for all threads are available here: [https://pastebin.com/WhdpsEVq] (I can not add the attachment for some reason)
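> For reference, a minimal, hypothetical Scala sketch (not Spark source) of the wait pattern visible in the dump above: `DAGScheduler.runJob` parks the caller on a `Promise`-backed completion latch via `ThreadUtils.awaitReady`, so if no task ever completes the promise the driver blocks forever, exactly as stage 5 does at `(1 + 0) / 2`. A timeout is used here only to make the blocking observable:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object LatchWaitSketch {
  // A promise that is never completed stands in for a Spark job whose
  // remaining task never finishes. Await.ready parks the calling thread
  // on the promise's CompletionLatch (the same frame seen in the dump);
  // without the timeout it would wait indefinitely.
  def awaitHangs(): Boolean = {
    val jobResult = Promise[Long]()
    try {
      Await.ready(jobResult.future, 100.millis)
      false // the promise was somehow completed
    } catch {
      case _: TimeoutException => true // still parked after the timeout
    }
  }

  def main(args: Array[String]): Unit =
    println(s"await timed out: ${awaitHangs()}")
}
```

> This only illustrates the blocking mechanism, not its cause; why the final task is never scheduled under local[2+] is the open question.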
--
This message was sent by Atlassian Jira
(v8.20.7#820007)