You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/03/19 07:29:35 UTC

git commit: SAMZA-175: Bind AM Jetty server to 0

Repository: incubator-samza
Updated Branches:
  refs/heads/master 891a6f8ef -> bfd97fce0


SAMZA-175: Bind AM Jetty server to 0


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bfd97fce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bfd97fce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bfd97fce

Branch: refs/heads/master
Commit: bfd97fce0719fc662cd456c90c6c01f1a0017c15
Parents: 891a6f8
Author: Zhijie Shen <zshen at hortonworks dot com>
Authored: Tue Mar 18 23:29:09 2014 -0700
Committer: Jakob Homan <jg...@gmail.com>
Committed: Tue Mar 18 23:29:09 2014 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/SamzaAppMasterService.scala  | 40 +++++++-------------
 .../org/apache/samza/webapp/WebAppServer.scala  | 19 ++++++++--
 .../job/yarn/TestSamzaAppMasterService.scala    |  2 +
 3 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index 82d90d4..ab13d43 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.job.yarn
 
-import org.apache.samza.util.Util
 import grizzled.slf4j.Logging
 import org.apache.samza.webapp._
 import org.apache.samza.config.Config
@@ -36,34 +35,23 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
   var webApp: WebAppServer = null
 
   override def onInit() {
-    // try starting the samza AM dashboard. try ten times, just in case we 
-    // pick a port that's already in use.
-    for (i <- 0 until 10) {
-      val rpcPort = Util.randomBetween(10000, 50000)
-      val trackingPort = Util.randomBetween(10000, 50000)
-      info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
+    // try starting the samza AM dashboard at a random rpc and tracking port
+    info("Starting webapp at a random rpc and tracking port")
 
-      try {
-        rpcApp = new WebAppServer("/", rpcPort)
-        rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
-        rpcApp.start
+    rpcApp = new WebAppServer("/")
+    rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+    rpcApp.start
 
-        webApp = new WebAppServer("/", trackingPort)
-        webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
-        webApp.start
+    webApp = new WebAppServer("/")
+    webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+    webApp.start
 
-        state.rpcPort = rpcPort
-        state.trackingPort = trackingPort
-        return
-      } catch {
-        case e: Exception => {
-          warn("Unable to start webapp on rpc port %d, tracking port %d .. retrying" format (rpcPort, trackingPort))
-        }
-      }
-    }
-
-    if (state.rpcPort == 0 || state.trackingPort == 0) {
-      throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
+    state.rpcPort = rpcApp.port
+    state.trackingPort = webApp.port
+    if (state.rpcPort > 0 && state.trackingPort > 0) {
+      info("Webapp is started at rpc %d, tracking port %d" format (state.rpcPort, state.trackingPort))
+    } else {
+      throw new SamzaException("Unable to start webapp, since the host is out of ports")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
index bb5c297..d524996 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
@@ -20,13 +20,15 @@
 package org.apache.samza.webapp
 
 import javax.servlet.Servlet
-import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.{ Connector, Server }
 import org.eclipse.jetty.servlet.{ DefaultServlet, ServletHolder }
 import org.eclipse.jetty.webapp.WebAppContext
+import org.apache.samza.SamzaException
 
-class WebAppServer(rootPath: String, port: Int) {
-  val server = new Server(port)
+class WebAppServer(rootPath: String) {
+  val server = new Server(0)
   val context = new WebAppContext
+  var port: Int = 0
 
   // add a default holder to deal with static files
   val defaultHolder = new ServletHolder(classOf[DefaultServlet])
@@ -43,9 +45,18 @@ class WebAppServer(rootPath: String, port: Int) {
   }
 
   def start {
-    context.setContextPath("/");
+    context.setContextPath("/")
     context.setResourceBase(getClass.getClassLoader.getResource("scalate").toExternalForm)
     server.setHandler(context)
     server.start
+    // retrieve the real port
+    try {
+      val connector : Connector = server.getConnectors()(0).asInstanceOf[Connector]
+      port = connector.getLocalPort
+    } catch {
+      case e: Throwable => {
+        throw new SamzaException("Error when getting the port", e)
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index 1099ca3..811c996 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -33,6 +33,8 @@ class TestSamzaAppMasterService {
 
     // start the dashboard
     service.onInit
+    assert(state.rpcPort > 0)
+    assert(state.trackingPort > 0)
 
     // check to see if it's running
     val url = new URL("http://127.0.0.1:%d/am" format state.rpcPort)