You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by zj...@apache.org on 2014/05/02 05:31:48 UTC
git commit: SAMZA-218: Show container start-time and up-time on YARN
AM web
Repository: incubator-samza
Updated Branches:
refs/heads/master 354bcdb77 -> 300ad6a8c
SAMZA-218: Show container start-time and up-time on YARN AM web
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/300ad6a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/300ad6a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/300ad6a8
Branch: refs/heads/master
Commit: 300ad6a8cfdd22f5a75260afd3a0e6fa87797b61
Parents: 354bcdb
Author: Zhijie Shen <zs...@hortonworks.com>
Authored: Thu May 1 20:29:06 2014 -0700
Committer: Zhijie Shen <zs...@hortonworks.com>
Committed: Thu May 1 20:29:06 2014 -0700
----------------------------------------------------------------------
build.gradle | 1 +
gradle/dependency-versions.gradle | 1 +
.../resources/scalate/WEB-INF/views/index.scaml | 8 +++-
.../samza/job/yarn/SamzaAppMasterState.scala | 4 +-
.../job/yarn/SamzaAppMasterTaskManager.scala | 4 +-
.../apache/samza/job/yarn/YarnContainer.scala | 47 ++++++++++++++++++++
.../webapp/ApplicationMasterRestServlet.scala | 8 ++--
7 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 055ef9b..72928b1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -146,6 +146,7 @@ project(":samza-yarn_$scalaVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
}
+ compile "joda-time:joda-time:$jodaTimeVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 4338b23..819a578 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -1,4 +1,5 @@
ext {
+ jodaTimeVersion = "2.2"
joptSimpleVersion = "3.2"
jacksonVersion = "1.8.5"
junitVersion = "4.8.1"
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 6530bad..d17b9c4 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -63,9 +63,13 @@
Task group #{taskId.toString}
%ul
%li
- %a(target="_blank" href="http://#{container.getNodeHttpAddress}/node/containerlogs/#{container.getId.toString}/#{username}")= container.getId.toString
+ %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
%li
- %a(target="_blank" href="http://#{container.getNodeHttpAddress}")= container.getNodeHttpAddress
+ %a(target="_blank" href="http://#{container.nodeHttpAddress}")= container.nodeHttpAddress
+ %li
+ Start time: #{container.startTimeStr()}
+ %li
+ Up time: #{container.upTimeStr()}
%tr
%td Completed
%td= state.completedTasks.toString
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index fa1642b..01a2683 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -18,8 +18,6 @@
*/
package org.apache.samza.job.yarn
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.api.records.Container
import org.apache.samza.config.Config
import grizzled.slf4j.Logging
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
@@ -41,7 +39,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
var taskCount = 0
var unclaimedTasks = Set[Int]()
var finishedTasks = Set[Int]()
- var runningTasks = Map[Int, Container]()
+ var runningTasks = Map[Int, YarnContainer]()
var taskPartitions = Map[Int, Set[Partition]]()
var status = FinalApplicationStatus.UNDEFINED
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 58b2d30..eb1ff54 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -127,7 +127,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
state.neededContainers -= 1
- state.runningTasks += taskId -> container
+ state.runningTasks += taskId -> new YarnContainer(container)
state.unclaimedTasks -= taskId
state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet
@@ -146,7 +146,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
override def onContainerCompleted(containerStatus: ContainerStatus) {
val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId)
- val taskId = state.runningTasks.filter { case (_, container) => container.getId().equals(containerStatus.getContainerId()) }.keys.headOption
+ val taskId = state.runningTasks.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption
taskId match {
case Some(taskId) => {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala
new file mode 100644
index 0000000..7ab866f
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.hadoop.yarn.api.records.Container
+import org.joda.time.Period
+import org.joda.time.format.{ DateTimeFormatter, ISODateTimeFormat, ISOPeriodFormat, PeriodFormatter }
+
+object YarnContainerUtils {
+ val dateFormater = ISODateTimeFormat.dateTime
+ val periodFormater = ISOPeriodFormat.standard
+}
+
+/**
+ * YARN container information plus start time and up time
+ */
+class YarnContainer(container: Container) {
+ val id = container.getId()
+ val nodeId = container.getNodeId();
+ val nodeHttpAddress = container.getNodeHttpAddress();
+ val resource = container.getResource();
+ val priority = container.getPriority();
+ val containerToken = container.getContainerToken();
+ val startTime = System.currentTimeMillis()
+ def startTimeStr(dtFormatter: Option[DateTimeFormatter] = None) =
+ dtFormatter.getOrElse(YarnContainerUtils.dateFormater).print(startTime)
+ def upTime = System.currentTimeMillis()
+ def upTimeStr(periodFormatter: Option[PeriodFormatter] = None) =
+ periodFormatter.getOrElse(YarnContainerUtils.periodFormater).print(new Period(startTime, upTime))
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 8fce8a7..17a96f0 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -78,12 +78,14 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r
val containers = new HashMap[String, HashMap[String, Object]]
state.runningTasks.values.foreach(c => {
- val containerIdStr = c.getId.toString
+ val containerIdStr = c.id.toString
val containerMap = new HashMap[String, Object]
- val taskId = state.runningTasks.filter { case (_, container) => container.getId.toString.equals(containerIdStr) }.keys.head
+ val taskId = state.runningTasks.filter { case (_, container) => container.id.toString.equals(containerIdStr) }.keys.head
var partitions = new java.util.ArrayList(state.taskPartitions.get(taskId).get)
- containerMap.put("yarn-address", c.getNodeHttpAddress)
+ containerMap.put("yarn-address", c.nodeHttpAddress)
+ containerMap.put("start-time", c.startTime.toString)
+ containerMap.put("up-time", c.upTime.toString)
containerMap.put("partitions", partitions)
containerMap.put("task-id", taskId.toString)
containers.put(containerIdStr, containerMap)