Posted to issues@spark.apache.org by "kondziolka9ld (Jira)" <ji...@apache.org> on 2022/05/20 08:19:00 UTC

[jira] [Updated] (SPARK-39239) Parquet written by spark in yarn mode can not be read by spark in local[2+] mode

     [ https://issues.apache.org/jira/browse/SPARK-39239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kondziolka9ld updated SPARK-39239:
----------------------------------
    Attachment: threaddump_spark_shell

> Parquet written by spark in yarn mode can not be read by spark in local[2+] mode
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-39239
>                 URL: https://issues.apache.org/jira/browse/SPARK-39239
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.2
>            Reporter: kondziolka9ld
>            Priority: Minor
>         Attachments: threaddump_spark_shell
>
>
> Hi,
> I came across a strange issue: data written by Spark in yarn mode cannot be read by Spark in local[2+] mode. By "cannot be read" I mean that the read operation hangs forever. Strangely enough, local[1] is able to read the same Parquet data. Repartitioning the data before writing also works around the problem. I attached a thread dump; it shows the main thread waiting on a latch.
> I am not sure whether this is a bug, a misconfiguration, or a misunderstanding on my side.
> ----
> h4. Reproduction scenario:
> h4. Writer console log:
> {code:java}
> user@host [] /tmp $ spark-shell --master yarn
> [...]
> scala> (1 to 1000).toDF.write.parquet("hdfs:///tmp/sample_1")
> scala> (1 to 1000).toDF.repartition(42).write.parquet("hdfs:///tmp/sample_2"){code}
> h4. Reader console log:
> {code:java}
> user@host [] /tmp $ spark-shell --master local[2]
> [...]
> scala> spark.read.parquet("hdfs:///tmp/sample_2").count
> res2: Long = 1000
> scala> spark.read.parquet("hdfs:///tmp/sample_1").count
> [Stage 5:=============================>                             (1 + 0) / 2]
> user@host [] /tmp $ spark-shell --master local[1]
> [...]
> scala> spark.read.parquet("hdfs:///tmp/sample_1").count
> res0: Long = 1000                                                           
>      {code}
> ----
> h4. Thread dump of locked thread
> {code:java}
> "main" #1 prio=5 os_prio=0 tid=0x00007f93b8054000 nid=0x6dce waiting on condition [0x00007f93c0658000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000000eb65eab8> (a scala.concurrent.impl.Promise$CompletionLatch)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
>         at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:859)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
>         at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
>         at org.apache.spark.rdd.RDD$$Lambda$2193/1084000875.apply(Unknown Source)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>         at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
>         at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
>         at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3006)
>         at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3005)
>         at org.apache.spark.sql.Dataset$$Lambda$2847/937335652.apply(Unknown Source)
>         at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
>         at org.apache.spark.sql.Dataset$$Lambda$2848/1831604445.apply(Unknown Source)
>         at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>         at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2853/2038636888.apply(Unknown Source)
>         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>         at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>         at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2849/1622269832.apply(Unknown Source)
>         at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
>         at org.apache.spark.sql.Dataset.count(Dataset.scala:3005)
>         at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
>         at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
>         at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:30)
>         at $line19.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:32)
>         at $line19.$read$$iw$$iw$$iw$$iw.<init>(<console>:34)
>         at $line19.$read$$iw$$iw$$iw.<init>(<console>:36)
>         at $line19.$read$$iw$$iw.<init>(<console>:38)
>         at $line19.$read$$iw.<init>(<console>:40)
>         at $line19.$read.<init>(<console>:42)
>         at $line19.$read$.<init>(<console>:46)
>         at $line19.$read$.<clinit>(<console>)
>         at $line19.$eval$.$print$lzycompute(<console>:7)
>         - locked <0x00000000f67afed8> (a $line19.$eval$)
>         at $line19.$eval$.$print(<console>:6)
>         at $line19.$eval.$print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
>         at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
>         at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
>         at scala.tools.nsc.interpreter.IMain$$Lambda$1216/588503940.apply(Unknown Source)
>         at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
>         at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
>         at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
>         at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
>         at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:894)
>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:762)
>         at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:464)
>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:485)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:239)
>         at org.apache.spark.repl.Main$.doMain(Main.scala:78)
>         at org.apache.spark.repl.Main$.main(Main.scala:58)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>         at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
>         at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>         at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>         at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>         at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>  {code}
> Dumps for all threads are available here: [https://pastebin.com/WhdpsEVq] (I cannot add an attachment for some reason)
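
A note on reading the dump above: the "main" thread is parked in scala.concurrent.impl.Promise$DefaultPromise.tryAwait, reached from DAGScheduler.runJob via ThreadUtils.awaitReady. That is what a driver thread looks like whenever it is waiting for a job result, so the frame itself probably does not identify the cause; the progress line "(1 + 0) / 2" suggests one task finished and the second was never running. The sketch below is not Spark code. It only illustrates, with the plain Scala standard library, how a caller blocks on a promise that nobody completes. The object name LatchWaitSketch and the helper completesWithin are hypothetical, and the timeout is added here only so the example terminates.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object LatchWaitSketch {
  // Returns true if the promise is completed within `timeout`,
  // false if the caller would still be parked waiting for it.
  def completesWithin(promise: Promise[Long], timeout: FiniteDuration): Boolean =
    try {
      Await.ready(promise.future, timeout) // blocks the calling thread
      true
    } catch {
      case _: TimeoutException => false
    }

  def main(args: Array[String]): Unit = {
    // Stands in for a job whose remaining task never reports a result.
    val neverCompleted = Promise[Long]()
    // Stands in for a job that finished with a row count.
    val completed = Promise[Long]().success(1000L)

    println(completesWithin(neverCompleted, 500.millis)) // prints "false"
    println(completesWithin(completed, 500.millis))      // prints "true"
  }
}
```

Without a timeout, the first call would wait indefinitely, which matches the WAITING (parking) state shown for "main". Under that reading, the scheduler and task threads in the full dump are the more likely place to find why the second task never starts.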


