You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Virgil Palanciuc (JIRA)" <ji...@apache.org> on 2015/08/19 14:12:45 UTC

[jira] [Commented] (SPARK-10109) NPE when saving Parquet To HDFS

    [ https://issues.apache.org/jira/browse/SPARK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14702917#comment-14702917 ] 

Virgil Palanciuc commented on SPARK-10109:
------------------------------------------

Full version details (if it matters):
{quote}
Spark 1.4.1 built for Hadoop 2.0.0-mr1-cdh4.2.0
Build flags: -Psparkr -Phadoop-1 -Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DzincPort=3032
{quote}

> NPE when saving Parquet To HDFS
> -------------------------------
>
>                 Key: SPARK-10109
>                 URL: https://issues.apache.org/jira/browse/SPARK-10109
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>         Environment: Sparc-ec2, standalone cluster on amazon
>            Reporter: Virgil Palanciuc
>
> Very simple code, trying to save a dataframe
> I get this in the driver
> {quote}
> 15/08/19 11:21:41 INFO TaskSetManager: Lost task 9.2 in stage 217.0 (TID 4748) on executor 172.xx.xx.xx: java.lang.NullPointerException (null) 
> and  (not for that task):
> 15/08/19 11:21:46 WARN TaskSetManager: Lost task 5.0 in stage 543.0 (TID 5607, 172.yy.yy.yy): java.lang.NullPointerException
>         at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>         at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>         at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>         at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> I get this in the executor log:
> {quote}
> 15/08/19 11:21:41 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1225)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR InsertIntoHadoopFsRelation: Aborting task.
> java.lang.RuntimeException: Failed to commit task
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.commitTask(commands.scala:546)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:266)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1225)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
> 	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR DynamicPartitionWriterContainer: Task attempt attempt_201508191119_0217_m_000009_2 aborted.
> 15/08/19 11:21:41 ERROR Executor: Exception in task 9.2 in stage 217.0 (TID 4748)
> java.lang.NullPointerException
> 	at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
> 	at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
> 	at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
> 	at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
> 	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> 	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
> 	at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {quote}
> this is the code that I'm using:
> {quote}
>         val ppRDD=
>               sparkContext
>               .sequenceFile[Array[Byte], String](inputPath)
>               .values
>               .repartition(numPartitions)
>               .filter( <criteria>)
>               .flatMap(line => parseGGLogLine(<params; including 2 broadcasted variables, one a "Set" and one a "Map">)
>             if(ppRDD.isEmpty())
>               logInfo(s"<message>")
>             else
>               ppRDD.toDF().write
>               .partitionBy("dpid","pid")
>               .mode( SaveMode.Append )
>               .parquet(outputPath)
> {quote}
> possibly relevant configuration:
> {quote}
>     "spark.sql.parquet" {
>       cacheMetadata = "true",
>       compression.codec = "snappy"
>     }
>     
>  "spark.serializer" = "org.apache.spark.serializer.KryoSerializer",
> {quote}
> I didn't modify the speculation setting, so I'm assuming it's disabled.
> Input path is  s3n://<path>
> Output path is hdfs:///<path> (i.e. ephemeral hdfs)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org