You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/10/20 23:24:40 UTC

incubator-gearpump git commit: [GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized Hadoop/Yarn set-up

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 73de3ce22 -> 8064313af


[GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized Hadoop/Yarn set-up

Author: Timea Magyar <Ti...@etas.com>

Closes #231 from titikakatoo/yarn_spnego_authentication.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/8064313a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/8064313a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/8064313a

Branch: refs/heads/master
Commit: 8064313afeee1d966ef033a637cfd58d1cca6617
Parents: 73de3ce
Author: Timea Magyar <Ti...@etas.com>
Authored: Sat Oct 21 07:24:13 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Oct 21 07:24:23 2017 +0800

----------------------------------------------------------------------
 .../yarn/appmaster/YarnAppMaster.scala          | 23 +-------
 .../yarn/client/AppMasterResolver.scala         | 62 ++++++++++++++------
 2 files changed, 46 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8064313a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index 1907a95..53e93f9 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -18,14 +18,10 @@
 
 package org.apache.gearpump.experiments.yarn.appmaster
 
-import java.io.IOException
 import java.util.concurrent.TimeUnit
-
 import akka.actor._
 import akka.util.Timeout
 import com.typesafe.config.ConfigValueFactory
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
 import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.ClusterConfig
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
@@ -35,7 +31,6 @@ import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util._
 import org.slf4j.Logger
-
 import scala.collection.JavaConverters._
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
@@ -364,22 +359,8 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
   case class WorkerInfo(id: ContainerId, nodeId: NodeId)
 
   def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
-    val get = new GetMethod(appMasterPath)
-    var status = client.executeMethod(get)
-
-    if (status != 200) {
-      // Sleeps a little bit, and try again
-      Thread.sleep(3000)
-      status = client.executeMethod(get)
-    }
+    import org.apache.gearpump.experiments.yarn.client.AppMasterResolver
 
-    if (status == 200) {
-      AkkaHelper.actorFor(system, get.getResponseBodyAsString)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getTrackingURL} is accessible...")
-    }
+    AppMasterResolver.resolveAppMasterAddress(report, system)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8064313a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
index 90653e1..c05b4e2 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -19,14 +19,15 @@
 package org.apache.gearpump.experiments.yarn.client
 
 import java.io.IOException
-
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
 import akka.actor.{ActorRef, ActorSystem}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.commons.io.IOUtils
+import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, ApplicationReport}
 import org.apache.gearpump.experiments.yarn.glue.YarnClient
 import org.apache.gearpump.util.{AkkaHelper, LogUtil}
-
+import org.apache.hadoop.hdfs.web.URLConnectionFactory
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import scala.util.Try
 
 /**
@@ -43,19 +44,8 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
 
   private def connect(appId: ApplicationId): ActorRef = {
     val report = yarnClient.getApplicationReport(appId)
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
-    LOG.info(s"appMasterPath=$appMasterPath")
-    val get = new GetMethod(appMasterPath)
-    val status = client.executeMethod(get)
-    if (status == 200) {
-      val response = get.getResponseBodyAsString
-      LOG.info("Successfully resolved AppMaster address: " + response)
-      AkkaHelper.actorFor(system, response)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getTrackingURL} is accessible...")
-    }
+
+    AppMasterResolver.resolveAppMasterAddress(report, system)
   }
 
   private def retry(fun: => ActorRef, times: Int): ActorRef = {
@@ -75,3 +65,39 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
     result
   }
 }
+
+object AppMasterResolver {
+  val LOG = LogUtil.getLogger(getClass)
+
+  /**
+   * Reads the AppMaster actor path from "<trackingURL>/supervisor-actor-path" and resolves it.
+   * Throws IOException if the endpoint cannot be read or answers with a status other than 200.
+   */
+  def resolveAppMasterAddress(report: ApplicationReport, system: ActorSystem): ActorRef = {
+    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
+    LOG.info(s"appMasterPath=$appMasterPath")
+    val factory = URLConnectionFactory.newDefaultURLConnectionFactory(new YarnConfiguration())
+    val connection: HttpURLConnection = factory.openConnection(new URL(appMasterPath))
+      .asInstanceOf[HttpURLConnection]
+    connection.setInstanceFollowRedirects(true)
+    try {
+      try {
+        connection.connect()
+      } catch {
+        case e: IOException => // only logged; getResponseCode below reports the failure
+          LOG.error(s"Failed to connect to AppMaster at $appMasterPath: ${e.getMessage}")
+      }
+      if (connection.getResponseCode == 200) {
+        val stream: java.io.InputStream = connection.getInputStream
+        val response = try IOUtils.toString(stream, StandardCharsets.UTF_8) finally stream.close()
+        LOG.info("Successfully resolved AppMaster address: " + response)
+        AkkaHelper.actorFor(system, response)
+      } else {
+        throw new IOException("Fail to resolve AppMaster address, please make sure " +
+          s"${report.getTrackingURL} is accessible...")
+      }
+    } finally { // release the connection on every exit path, including exceptions
+      connection.disconnect()
+    }
+  }
+}