You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2016/11/30 14:29:34 UTC
spark git commit: [SPARK-17843][WEB UI] Indicate event logs pending for processing on h…
Repository: spark
Updated Branches:
refs/heads/branch-2.0 bdd27d1ea -> 8b33aa089
[SPARK-17843][WEB UI] Indicate event logs pending for processing on h…
## What changes were proposed in this pull request?
Backport PR #15410 to branch-2.0
## How was this patch tested?
Existing unit tests. Screenshots for UI changes provided in PR #15410.
Author: Vinayak <vi...@in.ibm.com>
Closes #15991 from vijoshi/SAAS-608.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b33aa08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b33aa08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b33aa08
Branch: refs/heads/branch-2.0
Commit: 8b33aa0899d5388ab51b8daa786d0f76fe255ddf
Parents: bdd27d1
Author: Vinayak <vi...@in.ibm.com>
Authored: Wed Nov 30 08:30:19 2016 -0600
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Wed Nov 30 08:30:19 2016 -0600
----------------------------------------------------------------------
.../spark/ui/static/historypage-common.js | 24 ++++++++
.../history/ApplicationHistoryProvider.scala | 24 ++++++++
.../deploy/history/FsHistoryProvider.scala | 59 ++++++++++++++------
.../spark/deploy/history/HistoryPage.scala | 19 +++++++
.../spark/deploy/history/HistoryServer.scala | 8 +++
5 files changed, 116 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
new file mode 100644
index 0000000..55d540d
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ready(function() {
+ if ($('#last-updated').length) {
+ var lastUpdatedMillis = Number($('#last-updated').text());
+ var updatedDate = new Date(lastUpdatedMillis);
+ $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString())
+ }
+});
http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ba42b48..f3ea541 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -73,6 +73,30 @@ private[history] case class LoadedAppUI(
private[history] abstract class ApplicationHistoryProvider {
/**
+ * Returns the count of application event logs that the provider is currently still processing.
+ * History Server UI can use this to indicate to a user that the application listing on the UI
+ * can be expected to list additional known applications once the processing of these
+ * application event logs completes.
+ *
+ * A History Provider that does not have a notion of count of event logs that may be pending
+ * for processing need not override this method.
+ *
+ * @return Count of application event logs that are currently under process
+ */
+ def getEventLogsUnderProcess(): Int = {
+ return 0;
+ }
+
+ /**
+ * Returns the time the history provider last updated the application history information
+ *
+ * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
+ */
+ def getLastUpdatedTime(): Long = {
+ return 0;
+ }
+
+ /**
* Returns a list of applications available for the history server to show.
*
* @return List of all know applications.
http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a1e36c5..0d9d22a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable
@@ -107,7 +107,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// The modification time of the newest log detected during the last scan. Currently only
// used for logging msgs (logs are re-scanned based on file size, rather than modtime)
- private var lastScanTime = -1L
+ private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -119,6 +119,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+ private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
+
/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -223,6 +225,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
applications.get(appId)
}
+ override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
+
+ override def getLastUpdatedTime(): Long = lastScanTime.get()
+
override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
try {
applications.get(appId).flatMap { appInfo =>
@@ -310,26 +316,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
}
- logInfos.map { file =>
- replayExecutor.submit(new Runnable {
+
+ var tasks = mutable.ListBuffer[Future[_]]()
+
+ try {
+ for (file <- logInfos) {
+ tasks += replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(file)
})
}
- .foreach { task =>
- try {
- // Wait for all tasks to finish. This makes sure that checkForLogs
- // is not scheduled again while some tasks are already running in
- // the replayExecutor.
- task.get()
- } catch {
- case e: InterruptedException =>
- throw e
- case e: Exception =>
- logError("Exception while merging application listings", e)
- }
+ } catch {
+ // let the iteration over logInfos break, since an exception on
+ // replayExecutor.submit (..) indicates the ExecutorService is unable
+ // to take any more submissions at this time
+
+ case e: Exception =>
+ logError(s"Exception while submitting event log for replay", e)
+ }
+
+ pendingReplayTasksCount.addAndGet(tasks.size)
+
+ tasks.foreach { task =>
+ try {
+ // Wait for all tasks to finish. This makes sure that checkForLogs
+ // is not scheduled again while some tasks are already running in
+ // the replayExecutor.
+ task.get()
+ } catch {
+ case e: InterruptedException =>
+ throw e
+ case e: Exception =>
+ logError("Exception while merging application listings", e)
+ } finally {
+ pendingReplayTasksCount.decrementAndGet()
}
+ }
- lastScanTime = newLastScanTime
+ lastScanTime.set(newLastScanTime)
} catch {
case e: Exception => logError("Exception in checking for event log updates", e)
}
@@ -346,7 +369,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} catch {
case e: Exception =>
logError("Exception encountered when attempting to update last scan time", e)
- lastScanTime
+ lastScanTime.get()
} finally {
if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index a120b6c..faf807d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -33,14 +33,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
.filter(_.completed != requestedIncomplete)
val allAppsSize = allApps.size
+ val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
+ val lastUpdatedTime = parent.getLastUpdatedTime()
val providerConfig = parent.getProviderConfig()
val content =
+ <script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
<div>
<div class="span12">
<ul class="unstyled">
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
+ if (eventLogsUnderProcessCount > 0) {
+ <p>There are {eventLogsUnderProcessCount} event log(s) currently being
+ processed which may result in additional applications getting listed on this page.
+ Refresh the page to view updates. </p>
+ }
+ }
+
+ {
+ if (lastUpdatedTime > 0) {
+ <p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
+ }
+ }
+
+ {
if (allAppsSize > 0) {
<script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
<div id="history-summary" class="span12 pagination"></div> ++
@@ -48,6 +65,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
<script>setAppLimit({parent.maxApplications})</script>
} else if (requestedIncomplete) {
<h4>No incomplete applications found!</h4>
+ } else if (eventLogsUnderProcessCount > 0) {
+ <h4>No completed applications found!</h4>
} else {
<h4>No completed applications found!</h4> ++
<p>Did you specify the correct logging directory?
http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 735aa43..996c19e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -178,6 +178,14 @@ class HistoryServer(
provider.getListing()
}
+ def getEventLogsUnderProcess(): Int = {
+ provider.getEventLogsUnderProcess()
+ }
+
+ def getLastUpdatedTime(): Long = {
+ provider.getLastUpdatedTime()
+ }
+
def getApplicationInfoList: Iterator[ApplicationInfo] = {
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org