You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/03/19 07:29:35 UTC
git commit: SAMZA-175: Bind AM Jetty server to 0
Repository: incubator-samza
Updated Branches:
refs/heads/master 891a6f8ef -> bfd97fce0
SAMZA-175: Bind AM Jetty server to 0
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bfd97fce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bfd97fce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bfd97fce
Branch: refs/heads/master
Commit: bfd97fce0719fc662cd456c90c6c01f1a0017c15
Parents: 891a6f8
Author: Zhijie Shen <zshen at hortonworks dot com>
Authored: Tue Mar 18 23:29:09 2014 -0700
Committer: Jakob Homan <jg...@gmail.com>
Committed: Tue Mar 18 23:29:09 2014 -0700
----------------------------------------------------------------------
.../samza/job/yarn/SamzaAppMasterService.scala | 40 +++++++-------------
.../org/apache/samza/webapp/WebAppServer.scala | 19 ++++++++--
.../job/yarn/TestSamzaAppMasterService.scala | 2 +
3 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index 82d90d4..ab13d43 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -19,7 +19,6 @@
package org.apache.samza.job.yarn
-import org.apache.samza.util.Util
import grizzled.slf4j.Logging
import org.apache.samza.webapp._
import org.apache.samza.config.Config
@@ -36,34 +35,23 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
var webApp: WebAppServer = null
override def onInit() {
- // try starting the samza AM dashboard. try ten times, just in case we
- // pick a port that's already in use.
- for (i <- 0 until 10) {
- val rpcPort = Util.randomBetween(10000, 50000)
- val trackingPort = Util.randomBetween(10000, 50000)
- info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
+ // try starting the samza AM dashboard at a random rpc and tracking port
+ info("Starting webapp at a random rpc and tracking port")
- try {
- rpcApp = new WebAppServer("/", rpcPort)
- rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
- rpcApp.start
+ rpcApp = new WebAppServer("/")
+ rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+ rpcApp.start
- webApp = new WebAppServer("/", trackingPort)
- webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
- webApp.start
+ webApp = new WebAppServer("/")
+ webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+ webApp.start
- state.rpcPort = rpcPort
- state.trackingPort = trackingPort
- return
- } catch {
- case e: Exception => {
- warn("Unable to start webapp on rpc port %d, tracking port %d .. retrying" format (rpcPort, trackingPort))
- }
- }
- }
-
- if (state.rpcPort == 0 || state.trackingPort == 0) {
- throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
+ state.rpcPort = rpcApp.port
+ state.trackingPort = webApp.port
+ if (state.rpcPort > 0 && state.trackingPort > 0) {
+ info("Webapp is started at rpc %d, tracking port %d" format (state.rpcPort, state.trackingPort))
+ } else {
+ throw new SamzaException("Unable to start webapp, since the host is out of ports")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
index bb5c297..d524996 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
@@ -20,13 +20,15 @@
package org.apache.samza.webapp
import javax.servlet.Servlet
-import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.{ Connector, Server }
import org.eclipse.jetty.servlet.{ DefaultServlet, ServletHolder }
import org.eclipse.jetty.webapp.WebAppContext
+import org.apache.samza.SamzaException
-class WebAppServer(rootPath: String, port: Int) {
- val server = new Server(port)
+class WebAppServer(rootPath: String) {
+ val server = new Server(0)
val context = new WebAppContext
+ var port: Int = 0
// add a default holder to deal with static files
val defaultHolder = new ServletHolder(classOf[DefaultServlet])
@@ -43,9 +45,18 @@ class WebAppServer(rootPath: String, port: Int) {
}
def start {
- context.setContextPath("/");
+ context.setContextPath("/")
context.setResourceBase(getClass.getClassLoader.getResource("scalate").toExternalForm)
server.setHandler(context)
server.start
+ // retrieve the real port
+ try {
+ val connector : Connector = server.getConnectors()(0).asInstanceOf[Connector]
+ port = connector.getLocalPort
+ } catch {
+ case e: Throwable => {
+ throw new SamzaException("Error when getting the port", e)
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index 1099ca3..811c996 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -33,6 +33,8 @@ class TestSamzaAppMasterService {
// start the dashboard
service.onInit
+ assert(state.rpcPort > 0)
+ assert(state.trackingPort > 0)
// check to see if it's running
val url = new URL("http://127.0.0.1:%d/am" format state.rpcPort)