Posted to commits@spark.apache.org by do...@apache.org on 2022/06/13 19:21:12 UTC

[spark] branch master updated: [SPARK-39439][CORE] Check final file if in-progress event log file does not exist

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5649c721289 [SPARK-39439][CORE] Check final file if in-progress event log file does not exist
5649c721289 is described below

commit 5649c7212897a52530e5d79259f7c551c29473cd
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Jun 13 12:20:46 2022 -0700

    [SPARK-39439][CORE] Check final file if in-progress event log file does not exist
    
    ### What changes were proposed in this pull request?
    
    If the in-progress event log file does not exist, check whether the final event log file exists before reporting an error.
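    
    In code terms: when opening the event log throws `FileNotFoundException` for a
    path ending in the in-progress suffix, derive the final path by stripping that
    suffix and check whether the final file exists. A minimal sketch of the check
    using the Hadoop `FileSystem` API (the helper name is illustrative; the
    committed change is in the diff below):
    
    ```scala
    import org.apache.hadoop.fs.{FileSystem, Path}
    
    // Sketch only: if the in-progress file has vanished, see whether the finished
    // application left the final file behind. The ".inprogress" literal stands in
    // for EventLogFileWriter.IN_PROGRESS used by the committed change.
    def finalFileExists(fs: FileSystem, inProgressPath: Path): Boolean = {
      val finalName = inProgressPath.getName.stripSuffix(".inprogress")
      fs.exists(new Path(inProgressPath.getParent, finalName))
    }
    ```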
    
    ### Why are the changes needed?
    
    We see many of the following errors in the SHS (Spark History Server) log on a busy cluster. They are not actual errors: the application simply completed, and its in-progress event log was renamed to the final file, while the SHS was still processing the in-progress file.
    
    ```
    java.io.FileNotFoundException: File does not exist: /spark2-history/application_1651280650063_4556105_1.lz4.inprogress
            at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:72)
            at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:62)
            at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:170)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1860)
            at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:697)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:381)
            at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
            at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
            at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2606)
    
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
            at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
            at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
            at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:854)
            at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:841)
            at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:830)
            at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1069)
            at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:303)
            at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:299)
            at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
            at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:311)
            at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
            at org.apache.spark.deploy.history.EventLogFileReader$.openEventLog(EventLogFileReaders.scala:133)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$2(FsHistoryProvider.scala:1131)
            at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2625)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$1(FsHistoryProvider.scala:1131)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$1$adapted(FsHistoryProvider.scala:1129)
            at scala.collection.immutable.List.foreach(List.scala:392)
            at org.apache.spark.deploy.history.FsHistoryProvider.parseAppEventLogs(FsHistoryProvider.scala:1129)
            at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:778)
            at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:715)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:581)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /spark2-history/application_1651280650063_4556105_1.lz4.inprogress
            at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:72)
            at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:62)
            at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:170)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1860)
            at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:697)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:381)
            at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
            at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
            at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2606)
    
            at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507)
            at org.apache.hadoop.ipc.Client.call(Client.java:1453)
            at org.apache.hadoop.ipc.Client.call(Client.java:1363)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
            at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
            at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
            at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
            at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
            at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
            at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
            at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:852)
            ... 23 more
    ```
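    
    For context, the `.inprogress` suffix comes from Spark's event log writer: the
    log is written under the in-progress name and renamed to its final name when
    the application stops. That rename is what races with the SHS scan. A hedged
    sketch of the finalization step (illustrative, not Spark's actual
    implementation):
    
    ```scala
    import java.io.IOException
    import org.apache.hadoop.fs.{FileSystem, Path}
    
    // Illustrative finalization: renaming away the .inprogress file is the step
    // that races with the history server. A scanner that listed the old path
    // before the rename gets FileNotFoundException when opening it afterwards.
    def finalizeEventLog(fs: FileSystem, inProgress: Path): Unit = {
      val finalPath =
        new Path(inProgress.getParent, inProgress.getName.stripSuffix(".inprogress"))
      if (!fs.rename(inProgress, finalPath)) {
        throw new IOException(s"Failed to rename $inProgress to $finalPath")
      }
    }
    ```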
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A new unit test in `FsHistoryProviderSuite` (see the diff below).
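    
    The test drives the private `mergeApplicationListing` method directly through
    ScalaTest's `PrivateMethodTester`. A minimal sketch of that pattern (the
    target object and method name here are placeholders):
    
    ```scala
    import org.scalatest.PrivateMethodTester
    
    // Sketch of the ScalaTest private-method pattern used by the new test:
    // build a PrivateMethod handle for the method name, then call it on the
    // target instance with invokePrivate.
    object PrivateMethodSketch extends PrivateMethodTester {
      def callGreet(target: AnyRef): String = {
        val greet = PrivateMethod[String](Symbol("greet")) // hypothetical method
        target invokePrivate greet("world") // calls target.greet("world")
      }
    }
    ```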
    
    Closes #36832 from pan3793/SPARK-39439.
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 11 +++++
 .../deploy/history/FsHistoryProviderSuite.scala    | 49 +++++++++++++++++++++-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a2b162468de..333299f6c11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -747,6 +747,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.synchronized {
           listing.delete(classOf[LogInfo], rootPath.toString)
         }
+      case _: FileNotFoundException
+          if reader.rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) =>
+        val finalFileName = reader.rootPath.getName.stripSuffix(EventLogFileWriter.IN_PROGRESS)
+        val finalFilePath = new Path(reader.rootPath.getParent, finalFileName)
+        if (fs.exists(finalFilePath)) {
+          // Do nothing: the application completed during processing, and the final event log
+          // file will be processed in the next round.
+        } else {
+          logWarning(s"In-progress event log file does not exist: ${reader.rootPath}, " +
+            s"neither does the final event log file: $finalFilePath.")
+        }
       case e: Exception =>
         logError("Exception while merging application listings", e)
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 8589e948fd1..541b283c13f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
+import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
@@ -57,7 +58,9 @@ import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 import org.apache.spark.util.kvstore.InMemoryStore
 import org.apache.spark.util.logging.DriverLogger
 
-abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
+abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging
+  with PrivateMethodTester {
+
   private var testDir: File = null
 
   override def beforeEach(): Unit = {
@@ -221,6 +224,50 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with L
     }
   }
 
+  test("SPARK-39439: Check final file if in-progress event log file does not exist") {
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+      val provider = new FsHistoryProvider(conf)
+
+      val mergeApplicationListing = PrivateMethod[Unit]('mergeApplicationListing)
+
+      val inProgressFile = newLogFile("app1", None, inProgress = true)
+      val logAppender1 = new LogAppender("in-progress and final event log files does not exist")
+      withLogAppender(logAppender1) {
+        provider invokePrivate mergeApplicationListing(
+          EventLogFileReader(fs, new Path(inProgressFile.toURI), None),
+          System.currentTimeMillis,
+          true
+        )
+      }
+      val logs1 = logAppender1.loggingEvents.map(_.getMessage.getFormattedMessage)
+        .filter(_.contains("In-progress event log file does not exist: "))
+      assert(logs1.size === 1)
+
+      writeFile(inProgressFile, None,
+        SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
+        SparkListenerApplicationEnd(2L))
+      val finalFile = newLogFile("app1", None, inProgress = false)
+      inProgressFile.renameTo(finalFile)
+      val logAppender2 = new LogAppender("in-progress event log file has been renamed to final")
+      withLogAppender(logAppender2) {
+        provider invokePrivate mergeApplicationListing(
+          EventLogFileReader(fs, new Path(inProgressFile.toURI), None),
+          System.currentTimeMillis,
+          true
+        )
+      }
+      val logs2 = logAppender2.loggingEvents.map(_.getMessage.getFormattedMessage)
+        .filter(_.contains("In-progress event log file does not exist: "))
+      assert(logs2.isEmpty)
+    }
+  }
+
   test("Parse logs that application is not started") {
     val provider = new FsHistoryProvider(createTestConf())
 

