You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/22 00:08:35 UTC
git commit: SPARK-1279: Fix improper use of SimpleDateFormat
Repository: spark
Updated Branches:
refs/heads/master 7e17fe69f -> 2c0aa22e2
SPARK-1279: Fix improper use of SimpleDateFormat
`SimpleDateFormat` is not thread-safe. Several places share a single `SimpleDateFormat` object across multiple threads without any safeguard, which can cause the Web UI to display incorrect dates.
This PR creates a new `SimpleDateFormat` whenever one is needed. An alternative solution is to use a `ThreadLocal` to store one `SimpleDateFormat` per thread. If this PR hurts performance, I can switch to the latter approach.
Author: zsxwing <zs...@gmail.com>
Closes #179 from zsxwing/SPARK-1278 and squashes the following commits:
21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c0aa22e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c0aa22e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c0aa22e
Branch: refs/heads/master
Commit: 2c0aa22e2e26ae35b7d4caa529bc6520e362cc3c
Parents: 7e17fe6
Author: zsxwing <zs...@gmail.com>
Authored: Fri Mar 21 16:07:22 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Mar 21 16:08:18 2014 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/deploy/WebUI.scala | 47 ------------------
.../org/apache/spark/deploy/master/Master.scala | 6 +--
.../spark/deploy/master/ui/IndexPage.scala | 8 ++--
.../org/apache/spark/deploy/worker/Worker.scala | 4 +-
.../org/apache/spark/scheduler/JobLogger.scala | 6 ++-
.../main/scala/org/apache/spark/ui/WebUI.scala | 50 ++++++++++++++++++++
.../apache/spark/ui/jobs/JobProgressUI.scala | 2 -
.../org/apache/spark/ui/jobs/StagePage.scala | 5 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 5 +-
.../org/apache/spark/util/FileLogger.scala | 7 ++-
10 files changed, 72 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
deleted file mode 100644
index ae258b5..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-/**
- * Utilities used throughout the web UI.
- */
-private[spark] object DeployWebUI {
- val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
- def formatDate(date: Date): String = DATE_FORMAT.format(date)
-
- def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
-
- def formatDuration(milliseconds: Long): String = {
- val seconds = milliseconds.toDouble / 1000
- if (seconds < 60) {
- return "%.0f s".format(seconds)
- }
- val minutes = seconds / 60
- if (minutes < 10) {
- return "%.1f min".format(minutes)
- } else if (minutes < 60) {
- return "%.0f min".format(minutes)
- }
- val hours = minutes / 60
- return "%.1f h".format(hours)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9ed49e0..95bd62e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -51,7 +51,7 @@ private[spark] class Master(
val conf = new SparkConf
- val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
+ def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
@@ -682,7 +682,7 @@ private[spark] class Master(
/** Generate a new app ID given a app's submission date */
def newApplicationId(submitDate: Date): String = {
- val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
+ val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
nextAppNumber += 1
appId
}
@@ -706,7 +706,7 @@ private[spark] class Master(
}
def newDriverId(submitDate: Date): String = {
- val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
+ val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
appId
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 7ec71eb..8c1d6c7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -25,10 +25,10 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue
-import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
+import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
@@ -169,10 +169,10 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
- <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+ <td>{WebUI.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
- <td>{DeployWebUI.formatDuration(app.duration)}</td>
+ <td>{WebUI.formatDuration(app.duration)}</td>
</tr>
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 5e0fc31..8a71ddd 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -56,7 +56,7 @@ private[spark] class Worker(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
- val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
+ def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -319,7 +319,7 @@ private[spark] class Worker(
}
def generateWorkerId(): String = {
- "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
+ "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
}
override def postStop() {
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index b3a67d7..5cecf94 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -55,7 +55,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIdToJobId = new HashMap[Int, Int]
private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
- private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
createLogDir()
@@ -128,7 +130,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
- writeInfo = DATE_FORMAT.format(date) + ": " + info
+ writeInfo = dateFormat.get.format(date) + ": " + info
}
jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
new file mode 100644
index 0000000..a7b872f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
+
+ def formatDate(date: Date): String = dateFormat.get.format(date)
+
+ def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ return "%.1f h".format(hours)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index ee4e9c6..b2c6738 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -17,7 +17,6 @@
package org.apache.spark.ui.jobs
-import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.servlet.ServletContextHandler
@@ -32,7 +31,6 @@ import org.apache.spark.util.Utils
private[ui] class JobProgressUI(parent: SparkUI) {
val appName = parent.appName
val basePath = parent.basePath
- val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
val live = parent.live
val sc = parent.sc
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index da7f202..0c55f2e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,14 +23,13 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressUI) {
private val appName = parent.appName
private val basePath = parent.basePath
- private val dateFmt = parent.dateFmt
private lazy val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
@@ -253,7 +252,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
- <td>{dateFmt.format(new Date(info.launchTime))}</td>
+ <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 68fef52..5bf1c95 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,13 +23,12 @@ import scala.collection.mutable.HashMap
import scala.xml.Node
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
private val basePath = parent.basePath
- private val dateFmt = parent.dateFmt
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
@@ -82,7 +81,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val submissionTime = s.submissionTime match {
- case Some(t) => dateFmt.format(new Date(t))
+ case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
http://git-wip-us.apache.org/repos/asf/spark/blob/2c0aa22e/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index f079620..a0c07e3 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -44,7 +44,10 @@ class FileLogger(
overwrite: Boolean = true)
extends Logging {
- private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
+
private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
private var fileIndex = 0
@@ -111,7 +114,7 @@ class FileLogger(
def log(msg: String, withTime: Boolean = false) {
val writeInfo = if (!withTime) msg else {
val date = new Date(System.currentTimeMillis())
- DATE_FORMAT.format(date) + ": " + msg
+ dateFormat.get.format(date) + ": " + msg
}
writer.foreach(_.print(writeInfo))
}