You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/06/13 19:21:12 UTC
[spark] branch master updated: [SPARK-39439][CORE] Check final file if in-progress event log file does not exist
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5649c721289 [SPARK-39439][CORE] Check final file if in-progress event log file does not exist
5649c721289 is described below
commit 5649c7212897a52530e5d79259f7c551c29473cd
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Jun 13 12:20:46 2022 -0700
[SPARK-39439][CORE] Check final file if in-progress event log file does not exist
### What changes were proposed in this pull request?
Check the final event log if the in-progress event log file does not exist.
### Why are the changes needed?
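We see lots of the following errors in the Spark History Server (SHS) log on a busy cluster. They are not real errors: they occur because the application completed while the SHS was processing its event log, so the in-progress event log file no longer exists under its original name.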
```
java.io.FileNotFoundException: File does not exist: /spark2-history/application_1651280650063_4556105_1.lz4.inprogress
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:72)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:62)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:170)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1860)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:697)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:381)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2606)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:854)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:841)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:830)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1069)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:303)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:299)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
at org.apache.spark.deploy.history.EventLogFileReader$.openEventLog(EventLogFileReaders.scala:133)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$2(FsHistoryProvider.scala:1131)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2625)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$1(FsHistoryProvider.scala:1131)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$1$adapted(FsHistoryProvider.scala:1129)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.deploy.history.FsHistoryProvider.parseAppEventLogs(FsHistoryProvider.scala:1129)
at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:778)
at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:715)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:581)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /spark2-history/application_1651280650063_4556105_1.lz4.inprogress
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:72)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:62)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:170)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1860)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:697)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:381)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2606)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507)
at org.apache.hadoop.ipc.Client.call(Client.java:1453)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:852)
... 23 more
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new unit test to FsHistoryProviderSuite.
Closes #36832 from pan3793/SPARK-39439.
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/deploy/history/FsHistoryProvider.scala | 11 +++++
.../deploy/history/FsHistoryProviderSuite.scala | 49 +++++++++++++++++++++-
2 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a2b162468de..333299f6c11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -747,6 +747,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.synchronized {
listing.delete(classOf[LogInfo], rootPath.toString)
}
+ case _: FileNotFoundException
+ if reader.rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) =>
+ val finalFileName = reader.rootPath.getName.stripSuffix(EventLogFileWriter.IN_PROGRESS)
+ val finalFilePath = new Path(reader.rootPath.getParent, finalFileName)
+ if (fs.exists(finalFilePath)) {
+ // Do nothing, the application completed during processing, the final event log file
+ // will be processed in the next round.
+ } else {
+ logWarning(s"In-progress event log file does not exist: ${reader.rootPath}, " +
+ s"neither does the final event log file: $finalFilePath.")
+ }
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 8589e948fd1..541b283c13f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
import org.mockito.ArgumentMatchers.{any, argThat}
import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
+import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
@@ -57,7 +58,9 @@ import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
import org.apache.spark.util.kvstore.InMemoryStore
import org.apache.spark.util.logging.DriverLogger
-abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
+abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging
+ with PrivateMethodTester {
+
private var testDir: File = null
override def beforeEach(): Unit = {
@@ -221,6 +224,50 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with L
}
}
+ test("SPARK-39439: Check final file if in-progress event log file does not exist") {
+ withTempDir { dir =>
+ val conf = createTestConf()
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+ conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+ val provider = new FsHistoryProvider(conf)
+
+ val mergeApplicationListing = PrivateMethod[Unit]('mergeApplicationListing)
+
+ val inProgressFile = newLogFile("app1", None, inProgress = true)
+ val logAppender1 = new LogAppender("in-progress and final event log files does not exist")
+ withLogAppender(logAppender1) {
+ provider invokePrivate mergeApplicationListing(
+ EventLogFileReader(fs, new Path(inProgressFile.toURI), None),
+ System.currentTimeMillis,
+ true
+ )
+ }
+ val logs1 = logAppender1.loggingEvents.map(_.getMessage.getFormattedMessage)
+ .filter(_.contains("In-progress event log file does not exist: "))
+ assert(logs1.size === 1)
+
+ writeFile(inProgressFile, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
+ SparkListenerApplicationEnd(2L))
+ val finalFile = newLogFile("app1", None, inProgress = false)
+ inProgressFile.renameTo(finalFile)
+ val logAppender2 = new LogAppender("in-progress event log file has been renamed to final")
+ withLogAppender(logAppender2) {
+ provider invokePrivate mergeApplicationListing(
+ EventLogFileReader(fs, new Path(inProgressFile.toURI), None),
+ System.currentTimeMillis,
+ true
+ )
+ }
+ val logs2 = logAppender2.loggingEvents.map(_.getMessage.getFormattedMessage)
+ .filter(_.contains("In-progress event log file does not exist: "))
+ assert(logs2.isEmpty)
+ }
+ }
+
test("Parse logs that application is not started") {
val provider = new FsHistoryProvider(createTestConf())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org