You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/10/20 23:24:40 UTC
incubator-gearpump git commit: [GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized H…
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 73de3ce22 -> 8064313af
[GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized H…
…adoop/Yarn set-up
Author: Timea Magyar <Ti...@etas.com>
Closes #231 from titikakatoo/yarn_spnego_authentication.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/8064313a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/8064313a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/8064313a
Branch: refs/heads/master
Commit: 8064313afeee1d966ef033a637cfd58d1cca6617
Parents: 73de3ce
Author: Timea Magyar <Ti...@etas.com>
Authored: Sat Oct 21 07:24:13 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Oct 21 07:24:23 2017 +0800
----------------------------------------------------------------------
.../yarn/appmaster/YarnAppMaster.scala | 23 +-------
.../yarn/client/AppMasterResolver.scala | 62 ++++++++++++++------
2 files changed, 46 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8064313a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index 1907a95..53e93f9 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -18,14 +18,10 @@
package org.apache.gearpump.experiments.yarn.appmaster
-import java.io.IOException
import java.util.concurrent.TimeUnit
-
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
@@ -35,7 +31,6 @@ import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util._
import org.slf4j.Logger
-
import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -364,22 +359,8 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
case class WorkerInfo(id: ContainerId, nodeId: NodeId)
def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
- val client = new HttpClient()
- val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
- val get = new GetMethod(appMasterPath)
- var status = client.executeMethod(get)
-
- if (status != 200) {
- // Sleeps a little bit, and try again
- Thread.sleep(3000)
- status = client.executeMethod(get)
- }
+ import org.apache.gearpump.experiments.yarn.client.AppMasterResolver
- if (status == 200) {
- AkkaHelper.actorFor(system, get.getResponseBodyAsString)
- } else {
- throw new IOException("Fail to resolve AppMaster address, please make sure " +
- s"${report.getTrackingURL} is accessible...")
- }
+ AppMasterResolver.resolveAppMasterAddress(report, system)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8064313a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
index 90653e1..c05b4e2 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -19,14 +19,15 @@
package org.apache.gearpump.experiments.yarn.client
import java.io.IOException
-
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
import akka.actor.{ActorRef, ActorSystem}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.commons.io.IOUtils
+import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, ApplicationReport}
import org.apache.gearpump.experiments.yarn.glue.YarnClient
import org.apache.gearpump.util.{AkkaHelper, LogUtil}
-
+import org.apache.hadoop.hdfs.web.URLConnectionFactory
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.util.Try
/**
@@ -43,19 +44,8 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
private def connect(appId: ApplicationId): ActorRef = {
val report = yarnClient.getApplicationReport(appId)
- val client = new HttpClient()
- val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
- LOG.info(s"appMasterPath=$appMasterPath")
- val get = new GetMethod(appMasterPath)
- val status = client.executeMethod(get)
- if (status == 200) {
- val response = get.getResponseBodyAsString
- LOG.info("Successfully resolved AppMaster address: " + response)
- AkkaHelper.actorFor(system, response)
- } else {
- throw new IOException("Fail to resolve AppMaster address, please make sure " +
- s"${report.getTrackingURL} is accessible...")
- }
+
+ AppMasterResolver.resolveAppMasterAddress(report, system)
}
private def retry(fun: => ActorRef, times: Int): ActorRef = {
@@ -75,3 +65,39 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
result
}
}
+
+object AppMasterResolver {
+ val LOG = LogUtil.getLogger(getClass)
+
+ def resolveAppMasterAddress(report: ApplicationReport, system: ActorSystem): ActorRef = {
+ val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
+ LOG.info(s"appMasterPath=$appMasterPath")
+
+ val connectionFactory: URLConnectionFactory = URLConnectionFactory
+ .newDefaultURLConnectionFactory(new YarnConfiguration())
+ val url: URL = new URL(appMasterPath)
+ val connection: HttpURLConnection = connectionFactory.openConnection(url)
+ .asInstanceOf[HttpURLConnection]
+ connection.setInstanceFollowRedirects(true)
+
+ try {
+ connection.connect()
+ } catch {
+ case e: IOException =>
+ LOG.error(s"Failed to connect to AppMaster" + e.getMessage)
+ }
+
+ val status = connection.getResponseCode
+ if (status == 200) {
+ val stream: java.io.InputStream = connection.getInputStream
+ val response = IOUtils.toString(stream, StandardCharsets.UTF_8)
+ LOG.info("Successfully resolved AppMaster address: " + response)
+ connection.disconnect()
+ AkkaHelper.actorFor(system, response)
+ } else {
+ connection.disconnect()
+ throw new IOException("Fail to resolve AppMaster address, please make sure " +
+ s"${report.getTrackingURL} is accessible...")
+ }
+ }
+}