You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by zj...@apache.org on 2014/05/02 05:31:48 UTC

git commit: SAMZA-218: Show container start-time and up-time on YARN AM web

Repository: incubator-samza
Updated Branches:
  refs/heads/master 354bcdb77 -> 300ad6a8c


SAMZA-218: Show container start-time and up-time on YARN AM web


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/300ad6a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/300ad6a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/300ad6a8

Branch: refs/heads/master
Commit: 300ad6a8cfdd22f5a75260afd3a0e6fa87797b61
Parents: 354bcdb
Author: Zhijie Shen <zs...@hortonworks.com>
Authored: Thu May 1 20:29:06 2014 -0700
Committer: Zhijie Shen <zs...@hortonworks.com>
Committed: Thu May 1 20:29:06 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 gradle/dependency-versions.gradle               |  1 +
 .../resources/scalate/WEB-INF/views/index.scaml |  8 +++-
 .../samza/job/yarn/SamzaAppMasterState.scala    |  4 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  4 +-
 .../apache/samza/job/yarn/YarnContainer.scala   | 47 ++++++++++++++++++++
 .../webapp/ApplicationMasterRestServlet.scala   |  8 ++--
 7 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 055ef9b..72928b1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -146,6 +146,7 @@ project(":samza-yarn_$scalaVersion") {
       exclude module: 'scala-compiler'
       exclude module: 'slf4j-api'
     }
+    compile "joda-time:joda-time:$jodaTimeVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 4338b23..819a578 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -1,4 +1,5 @@
 ext {
+  jodaTimeVersion = "2.2"
   joptSimpleVersion = "3.2"
   jacksonVersion = "1.8.5"
   junitVersion = "4.8.1"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 6530bad..d17b9c4 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -63,9 +63,13 @@
               Task group #{taskId.toString}
               %ul
                 %li
-                  %a(target="_blank" href="http://#{container.getNodeHttpAddress}/node/containerlogs/#{container.getId.toString}/#{username}")= container.getId.toString
+                  %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
                 %li
-                  %a(target="_blank" href="http://#{container.getNodeHttpAddress}")= container.getNodeHttpAddress
+                  %a(target="_blank" href="http://#{container.nodeHttpAddress}")= container.nodeHttpAddress
+                %li
+                  Start time: #{container.startTimeStr()}
+                %li
+                  Up time: #{container.upTimeStr()}
     %tr
       %td Completed
       %td= state.completedTasks.toString

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index fa1642b..01a2683 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -18,8 +18,6 @@
  */
 
 package org.apache.samza.job.yarn
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.api.records.Container
 import org.apache.samza.config.Config
 import grizzled.slf4j.Logging
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
@@ -41,7 +39,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
   var taskCount = 0
   var unclaimedTasks = Set[Int]()
   var finishedTasks = Set[Int]()
-  var runningTasks = Map[Int, Container]()
+  var runningTasks = Map[Int, YarnContainer]()
   var taskPartitions = Map[Int, Set[Partition]]()
   var status = FinalApplicationStatus.UNDEFINED
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 58b2d30..eb1ff54 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -127,7 +127,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
           "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
 
         state.neededContainers -= 1
-        state.runningTasks += taskId -> container
+        state.runningTasks += taskId -> new YarnContainer(container)
         state.unclaimedTasks -= taskId
         state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet
 
@@ -146,7 +146,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
 
   override def onContainerCompleted(containerStatus: ContainerStatus) {
     val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId)
-    val taskId = state.runningTasks.filter { case (_, container) => container.getId().equals(containerStatus.getContainerId()) }.keys.headOption
+    val taskId = state.runningTasks.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption
 
     taskId match {
       case Some(taskId) => {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala
new file mode 100644
index 0000000..7ab866f
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.hadoop.yarn.api.records.Container
+import org.joda.time.Period
+import org.joda.time.format.{ DateTimeFormatter, ISODateTimeFormat, ISOPeriodFormat, PeriodFormatter }
+
+object YarnContainerUtils {
+  val dateFormater = ISODateTimeFormat.dateTime
+  val periodFormater = ISOPeriodFormat.standard
+}
+
+/**
+ * YARN container information plus start time and up time
+ */
+class YarnContainer(container: Container) {
+  val id = container.getId()
+  val nodeId = container.getNodeId();
+  val nodeHttpAddress = container.getNodeHttpAddress();
+  val resource = container.getResource();
+  val priority = container.getPriority();
+  val containerToken = container.getContainerToken();
+  val startTime = System.currentTimeMillis()
+  def startTimeStr(dtFormatter: Option[DateTimeFormatter] = None) =
+    dtFormatter.getOrElse(YarnContainerUtils.dateFormater).print(startTime)
+  def upTime = System.currentTimeMillis()
+  def upTimeStr(periodFormatter: Option[PeriodFormatter] = None) =
+    periodFormatter.getOrElse(YarnContainerUtils.periodFormater).print(new Period(startTime, upTime))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/300ad6a8/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 8fce8a7..17a96f0 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -78,12 +78,14 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r
     val containers = new HashMap[String, HashMap[String, Object]]
 
     state.runningTasks.values.foreach(c => {
-      val containerIdStr = c.getId.toString
+      val containerIdStr = c.id.toString
       val containerMap = new HashMap[String, Object]
-      val taskId = state.runningTasks.filter { case (_, container) => container.getId.toString.equals(containerIdStr) }.keys.head
+      val taskId = state.runningTasks.filter { case (_, container) => container.id.toString.equals(containerIdStr) }.keys.head
       var partitions = new java.util.ArrayList(state.taskPartitions.get(taskId).get)
 
-      containerMap.put("yarn-address", c.getNodeHttpAddress)
+      containerMap.put("yarn-address", c.nodeHttpAddress)
+      containerMap.put("start-time", c.startTime.toString)
+      containerMap.put("up-time", c.upTime.toString)
       containerMap.put("partitions", partitions)
       containerMap.put("task-id", taskId.toString)
       containers.put(containerIdStr, containerMap)