Posted to commits@spark.apache.org by ir...@apache.org on 2015/06/03 20:43:34 UTC

spark git commit: [SPARK-7161] [HISTORY SERVER] Provide REST api to download event logs from History Server

Repository: spark
Updated Branches:
  refs/heads/master d053a31be -> d2a86eb8f


[SPARK-7161] [HISTORY SERVER] Provide REST api to download event logs from History Server

This PR adds a new API that allows the user to download event logs for an application as a zip file. APIs have been added to download all logs for a given application or just for a specific attempt.

This also adds a method to ApplicationHistoryProvider to get the raw files, zipped.
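
For illustration, the new endpoints can be exercised with a few lines of client code. The sketch below is not part of this commit: the history server address and the application id are assumptions (any id returned by /api/v1/applications will do), and the per-attempt variant simply inserts the attempt id before /logs.

import java.net.URL
import java.util.zip.ZipInputStream

object DownloadEventLogs {
  def main(args: Array[String]): Unit = {
    val base = "http://localhost:18080/api/v1"   // assumed history server location
    val appId = "local-1430917381535"            // assumed application id
    // All attempts: /applications/[app-id]/logs
    // One attempt:  /applications/[app-id]/[attempt-id]/logs
    val zip = new ZipInputStream(new URL(s"$base/applications/$appId/logs").openStream())
    try {
      var entry = zip.getNextEntry
      while (entry != null) {
        println(s"got entry: ${entry.getName}")  // one entry per event log file
        entry = zip.getNextEntry
      }
    } finally {
      zip.close()
    }
  }
}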

Author: Hari Shreedharan <hs...@apache.org>

Closes #5792 from harishreedharan/eventlog-download and squashes the following commits:

221cc26 [Hari Shreedharan] Update docs with new API information.
a131be6 [Hari Shreedharan] Fix style issues.
5528bd8 [Hari Shreedharan] Merge branch 'master' into eventlog-download
6e8156e [Hari Shreedharan] Simplify tests, use Guava stream copy methods.
d8ddede [Hari Shreedharan] Remove unnecessary case in EventLogDownloadResource.
ffffb53 [Hari Shreedharan] Changed interface to use zip stream. Added more tests.
1100b40 [Hari Shreedharan] Ensure that `Path` does not appear in interfaces, by refactoring interfaces.
5a5f3e2 [Hari Shreedharan] Fix test ordering issue.
0b66948 [Hari Shreedharan] Minor formatting/import fixes.
4fc518c [Hari Shreedharan] Fix rat failures.
a48b91f [Hari Shreedharan] Refactor to make attemptId optional in the API. Also added tests.
0fc1424 [Hari Shreedharan] File download now works for individual attempts and the entire application.
350d7e8 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into eventlog-download
fd6ab00 [Hari Shreedharan] Fix style issues
32b7662 [Hari Shreedharan] Use UIRoot directly in ApiRootResource. Also, use `Response` class to set headers.
7b362b2 [Hari Shreedharan] Almost working.
3d18ebc [Hari Shreedharan] [WIP] Try getting the event log download to work.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2a86eb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2a86eb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2a86eb8

Branch: refs/heads/master
Commit: d2a86eb8f0fcc02304604da56c589ea58c77587a
Parents: d053a31
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Jun 3 13:43:13 2015 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Wed Jun 3 13:43:13 2015 -0500

----------------------------------------------------------------------
 .rat-excludes                                   |  2 +
 .../history/ApplicationHistoryProvider.scala    | 11 +++
 .../deploy/history/FsHistoryProvider.scala      | 63 +++++++++++++-
 .../spark/deploy/history/HistoryServer.scala    |  8 ++
 .../spark/status/api/v1/ApiRootResource.scala   | 20 +++++
 .../api/v1/EventLogDownloadResource.scala       | 70 ++++++++++++++++
 .../application_list_json_expectation.json      | 16 ++++
 .../completed_app_list_json_expectation.json    | 16 ++++
 .../minDate_app_list_json_expectation.json      | 34 ++++++--
 .../spark-events/local-1430917381535_1          |  5 ++
 .../spark-events/local-1430917381535_2          |  5 ++
 .../deploy/history/FsHistoryProviderSuite.scala | 40 ++++++++-
 .../deploy/history/HistoryServerSuite.scala     | 88 ++++++++++++++++++--
 docs/monitoring.md                              |  8 ++
 14 files changed, 367 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 8f2722c..994c7e8 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -80,6 +80,8 @@ local-1425081759269/*
 local-1426533911241/*
 local-1426633911242/*
 local-1430917381534/*
+local-1430917381535_1
+local-1430917381535_2
 DESCRIPTION
 NAMESPACE
 test_support/*

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 298a820..5f5e0fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.deploy.history
 
+import java.util.zip.ZipOutputStream
+
+import org.apache.spark.SparkException
 import org.apache.spark.ui.SparkUI
 
 private[spark] case class ApplicationAttemptInfo(
@@ -62,4 +65,12 @@ private[history] abstract class ApplicationHistoryProvider {
    */
   def getConfig(): Map[String, String] = Map()
 
+  /**
+   * Writes out the event logs to the output stream provided. The logs will be compressed into a
+   * single zip file and written out.
+   * @throws SparkException if the logs for the app id cannot be found.
+   */
+  @throws(classOf[SparkException])
+  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
+
 }
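
Since the new method is abstract, every ApplicationHistoryProvider implementation must now supply it. A hypothetical provider that keeps its logs as plain local files could satisfy the contract roughly as follows; this is an illustrative sketch, not code from this commit, and logsFor is a made-up helper standing in for however the provider resolves an app/attempt to its log files.

import java.io.{File, FileInputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

import com.google.common.io.ByteStreams

import org.apache.spark.SparkException

object ZipSketch {
  // Hypothetical helper: resolve an app/attempt to its log files.
  def logsFor(appId: String, attemptId: Option[String]): Seq[File] = ???

  @throws(classOf[SparkException])
  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
    val files = logsFor(appId, attemptId)
    if (files.isEmpty) {
      throw new SparkException(s"Logs for $appId not found.")
    }
    files.foreach { file =>
      zipStream.putNextEntry(new ZipEntry(file.getName))  // one zip entry per log file
      val in = new FileInputStream(file)
      try {
        ByteStreams.copy(in, zipStream)
      } finally {
        in.close()
      }
      zipStream.closeEntry()
    }
  }
}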

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 45c2be3..52b149b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,16 +17,18 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 
+import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
@@ -59,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
 
-  private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -219,6 +222,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      zipStream: ZipOutputStream): Unit = {
+
+    /**
+     * This method compresses the files passed in, and writes the compressed data out into the
+     * [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
+     * the name of the file being compressed.
+     */
+    def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
+      val fs = FileSystem.get(hadoopConf)
+      val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
+      try {
+        outputStream.putNextEntry(new ZipEntry(entryName))
+        ByteStreams.copy(inputStream, outputStream)
+        outputStream.closeEntry()
+      } finally {
+        inputStream.close()
+      }
+    }
+
+    applications.get(appId) match {
+      case Some(appInfo) =>
+        try {
+          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+          appInfo.attempts.filter { attempt =>
+            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+          }.foreach { attempt =>
+            val logPath = new Path(logDir, attempt.logPath)
+            // If this is a legacy directory, then add the directory to the zipStream and add
+            // each file to that directory.
+            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
+              val files = fs.listFiles(logPath, false)
+              zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
+              zipStream.closeEntry()
+              while (files.hasNext) {
+                val file = files.next().getPath
+                zipFileToStream(file, attempt.logPath + Path.SEPARATOR + file.getName, zipStream)
+              }
+            } else {
+              zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+            }
+          }
+        } finally {
+          zipStream.close()
+        }
+      case None => throw new SparkException(s"Logs for $appId not found.")
+    }
+  }
+
+
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
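
One detail of the legacy branch above is easy to miss: in the ZIP format a directory is just an entry whose name ends in "/" and carries no data, which is why the code calls putNextEntry and then closeEntry immediately for the directory before streaming the files inside it. A standalone illustration with plain java.util.zip, runnable as a Scala script (the output path and entry names are made up):

import java.io.FileOutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}

val out = new ZipOutputStream(new FileOutputStream("example.zip"))
// Directory entry: trailing "/", no bytes written.
out.putNextEntry(new ZipEntry("local-1430917381535_1/"))
out.closeEntry()
// A regular file entry placed under that directory.
out.putNextEntry(new ZipEntry("local-1430917381535_1/EVENT_LOG_1"))
out.write("{\"Event\":\"SparkListenerLogStart\"}".getBytes("UTF-8"))
out.closeEntry()
out.close()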

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 5a0eb58..10638af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.history
 
 import java.util.NoSuchElementException
+import java.util.zip.ZipOutputStream
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import com.google.common.cache._
@@ -173,6 +174,13 @@ class HistoryServer(
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      zipStream: ZipOutputStream): Unit = {
+    provider.writeEventLogs(appId, attemptId, zipStream)
+  }
+
   /**
    * Returns the provider configuration to show in the listing page.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index f73c742..9af90ee 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
+import java.util.zip.ZipOutputStream
 import javax.servlet.ServletContext
 import javax.ws.rs._
 import javax.ws.rs.core.{Context, Response}
@@ -164,6 +165,18 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
     }
   }
 
+  @Path("applications/{appId}/logs")
+  def getEventLogs(
+    @PathParam("appId") appId: String): EventLogDownloadResource = {
+    new EventLogDownloadResource(uiRoot, appId, None)
+  }
+
+  @Path("applications/{appId}/{attemptId}/logs")
+  def getEventLogs(
+    @PathParam("appId") appId: String,
+    @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
+    new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
+  }
 }
 
 private[spark] object ApiRootResource {
@@ -193,6 +206,13 @@ private[spark] trait UIRoot {
   def getSparkUI(appKey: String): Option[SparkUI]
   def getApplicationInfoList: Iterator[ApplicationInfo]
 
+  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
+    Response.serverError()
+      .entity("Event logs are only available through the history server.")
+      .status(Response.Status.SERVICE_UNAVAILABLE)
+      .build()
+  }
+
   /**
    * Get the spark UI with the given appID, and apply a function
    * to it.  If there is no such app, throw an appropriate exception

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
new file mode 100644
index 0000000..d416dba
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.OutputStream
+import java.util.zip.ZipOutputStream
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+
+@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+private[v1] class EventLogDownloadResource(
+    val uIRoot: UIRoot,
+    val appId: String,
+    val attemptId: Option[String]) extends Logging {
+  val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)
+
+  @GET
+  def getEventLogs(): Response = {
+    try {
+      val fileName = {
+        attemptId match {
+          case Some(id) => s"eventLogs-$appId-$id.zip"
+          case None => s"eventLogs-$appId.zip"
+        }
+      }
+
+      val stream = new StreamingOutput {
+        override def write(output: OutputStream) = {
+          val zipStream = new ZipOutputStream(output)
+          try {
+            uIRoot.writeEventLogs(appId, attemptId, zipStream)
+          } finally {
+            zipStream.close()
+          }
+
+        }
+      }
+
+      Response.ok(stream)
+        .header("Content-Disposition", s"attachment; filename=$fileName")
+        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
+        .build()
+    } catch {
+      case NonFatal(e) =>
+        Response.serverError()
+          .entity(s"Event logs are not available for app: $appId.")
+          .status(Response.Status.SERVICE_UNAVAILABLE)
+          .build()
+    }
+  }
+}
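
The resource relies on the JAX-RS StreamingOutput contract: the container invokes write(OutputStream) only when it serializes the response body, so the zip is produced incrementally against the servlet output stream rather than buffered in memory first. A stripped-down sketch of the same pattern, using a made-up single entry and no Spark types:

import java.io.OutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}
import javax.ws.rs.core.{Response, StreamingOutput}

val stream = new StreamingOutput {
  override def write(output: OutputStream): Unit = {
    // Runs lazily, when the container serializes the response.
    val zip = new ZipOutputStream(output)
    try {
      zip.putNextEntry(new ZipEntry("hello.txt"))
      zip.write("hello".getBytes("UTF-8"))
      zip.closeEntry()
    } finally {
      zip.close()
    }
  }
}

val response = Response.ok(stream)
  .header("Content-Disposition", "attachment; filename=hello.zip")
  .build()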

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index ce4fe80..d575bf2 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -8,6 +8,22 @@
     "completed" : true
   } ]
 }, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-05-06T13:03:00.950GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.880GMT",
+    "endTime" : "2015-05-06T13:03:00.890GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
+}, {
   "id" : "local-1426533911241",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index ce4fe80..d575bf2 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -8,6 +8,22 @@
     "completed" : true
   } ]
 }, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-05-06T13:03:00.950GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.880GMT",
+    "endTime" : "2015-05-06T13:03:00.890GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
+}, {
   "id" : "local-1426533911241",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index dca86fe..15c2de8 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -7,6 +7,22 @@
     "sparkUser" : "irashid",
     "completed" : true
   } ]
+},  {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-05-06T13:03:00.950GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.880GMT",
+    "endTime" : "2015-05-06T13:03:00.890GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
 }, {
   "id" : "local-1426533911241",
   "name" : "Spark shell",
@@ -24,12 +40,14 @@
     "completed" : true
   } ]
 }, {
-  "id" : "local-1425081759269",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2015-02-28T00:02:38.277GMT",
-    "endTime" : "2015-02-28T00:02:46.912GMT",
-    "sparkUser" : "irashid",
-    "completed" : true
-  } ]
+    "id": "local-1425081759269",
+    "name": "Spark shell",
+    "attempts": [
+      {
+        "startTime": "2015-02-28T00:02:38.277GMT",
+        "endTime": "2015-02-28T00:02:46.912GMT",
+        "sparkUser": "irashid",
+        "completed": true
+      }
+    ]
 } ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/resources/spark-events/local-1430917381535_1
----------------------------------------------------------------------
diff --git a/core/src/test/resources/spark-events/local-1430917381535_1 b/core/src/test/resources/spark-events/local-1430917381535_1
new file mode 100644
index 0000000..d5a1303
--- /dev/null
+++ b/core/src/test/resources/spark-events/local-1430917381535_1
@@ -0,0 +1,5 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380880}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380880,"User":"irashid","App Attempt ID":"1"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380890}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/resources/spark-events/local-1430917381535_2
----------------------------------------------------------------------
diff --git a/core/src/test/resources/spark-events/local-1430917381535_2 b/core/src/test/resources/spark-events/local-1430917381535_2
new file mode 100644
index 0000000..abb637a
--- /dev/null
+++ b/core/src/test/resources/spark-events/local-1430917381535_2
@@ -0,0 +1,5 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380893}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380893,"User":"irashid","App Attempt ID":"2"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380950}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0f6933d..09075ee 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
+  FileOutputStream, OutputStreamWriter}
 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.zip.{ZipInputStream, ZipOutputStream}
 
 import scala.io.Source
 
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
@@ -335,6 +339,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     assert(!log2.exists())
   }
 
+  test("Event log copy") {
+    val provider = new FsHistoryProvider(createTestConf())
+    val logs = (1 to 2).map { i =>
+      val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
+      writeFile(log, true, None,
+        SparkListenerApplicationStart(
+          "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
+        SparkListenerApplicationEnd(5001 * i)
+      )
+      log
+    }
+    provider.checkForLogs()
+
+    (1 to 2).foreach { i =>
+      val underlyingStream = new ByteArrayOutputStream()
+      val outputStream = new ZipOutputStream(underlyingStream)
+      provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
+      outputStream.close()
+      val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
+      var totalEntries = 0
+      var entry = inputStream.getNextEntry
+      entry should not be null
+      while (entry != null) {
+        val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
+        val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
+        actual should be (expected)
+        totalEntries += 1
+        entry = inputStream.getNextEntry
+      }
+      totalEntries should be (1)
+      inputStream.close()
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 14f2d1a..e5b5e1b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.deploy.history
 
-import java.io.{File, FileInputStream, FileWriter, IOException}
+import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
 import java.net.{HttpURLConnection, URL}
+import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.mockito.Mockito.when
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -147,6 +150,70 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     }
   }
 
+  test("download all logs for app with multiple attempts") {
+    doDownloadTest("local-1430917381535", None)
+  }
+
+  test("download one log for app with multiple attempts") {
+    (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
+  }
+
+  test("download legacy logs - all attempts") {
+    doDownloadTest("local-1426533911241", None, legacy = true)
+  }
+
+  test("download legacy logs - single attempt") {
+    (1 to 2).foreach {
+      attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true)
+    }
+  }
+
+  // Test that the files are downloaded correctly, and validate them.
+  def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
+
+    val url = attemptId match {
+      case Some(id) =>
+        new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
+      case None =>
+        new URL(s"${generateURL(s"applications/$appId")}/logs")
+    }
+
+    val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url)
+    code should be (HttpServletResponse.SC_OK)
+    inputStream should not be None
+    error should be (None)
+
+    val zipStream = new ZipInputStream(inputStream.get)
+    var entry = zipStream.getNextEntry
+    entry should not be null
+    val totalFiles = {
+      if (legacy) {
+        attemptId.map { x => 3 }.getOrElse(6)
+      } else {
+        attemptId.map { x => 1 }.getOrElse(2)
+      }
+    }
+    var filesCompared = 0
+    while (entry != null) {
+      if (!entry.isDirectory) {
+        val expectedFile = {
+          if (legacy) {
+            val splits = entry.getName.split("/")
+            new File(new File(logDir, splits(0)), splits(1))
+          } else {
+            new File(logDir, entry.getName)
+          }
+        }
+        val expected = Files.toString(expectedFile, Charsets.UTF_8)
+        val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
+        actual should be (expected)
+        filesCompared += 1
+      }
+      entry = zipStream.getNextEntry
+    }
+    filesCompared should be (totalFiles)
+  }
+
   test("response codes on bad paths") {
     val badAppId = getContentAndCode("applications/foobar")
     badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
@@ -202,7 +269,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
   }
 
   def getUrl(path: String): String = {
-    HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path"))
+    HistoryServerSuite.getUrl(generateURL(path))
+  }
+
+  def generateURL(path: String): URL = {
+    new URL(s"http://localhost:$port/api/v1/$path")
   }
 
   def generateExpectation(name: String, path: String): Unit = {
@@ -233,13 +304,18 @@ object HistoryServerSuite {
   }
 
   def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
+    val (code, in, errString) = connectAndGetInputStream(url)
+    val inString = in.map(IOUtils.toString)
+    (code, inString, errString)
+  }
+
+  def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = {
     val connection = url.openConnection().asInstanceOf[HttpURLConnection]
     connection.setRequestMethod("GET")
     connection.connect()
     val code = connection.getResponseCode()
-    val inString = try {
-      val in = Option(connection.getInputStream())
-      in.map(IOUtils.toString)
+    val inStream = try {
+      Option(connection.getInputStream())
     } catch {
       case io: IOException => None
     }
@@ -249,7 +325,7 @@ object HistoryServerSuite {
     } catch {
       case io: IOException => None
     }
-    (code, inString, errString)
+    (code, inStream, errString)
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d2a86eb8/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e750184..31ecddc 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -228,6 +228,14 @@ for a running application, at `http://localhost:4040/api/v1`.
     <td><code>/applications/[app-id]/storage/rdd/[rdd-id]</code></td>
     <td>Details for the storage status of a given RDD</td>
   </tr>
+  <tr>
+    <td><code>/applications/[app-id]/logs</code></td>
+    <td>Download the event logs for all attempts of the given application as a zip file</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/[attempt-id]/logs</code></td>
+    <td>Download the event logs for the specified attempt of the given application as a zip file</td>
+  </tr>
 </table>
 
 When running on Yarn, each application has multiple attempts, so `[app-id]` is actually

