You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2016/11/30 14:29:34 UTC

spark git commit: [SPARK-17843][WEB UI] Indicate event logs pending for processing on h…

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bdd27d1ea -> 8b33aa089


[SPARK-17843][WEB UI] Indicate event logs pending for processing on h…

## What changes were proposed in this pull request?

Backport PR #15410 to branch-2.0

## How was this patch tested?

Existing unit tests. Screenshots for UI changes provided in PR #15410.

Author: Vinayak <vi...@in.ibm.com>

Closes #15991 from vijoshi/SAAS-608.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b33aa08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b33aa08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b33aa08

Branch: refs/heads/branch-2.0
Commit: 8b33aa0899d5388ab51b8daa786d0f76fe255ddf
Parents: bdd27d1
Author: Vinayak <vi...@in.ibm.com>
Authored: Wed Nov 30 08:30:19 2016 -0600
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Wed Nov 30 08:30:19 2016 -0600

----------------------------------------------------------------------
 .../spark/ui/static/historypage-common.js       | 24 ++++++++
 .../history/ApplicationHistoryProvider.scala    | 24 ++++++++
 .../deploy/history/FsHistoryProvider.scala      | 59 ++++++++++++++------
 .../spark/deploy/history/HistoryPage.scala      | 19 +++++++
 .../spark/deploy/history/HistoryServer.scala    |  8 +++
 5 files changed, 116 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
new file mode 100644
index 0000000..55d540d
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ready(function() {
+    // Show the epoch-millis text of #last-updated as a locale date/time; leave it alone otherwise.
+    var lastUpdated = $('#last-updated'), updatedDate = new Date(Number(lastUpdated.text()));
+    if (lastUpdated.length && !isNaN(updatedDate.getTime())) {
+      lastUpdated.text(updatedDate.toLocaleDateString() + ", " + updatedDate.toLocaleTimeString());
+    }
+});

http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ba42b48..f3ea541 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -73,6 +73,30 @@ private[history] case class LoadedAppUI(
 private[history] abstract class ApplicationHistoryProvider {
 
   /**
+   * Returns the number of application event logs that the provider is still processing.
+   * The History Server UI can use this to indicate to a user that the application listing
+   * can be expected to show additional known applications once the processing of these
+   * application event logs completes.
+   *
+   * A history provider that has no notion of a count of event logs pending for processing
+   * need not override this method; this default implementation always returns 0.
+   *
+   * @return Count of application event logs that are currently being processed
+   */
+  def getEventLogsUnderProcess(): Int = {
+    0
+  }
+
+  /**
+   * Returns the time the history provider last updated the application history information.
+   *
+   * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
+   */
+  def getLastUpdatedTime(): Long = {
+    0L
+  }
+
+  /**
    * Returns a list of applications available for the history server to show.
    *
    * @return List of all know applications.

http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a1e36c5..0d9d22a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
@@ -107,7 +107,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // The modification time of the newest log detected during the last scan.   Currently only
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
-  private var lastScanTime = -1L
+  private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -119,6 +119,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
+  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
+
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -223,6 +225,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     applications.get(appId)
   }
 
+  override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
+
+  override def getLastUpdatedTime(): Long = lastScanTime.get()
+
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
     try {
       applications.get(appId).flatMap { appInfo =>
@@ -310,26 +316,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       if (logInfos.nonEmpty) {
         logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
       }
-      logInfos.map { file =>
-          replayExecutor.submit(new Runnable {
+
+      var tasks = mutable.ListBuffer[Future[_]]()
+
+      try {
+        for (file <- logInfos) {
+          tasks += replayExecutor.submit(new Runnable {
             override def run(): Unit = mergeApplicationListing(file)
           })
         }
-        .foreach { task =>
-          try {
-            // Wait for all tasks to finish. This makes sure that checkForLogs
-            // is not scheduled again while some tasks are already running in
-            // the replayExecutor.
-            task.get()
-          } catch {
-            case e: InterruptedException =>
-              throw e
-            case e: Exception =>
-              logError("Exception while merging application listings", e)
-          }
+      } catch {
+        // let the iteration over logInfos break, since an exception on
+        // replayExecutor.submit (..) indicates the ExecutorService is unable
+        // to take any more submissions at this time
+
+        case e: Exception =>
+          logError(s"Exception while submitting event log for replay", e)
+      }
+
+      pendingReplayTasksCount.addAndGet(tasks.size)
+
+      tasks.foreach { task =>
+        try {
+          // Wait for all tasks to finish. This makes sure that checkForLogs
+          // is not scheduled again while some tasks are already running in
+          // the replayExecutor.
+          task.get()
+        } catch {
+          case e: InterruptedException =>
+            throw e
+          case e: Exception =>
+            logError("Exception while merging application listings", e)
+        } finally {
+          pendingReplayTasksCount.decrementAndGet()
         }
+      }
 
-      lastScanTime = newLastScanTime
+      lastScanTime.set(newLastScanTime)
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
@@ -346,7 +369,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } catch {
       case e: Exception =>
         logError("Exception encountered when attempting to update last scan time", e)
-        lastScanTime
+        lastScanTime.get()
     } finally {
       if (!fs.delete(path, true)) {
         logWarning(s"Error deleting ${path}")

http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index a120b6c..faf807d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -33,14 +33,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       .filter(_.completed != requestedIncomplete)
     val allAppsSize = allApps.size
 
+    val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
+    val lastUpdatedTime = parent.getLastUpdatedTime()
     val providerConfig = parent.getProviderConfig()
     val content =
+      <script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
       <div>
           <div class="span12">
             <ul class="unstyled">
               {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
             </ul>
             {
+            if (eventLogsUnderProcessCount > 0) {
+              <p>There are {eventLogsUnderProcessCount} event log(s) currently being
+                processed which may result in additional applications getting listed on this page.
+                Refresh the page to view updates. </p>
+            }
+            }
+
+            {
+            if (lastUpdatedTime > 0) {
+              <p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
+            }
+            }
+
+            {
             if (allAppsSize > 0) {
               <script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
               <div id="history-summary" class="span12 pagination"></div> ++
@@ -48,6 +65,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
               <script>setAppLimit({parent.maxApplications})</script>
             } else if (requestedIncomplete) {
               <h4>No incomplete applications found!</h4>
+            } else if (eventLogsUnderProcessCount > 0) {
+              <h4>No completed applications found!</h4>
             } else {
               <h4>No completed applications found!</h4> ++
                 <p>Did you specify the correct logging directory?

http://git-wip-us.apache.org/repos/asf/spark/blob/8b33aa08/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 735aa43..996c19e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -178,6 +178,14 @@ class HistoryServer(
     provider.getListing()
   }
 
+  /** Delegates to the provider: the count of event logs it is still processing. */
+  def getEventLogsUnderProcess(): Int =
+    provider.getEventLogsUnderProcess()
+
+  /** Delegates to the provider: last updated time in millis, or 0 if undefined/unsupported. */
+  def getLastUpdatedTime(): Long =
+    provider.getLastUpdatedTime()
+
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org