Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/22 00:39:53 UTC

git commit: SPARK-1284: Fix improper use of SimpleDateFormat

Repository: spark
Updated Branches:
  refs/heads/branch-0.9 d68549e8f -> 8856076b5


SPARK-1284: Fix improper use of SimpleDateFormat

`SimpleDateFormat` is not thread-safe. Several places share the same `SimpleDateFormat` instance across multiple threads without any safeguard, which can cause the Web UI to display incorrect dates.

This PR creates a new `SimpleDateFormat` whenever one is needed. An alternative is to use a `ThreadLocal` to store one `SimpleDateFormat` per thread. If this PR impacts performance, I can switch to the latter approach.
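For reference, a minimal sketch of the `ThreadLocal` alternative; the object name `SafeDateFormat` here is only illustrative, but the patch below adds essentially this helper as `org.apache.spark.ui.WebUI`:

    import java.text.SimpleDateFormat
    import java.util.Date

    object SafeDateFormat {
      // Illustrative helper: each thread lazily gets its own SimpleDateFormat,
      // so concurrent format() calls never share mutable parser state.
      private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
        override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
      }

      def formatDate(timestamp: Long): String =
        dateFormat.get.format(new Date(timestamp))
    }

With this pattern, a call such as `SafeDateFormat.formatDate(System.currentTimeMillis())` is safe from any thread, at the cost of one formatter instance per thread rather than one per call.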

Author: zsxwing <zs...@gmail.com>

Closes #179 from zsxwing/SPARK-1278 and squashes the following commits:

21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat

Conflicts:

	core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
	core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
	core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
	core/src/main/scala/org/apache/spark/util/FileLogger.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8856076b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8856076b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8856076b

Branch: refs/heads/branch-0.9
Commit: 8856076b5870df64f21d400c67b17bda5a336627
Parents: d68549e
Author: zsxwing <zs...@gmail.com>
Authored: Fri Mar 21 16:07:22 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Mar 21 16:39:23 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/WebUI.scala   | 47 ------------------
 .../org/apache/spark/deploy/master/Master.scala |  6 +--
 .../spark/deploy/master/ui/IndexPage.scala      |  8 ++--
 .../org/apache/spark/deploy/worker/Worker.scala |  4 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |  5 +-
 .../main/scala/org/apache/spark/ui/WebUI.scala  | 50 ++++++++++++++++++++
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  1 -
 .../apache/spark/ui/jobs/JobProgressUI.scala    |  1 -
 .../org/apache/spark/ui/jobs/StagePage.scala    |  4 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  5 +-
 10 files changed, 67 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
deleted file mode 100644
index ae258b5..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-/**
- * Utilities used throughout the web UI.
- */
-private[spark] object DeployWebUI {
-  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
-  def formatDate(date: Date): String = DATE_FORMAT.format(date)
-
-  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
-
-  def formatDuration(milliseconds: Long): String = {
-    val seconds = milliseconds.toDouble / 1000
-    if (seconds < 60) {
-      return "%.0f s".format(seconds)
-    }
-    val minutes = seconds / 60
-    if (minutes < 10) {
-      return "%.1f min".format(minutes)
-    } else if (minutes < 60) {
-      return "%.0f min".format(minutes)
-    }
-    val hours = minutes / 60
-    return "%.1f h".format(hours)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 3897156..d72bb7a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -45,7 +45,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   val conf = new SparkConf
 
-  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
+  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
@@ -621,7 +621,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   /** Generate a new app ID given a app's submission date */
   def newApplicationId(submitDate: Date): String = {
-    val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
+    val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
     nextAppNumber += 1
     appId
   }
@@ -644,7 +644,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }
 
   def newDriverId(submitDate: Date): String = {
-    val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
+    val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
     nextDriverNumber += 1
     appId
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index a9af8df..b549825 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -25,10 +25,10 @@ import akka.pattern.ask
 import javax.servlet.http.HttpServletRequest
 import net.liftweb.json.JsonAST.JValue
 
-import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
+import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 private[spark] class IndexPage(parent: MasterWebUI) {
@@ -164,10 +164,10 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
       </td>
-      <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+      <td>{WebUI.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>
       <td>{app.state.toString}</td>
-      <td>{DeployWebUI.formatDuration(app.duration)}</td>
+      <td>{WebUI.formatDuration(app.duration)}</td>
     </tr>
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 36bb289..cecb2c8 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -56,7 +56,7 @@ private[spark] class Worker(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
+  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
   val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -309,7 +309,7 @@ private[spark] class Worker(
   }
 
   def generateWorkerId(): String = {
-    "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
+    "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
   }
 
   override def postStop() {

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f8fa5a9..835e181 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -53,6 +53,9 @@ class JobLogger(val user: String, val logDirName: String)
   private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
   private val stageIDToJobID = new HashMap[Int, Int]
   private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
 
@@ -116,7 +119,7 @@ class JobLogger(val user: String, val logDirName: String)
     var writeInfo = info
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
-      writeInfo = DATE_FORMAT.format(date) + ": " +info
+      writeInfo = dateFormat.get.format(date) + ": " + info
     }
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
new file mode 100644
index 0000000..459d298
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
+
+  def formatDate(date: Date): String = dateFormat.get.format(date)
+
+  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    return "%.1f h".format(hours)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index ab03eb5..4d30760 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -27,7 +27,6 @@ import scala.collection.mutable
 private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
 
   val listener = parent.listener
-  val dateFmt = parent.dateFmt
   val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
 
   def toNodeSeq(): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index c1ee2f3..1a2eb2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -40,7 +40,6 @@ import org.apache.spark.util.Utils
 private[spark] class JobProgressUI(val sc: SparkContext) {
   private var _listener: Option[JobProgressListener] = None
   def listener = _listener.get
-  val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
   private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index cfaf121..6207ef1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -26,6 +26,7 @@ import scala.xml.Node
 import org.apache.spark.{ExceptionFailure}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.WebUI
 import org.apache.spark.ui.Page._
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.scheduler.TaskInfo
@@ -33,7 +34,6 @@ import org.apache.spark.scheduler.TaskInfo
 /** Page showing statistics and task list for a given stage */
 private[spark] class StagePage(parent: JobProgressUI) {
   def listener = parent.listener
-  val dateFmt = parent.dateFmt
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -248,7 +248,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       <td>{info.status}</td>
       <td>{info.taskLocality}</td>
       <td>{info.host}</td>
-      <td>{dateFmt.format(new Date(info.launchTime))}</td>
+      <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
       <td sorttable_customkey={duration.toString}>
         {formatDuration}
       </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/8856076b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 8ea32db..45a0783 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,7 +23,7 @@ import scala.xml.Node
 import scala.collection.mutable.HashSet
 
 import org.apache.spark.scheduler.{SchedulingMode, StageInfo, TaskInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 
@@ -31,7 +31,6 @@ import org.apache.spark.util.Utils
 private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgressUI) {
 
   val listener = parent.listener
-  val dateFmt = parent.dateFmt
   val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
 
   def toNodeSeq(): Seq[Node] = {
@@ -75,7 +74,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
 
   private def stageRow(s: StageInfo): Seq[Node] = {
     val submissionTime = s.submissionTime match {
-      case Some(t) => dateFmt.format(new Date(t))
+      case Some(t) => WebUI.formatDate(new Date(t))
       case None => "Unknown"
     }