Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/09/11 15:34:45 UTC
[jira] [Reopened] (SPARK-10358) Spark-sql throws IOException on exit when using HDFS to store event log.
[ https://issues.apache.org/jira/browse/SPARK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen reopened SPARK-10358:
-------------------------------
> Spark-sql throws IOException on exit when using HDFS to store event log.
> ------------------------------------------------------------------------
>
> Key: SPARK-10358
> URL: https://issues.apache.org/jira/browse/SPARK-10358
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.1
> Environment: * spark-1.3.1-bin-hadoop2.6
> * hadoop-2.6.0
> * Red Hat, kernel 2.6.32-504.el6.x86_64
> Reporter: Sioa Song
> Priority: Minor
>
> h2. Summary
> In Spark 1.3.1, when HDFS is used to store the event log, spark-sql throws a "java.io.IOException: Filesystem closed" on exit.
> h2. How to reproduce
> 1. Enable the event log mechanism and point it at HDFS.
> You can do this by setting these two properties in spark-defaults.conf:
> spark.eventLog.enabled true
> spark.eventLog.dir hdfs://xxxxx:xxxxx/spark-events
> 2. Start spark-sql, and type exit; once it is up.
> {noformat}
> spark-sql> exit;
> 15/08/14 06:29:20 ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:181)
> at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
> at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1678)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
> Caused by: java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
> at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
> at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
> at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
> ... 19 more
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
> 15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
> 15/08/14 06:29:20 INFO ui.SparkUI: Stopped Spark web UI at http://9.111.251.177:4040
> 15/08/14 06:29:20 INFO scheduler.DAGScheduler: Stopping DAGScheduler
> 15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
> 15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
> Exception in thread "Thread-4" java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1986)
> at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
> at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
> at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:198)
> at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
> at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1391)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)
> {noformat}
> h2. Analysis
> With DEBUG-level logging enabled, we can see more information:
> {noformat}
> 15/08/14 09:32:34 DEBUG SessionState: Removing resource dir /tmp/a9d3b5b8-3329-4e93-aee2-0a3496d661fa_resources
> 15/08/14 09:32:34 DEBUG SparkSQLEnv: Shutting down Spark SQL Environment
> 15/08/14 09:32:34 DEBUG DiskBlockManager: Shutdown hook called
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=2, src=/spark/events/app-20150814093225-0002.snappy.inprogress, packetSize=65532, chunksPerPacket=127,bytesCurBlock=2048
> 15/08/14 09:32:34 DEBUG DFSClient: Queued packet 2
> 15/08/14 09:32:34 DEBUG DFSClient: Queued packet 3
> 15/08/14 09:32:34 DEBUG DFSClient: Waiting for ack for: 3
> 15/08/14 09:32:34 DEBUG Utils: Shutdown hook called
> 15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:2 offsetInBlock:2048 lastPacketInBlock:false lastByteOffsetInBlock: 2399
> 15/08/14 09:32:34 INFO SparkContext: END POST APPLICATION END
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 2 status: SUCCESS downstreamAckTimeNanos: 0
> 15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:3 offsetInBlock:2399 lastPacketInBlock:true lastByteOffsetInBlock: 2399
> 15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 3 status: SUCCESS downstreamAckTimeNanos: 0
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.Server@489bb457
> 15/08/14 09:32:34 DEBUG DFSClient: Closing old block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping SelectChannelConnector@0.0.0.0:4040
> 15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@15e3d24a
> 15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-45 Selector0,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@25964fe8
> 15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root sending #6
> 15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root got value #6
> 15/08/14 09:32:34 DEBUG ProtobufRpcEngine: Call: complete took 4ms
> 15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-46 Selector1,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@2f581b9f
> 15/08/14 09:32:34 DEBUG Client: stopping client from cache: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: removing client from cache: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@3aca5e2
> 15/08/14 09:32:34 DEBUG Client: Stopping client
> 15/08/14 09:32:34 *ERROR* LiveListenerBus: Listener EventLoggingListener threw an exception
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
> at scala.Option.foreach(Option.scala:236)
> {noformat}
> From the log, we can see the "Shutdown hook called" of org.apache.spark.util.Utils run first, then HDFS's own shutdown hook stop the DFSClient, and finally SparkSQLEnv.stop(), where the exception is thrown.
> Clearly, *Spark is trying to write new event log messages after the HDFS client has been closed, which causes the exception*.
> After checking org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver, we found that SparkSQLEnv.stop() is invoked from an ordinary Java runtime shutdown hook:
> {code}
> // Clean up after we exit
> Runtime.getRuntime.addShutdownHook(
>   new Thread() {
>     override def run() {
>       SparkSQLEnv.stop()
>     }
>   }
> )
> {code}
> A hook registered this way has no guaranteed ordering relative to HDFS's own shutdown hook, so it can run after the DFSClient has already been closed.
> h2. Solution
> We fixed this by *registering the hook through Hadoop's ShutdownHookManager with a higher priority than HDFS's shutdown hook*, so SparkSQLEnv.stop() runs before the DFSClient is closed.
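> The change can be sketched as follows (a simplified illustration, not the exact patch; FileSystem.SHUTDOWN_HOOK_PRIORITY is the priority Hadoop assigns to its FileSystem-closing hook, so anything above it runs earlier):
> {code}
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.util.ShutdownHookManager
>
> // Clean up after we exit, but before Hadoop closes its cached FileSystems.
> ShutdownHookManager.get().addShutdownHook(
>   new Runnable {
>     override def run(): Unit = SparkSQLEnv.stop()
>   },
>   FileSystem.SHUTDOWN_HOOK_PRIORITY + 1)
> {code}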
> A pull request is available for review at [https://github.com/apache/spark/pull/8530#issuecomment-136236733].
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org