Posted to issues@spark.apache.org by "Paul Zaczkieiwcz (JIRA)" <ji...@apache.org> on 2015/12/09 16:31:11 UTC

[jira] [Commented] (SPARK-10109) NPE when saving Parquet To HDFS

    [ https://issues.apache.org/jira/browse/SPARK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048821#comment-15048821 ] 

Paul Zaczkieiwcz commented on SPARK-10109:
------------------------------------------

Still seeing this in 1.5.1. My use case is the same: I've got several independent Spark jobs downloading data and appending to the same partitioned Parquet directory in HDFS. None of the partitions overlap, but the _temporary folder has become a bottleneck, allowing only one job to append at a time.
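For reference, here is a minimal sketch of the write pattern I mean (the method and path names are placeholders, not my actual code). Several jobs run this concurrently against the same base directory, so they all share the _temporary folder under outputPath:
{quote}
// Sketch only (hypothetical names): each independent job appends its own,
// non-overlapping partitions under the same base directory.
import org.apache.spark.sql.{SQLContext, SaveMode}

def appendJobSlice(sqlContext: SQLContext, inputPath: String, outputPath: String): Unit = {
  val df = sqlContext.read.parquet(inputPath)  // this job's slice of the data
  df.write
    .partitionBy("dpid", "pid")                // non-overlapping partition values per job
    .mode(SaveMode.Append)                     // several jobs append concurrently
    .parquet(outputPath)                       // same base directory for every job,
                                               // so they contend on outputPath/_temporary
}
{quote}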

> NPE when saving Parquet To HDFS
> -------------------------------
>
>                 Key: SPARK-10109
>                 URL: https://issues.apache.org/jira/browse/SPARK-10109
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>         Environment: spark-ec2, standalone cluster on Amazon
>            Reporter: Virgil Palanciuc
>
> Very simple code, trying to save a DataFrame.
> I get this in the driver:
> {quote}
> 15/08/19 11:21:41 INFO TaskSetManager: Lost task 9.2 in stage 217.0 (TID 4748) on executor 172.xx.xx.xx: java.lang.NullPointerException (null) 
> and (for a different task):
> 15/08/19 11:21:46 WARN TaskSetManager: Lost task 5.0 in stage 543.0 (TID 5607, 172.yy.yy.yy): java.lang.NullPointerException
>         at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>         at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>         at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>         at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> I get this in the executor log:
> {quote}
> 15/08/19 11:21:41 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1225)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR InsertIntoHadoopFsRelation: Aborting task.
> java.lang.RuntimeException: Failed to commit task
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.commitTask(commands.scala:546)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:266)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1225)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR DynamicPartitionWriterContainer: Task attempt attempt_201508191119_0217_m_000009_2 aborted.
> 15/08/19 11:21:41 ERROR Executor: Exception in task 9.2 in stage 217.0 (TID 4748)
> java.lang.NullPointerException
> 	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
> 	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
> 	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
> 	at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> 	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> 	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {quote}
> This is the code that I'm using:
> {quote}
>     val ppRDD =
>       sparkContext
>         .sequenceFile[Array[Byte], String](inputPath)
>         .values
>         .repartition(numPartitions)
>         .filter(<criteria>)
>         .flatMap(line => parseGGLogLine(<params; including 2 broadcasted variables, one a "Set" and one a "Map">))
>     if (ppRDD.isEmpty())
>       logInfo(s"<message>")
>     else
>       ppRDD.toDF().write
>         .partitionBy("dpid", "pid")
>         .mode(SaveMode.Append)
>         .parquet(outputPath)
> {quote}
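> For context, a rough sketch of how the two broadcast variables mentioned in the flatMap above would typically be created and used (names and contents here are hypothetical placeholders, not the actual ones):
> {quote}
> // Hypothetical sketch of the broadcast setup referenced in the flatMap above;
> // the real variable names and contents are not part of this report.
> val filterSet: Set[String] = Set("placeholder")             // placeholder contents
> val lookupMap: Map[String, String] = Map("key" -> "value")  // placeholder contents
> val bSet = sparkContext.broadcast(filterSet)
> val bMap = sparkContext.broadcast(lookupMap)
> // Inside the flatMap closure the broadcasts are read with .value, roughly:
> //   .flatMap(line => parseGGLogLine(line, bSet.value, bMap.value /* , ... */))
> {quote}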
> Possibly relevant configuration:
> {quote}
>     "spark.sql.parquet" {
>       cacheMetadata = "true",
>       compression.codec = "snappy"
>     }
>
>     "spark.serializer" = "org.apache.spark.serializer.KryoSerializer"
> {quote}
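> For illustration, a minimal sketch of the same settings as flat Spark configuration keys, assuming the block above is a Typesafe-Config/HOCON fragment:
> {quote}
> import org.apache.spark.SparkConf
>
> // Equivalent flat keys, set programmatically (sketch only; values as above).
> val conf = new SparkConf()
>   .set("spark.sql.parquet.cacheMetadata", "true")
>   .set("spark.sql.parquet.compression.codec", "snappy")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> // spark.speculation is left unset, so it keeps its default of false (disabled).
> {quote}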
> I didn't modify the speculation setting, so I'm assuming it's disabled.
> Input path is s3n://<path>
> Output path is hdfs:///<path> (i.e. ephemeral HDFS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
