You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2016/11/11 18:54:33 UTC

spark git commit: [SPARK-17843][WEB UI] Indicate event logs pending for processing on history server UI

Repository: spark
Updated Branches:
  refs/heads/master 4f15d94cf -> a531fe1a8


[SPARK-17843][WEB UI] Indicate event logs pending for processing on history server UI

## What changes were proposed in this pull request?

History Server UI's application listing to display information on currently under process event logs so a user knows that pending this processing an application may not list on the UI.

When there are no event logs under process, the application list page has a "Last Updated" date-time at the top indicating the date-time of the last _completed_ scan of the event logs. The value is displayed to the user in his/her local time zone.
## How was this patch tested?

All unit tests pass. Particularly all the suites under org.apache.spark.deploy.history.\* were run to test changes.
- Very first startup - Pending logs - no logs processed yet:

<img width="1280" alt="screen shot 2016-10-24 at 3 07 04 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640981/b8d2a96a-99fc-11e6-9b1f-2d736fe90e48.png">
- Very first startup - Pending logs - some logs processed:

<img width="1280" alt="screen shot 2016-10-24 at 3 18 42 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641087/3f8e3bae-99fd-11e6-9ef1-e0e70d71d8ef.png">
- Last updated - No currently pending logs:

<img width="1280" alt="screen shot 2016-10-17 at 8 34 37 pm" src="https://cloud.githubusercontent.com/assets/12079825/19443100/4d13946c-94a9-11e6-8ee2-c442729bb206.png">
- Last updated - With some currently pending logs:

<img width="1280" alt="screen shot 2016-10-24 at 3 09 31 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640903/7323ba3a-99fc-11e6-8359-6a45753dbb28.png">
- No applications found and No currently pending logs:

<img width="1280" alt="screen shot 2016-10-24 at 3 24 26 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641364/03a2cb04-99fe-11e6-87d6-d09587fc6201.png">

Author: Vinayak <vi...@in.ibm.com>

Closes #15410 from vijoshi/SAAS-608_master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a531fe1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a531fe1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a531fe1a

Branch: refs/heads/master
Commit: a531fe1a82ec515314f2db2e2305283fef24067f
Parents: 4f15d94
Author: Vinayak <vi...@in.ibm.com>
Authored: Fri Nov 11 12:54:16 2016 -0600
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Fri Nov 11 12:54:16 2016 -0600

----------------------------------------------------------------------
 .../spark/ui/static/historypage-common.js       | 24 ++++++++
 .../history/ApplicationHistoryProvider.scala    | 24 ++++++++
 .../deploy/history/FsHistoryProvider.scala      | 59 ++++++++++++++------
 .../spark/deploy/history/HistoryPage.scala      | 19 +++++++
 .../spark/deploy/history/HistoryServer.scala    |  8 +++
 5 files changed, 116 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a531fe1a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
new file mode 100644
index 0000000..55d540d
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+$(document).ready(function() {
+    if ($('#last-updated').length) {
+      var lastUpdatedMillis = Number($('#last-updated').text());
+      var updatedDate = new Date(lastUpdatedMillis);
+      $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString())
+    }
+});

http://git-wip-us.apache.org/repos/asf/spark/blob/a531fe1a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 06530ff..d7d8280 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -75,6 +75,30 @@ private[history] case class LoadedAppUI(
 private[history] abstract class ApplicationHistoryProvider {
 
   /**
+   * Returns the count of application event logs that the provider is currently still processing.
+   * History Server UI can use this to indicate to a user that the application listing on the UI
+   * can be expected to list additional known applications once the processing of these
+   * application event logs completes.
+   *
+   * A History Provider that does not have a notion of count of event logs that may be pending
+   * for processing need not override this method.
+   *
+   * @return Count of application event logs that are currently under process
+   */
+  def getEventLogsUnderProcess(): Int = {
+    return 0;
+  }
+
+  /**
+   * Returns the time the history provider last updated the application history information
+   *
+   * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
+   */
+  def getLastUpdatedTime(): Long = {
+    return 0;
+  }
+
+  /**
    * Returns a list of applications available for the history server to show.
    *
    * @return List of all know applications.

http://git-wip-us.apache.org/repos/asf/spark/blob/a531fe1a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index dfc1aad..ca38a47 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
@@ -108,7 +108,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // The modification time of the newest log detected during the last scan.   Currently only
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
-  private var lastScanTime = -1L
+  private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -120,6 +120,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
+  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
+
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -226,6 +228,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     applications.get(appId)
   }
 
+  override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
+
+  override def getLastUpdatedTime(): Long = lastScanTime.get()
+
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
     try {
       applications.get(appId).flatMap { appInfo =>
@@ -329,26 +335,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       if (logInfos.nonEmpty) {
         logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
       }
-      logInfos.map { file =>
-          replayExecutor.submit(new Runnable {
+
+      var tasks = mutable.ListBuffer[Future[_]]()
+
+      try {
+        for (file <- logInfos) {
+          tasks += replayExecutor.submit(new Runnable {
             override def run(): Unit = mergeApplicationListing(file)
           })
         }
-        .foreach { task =>
-          try {
-            // Wait for all tasks to finish. This makes sure that checkForLogs
-            // is not scheduled again while some tasks are already running in
-            // the replayExecutor.
-            task.get()
-          } catch {
-            case e: InterruptedException =>
-              throw e
-            case e: Exception =>
-              logError("Exception while merging application listings", e)
-          }
+      } catch {
+        // let the iteration over logInfos break, since an exception on
+        // replayExecutor.submit (..) indicates the ExecutorService is unable
+        // to take any more submissions at this time
+
+        case e: Exception =>
+          logError(s"Exception while submitting event log for replay", e)
+      }
+
+      pendingReplayTasksCount.addAndGet(tasks.size)
+
+      tasks.foreach { task =>
+        try {
+          // Wait for all tasks to finish. This makes sure that checkForLogs
+          // is not scheduled again while some tasks are already running in
+          // the replayExecutor.
+          task.get()
+        } catch {
+          case e: InterruptedException =>
+            throw e
+          case e: Exception =>
+            logError("Exception while merging application listings", e)
+        } finally {
+          pendingReplayTasksCount.decrementAndGet()
         }
+      }
 
-      lastScanTime = newLastScanTime
+      lastScanTime.set(newLastScanTime)
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
@@ -365,7 +388,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } catch {
       case e: Exception =>
         logError("Exception encountered when attempting to update last scan time", e)
-        lastScanTime
+        lastScanTime.get()
     } finally {
       if (!fs.delete(path, true)) {
         logWarning(s"Error deleting ${path}")

http://git-wip-us.apache.org/repos/asf/spark/blob/a531fe1a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 96b9ecf..0e7a6c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -30,14 +30,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
     val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
+    val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
+    val lastUpdatedTime = parent.getLastUpdatedTime()
     val providerConfig = parent.getProviderConfig()
     val content =
+      <script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
       <div>
           <div class="span12">
             <ul class="unstyled">
               {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
             </ul>
             {
+            if (eventLogsUnderProcessCount > 0) {
+              <p>There are {eventLogsUnderProcessCount} event log(s) currently being
+                processed which may result in additional applications getting listed on this page.
+                Refresh the page to view updates. </p>
+            }
+            }
+
+            {
+            if (lastUpdatedTime > 0) {
+              <p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
+            }
+            }
+
+            {
             if (allAppsSize > 0) {
               <script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
                 <div id="history-summary" class="span12 pagination"></div> ++
@@ -46,6 +63,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
                 <script>setAppLimit({parent.maxApplications})</script>
             } else if (requestedIncomplete) {
               <h4>No incomplete applications found!</h4>
+            } else if (eventLogsUnderProcessCount > 0) {
+              <h4>No completed applications found!</h4>
             } else {
               <h4>No completed applications found!</h4> ++ parent.emptyListingHtml
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/a531fe1a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 3175b36..7e21fa6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -179,6 +179,14 @@ class HistoryServer(
     provider.getListing()
   }
 
+  def getEventLogsUnderProcess(): Int = {
+    provider.getEventLogsUnderProcess()
+  }
+
+  def getLastUpdatedTime(): Long = {
+    provider.getLastUpdatedTime()
+  }
+
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
     getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org