Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/13 13:55:31 UTC

spark git commit: [SPARK-6440][CORE] Handle IPv6 addresses properly when constructing URI

Repository: spark
Updated Branches:
  refs/heads/master 14ce3ea2c -> 9d117cee0


[SPARK-6440][CORE] Handle IPv6 addresses properly when constructing URI

Author: nyaapa <ny...@gmail.com>

Closes #5424 from nyaapa/master and squashes the following commits:

6b717aa [nyaapa] [SPARK-6440][CORE] Remove Utils.localIpAddressHostname, Utils.localIpAddressURI and Utils.getAddressHostName; make Utils.localIpAddress private; rename Utils.localHostURI to Utils.localHostNameForURI; use Utils.localHostName in org.apache.spark.streaming.kinesis.KinesisReceiver and org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
2098081 [nyaapa] [SPARK-6440][CORE] style fixes and use getHostAddress instead of getHostName
84763d7 [nyaapa] [SPARK-6440][CORE] Handle IPv6 addresses properly when constructing URI
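
For context: a bare IPv6 literal cannot simply be interpolated into a URI, because its colons are indistinguishable from the host:port separator; RFC 2732/3986 require the literal to be wrapped in square brackets. That is what Guava's InetAddresses.toUriString, adopted by this commit, produces. A minimal sketch of the difference (the addresses are hypothetical documentation-range values):

    import java.net.InetAddress
    import com.google.common.net.InetAddresses

    val v4 = InetAddress.getByName("192.0.2.1")     // literal, no DNS lookup
    val v6 = InetAddress.getByName("2001:db8::1")

    InetAddresses.toUriString(v4)                   // "192.0.2.1"
    InetAddresses.toUriString(v6)                   // "[2001:db8::1]"

    // Naive interpolation is ambiguous for IPv6: which colon starts the port?
    s"http://${v6.getHostAddress}:8080"             // "http://2001:db8:0:0:0:0:0:1:8080"
    s"http://${InetAddresses.toUriString(v6)}:8080" // "http://[2001:db8::1]:8080"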


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d117cee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d117cee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d117cee

Branch: refs/heads/master
Commit: 9d117cee0be2c73a25702d98f78211055d50babe
Parents: 14ce3ea
Author: nyaapa <ny...@gmail.com>
Authored: Mon Apr 13 12:55:25 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Apr 13 12:55:25 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpServer.scala     |  2 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |  2 +-
 .../apache/spark/deploy/client/TestClient.scala |  2 +-
 .../main/scala/org/apache/spark/ui/WebUI.scala  |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 34 ++++++++++++--------
 .../streaming/kinesis/KinesisReceiver.scala     |  3 +-
 .../sql/hive/thriftserver/SparkSQLEnv.scala     |  3 +-
 7 files changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 09a9ccc..8de3a6c 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -160,7 +160,7 @@ private[spark] class HttpServer(
       throw new ServerStateException("Server is not started")
     } else {
       val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
-      s"$scheme://${Utils.localIpAddress}:$port"
+      s"$scheme://${Utils.localHostNameForURI()}:$port"
     }
   }
 }

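The effect of this one-line change is easiest to see at the parsing end: java.net.URI cannot recover a host and port from the unbracketed form the old Utils.localIpAddress interpolation produced (it falls back to a registry-based authority rather than throwing, so the host comes back null). A small sketch with a hypothetical address:

    import java.net.URI

    // Old form: raw IPv6 literal, so the authority is not server-based.
    val bad = new URI("http://2001:db8::1:8080/jars/app.jar")
    bad.getHost                                     // null
    bad.getPort                                     // -1

    // New form, as produced by Utils.localHostNameForURI().
    val good = new URI("http://[2001:db8::1]:8080/jars/app.jar")
    good.getHost                                    // "[2001:db8::1]"
    good.getPort                                    // 8080
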
http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 3ab425a..f0e77c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -53,7 +53,7 @@ class LocalSparkCluster(
     /* Start the Master */
     val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
+    val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
     val masters = Array(masterUrl)
 
     /* Start the Workers */

http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index c1c4812..40835b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,7 +46,7 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription("TestClient", Some(1), 512,
       Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")

http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index ea548f2..f9860d1 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -48,7 +48,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
   protected var serverInfo: Option[ServerInfo] = None
-  protected val localHostName = Utils.localHostName()
+  protected val localHostName = Utils.localHostNameForURI()
   protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0fdfaf3..a541d66 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.{ByteStreams, Files}
+import com.google.common.net.InetAddresses
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -789,13 +790,12 @@ private[spark] object Utils extends Logging {
    * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
    * Note, this is typically not used from within core spark.
    */
-  lazy val localIpAddress: String = findLocalIpAddress()
-  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
+  private lazy val localIpAddress: InetAddress = findLocalInetAddress()
 
-  private def findLocalIpAddress(): String = {
+  private def findLocalInetAddress(): InetAddress = {
     val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
     if (defaultIpOverride != null) {
-      defaultIpOverride
+      InetAddress.getByName(defaultIpOverride)
     } else {
       val address = InetAddress.getLocalHost
       if (address.isLoopbackAddress) {
@@ -806,15 +806,20 @@ private[spark] object Utils extends Logging {
         // It's more proper to pick ip address following system output order.
         val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
         val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
+
         for (ni <- reOrderedNetworkIFs) {
-          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
-               !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
+          val addresses = ni.getInetAddresses.toList
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+          if (addresses.nonEmpty) {
+            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+            // strip the scope id ("%" + interface) that Inet6Address may append to its textual form
+            val strippedAddress = InetAddress.getByAddress(addr.getAddress)
             // We've found an address that looks reasonable!
             logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
-              " instead (on interface " + ni.getName + ")")
+              " a loopback address: " + address.getHostAddress + "; using " +
+              strippedAddress.getHostAddress + " instead (on interface " + ni.getName + ")")
             logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-            return addr.getHostAddress
+            return strippedAddress
           }
         }
         logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
@@ -822,7 +827,7 @@ private[spark] object Utils extends Logging {
           " external IP address!")
         logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
       }
-      address.getHostAddress
+      address
     }
   }
 
@@ -842,11 +847,14 @@ private[spark] object Utils extends Logging {
    * Get the local machine's hostname.
    */
   def localHostName(): String = {
-    customHostname.getOrElse(localIpAddressHostname)
+    customHostname.getOrElse(localIpAddress.getHostAddress)
   }
 
-  def getAddressHostName(address: String): String = {
-    InetAddress.getByName(address).getHostName
+  /**
+   * Get the local machine's hostname in a form suitable for embedding in a URI.
+   */
+  def localHostNameForURI(): String = {
+    customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
   }
 
   def checkHost(host: String, message: String = "") {

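Two details in this hunk are worth spelling out. The selection loop now prefers an IPv4 address on each interface and otherwise falls back to the first usable IPv6 address. And because an Inet6Address obtained from a NetworkInterface may carry a scope (zone) id that its textual form appends after a '%', the chosen address is rebuilt from its raw bytes, which drops the scope. A runnable sketch of that stripping step, using a hypothetical address and scope id:

    import java.net.{Inet6Address, InetAddress}

    val bytes  = InetAddress.getByName("2001:db8::1").getAddress
    val scoped = Inet6Address.getByAddress(null, bytes, 2) // fabricate a scoped address
    scoped.getHostAddress                           // "2001:db8:0:0:0:0:0:1%2"

    // The strippedAddress trick: rebuild from raw bytes and the scope is gone.
    InetAddress.getByAddress(bytes).getHostAddress  // "2001:db8:0:0:0:0:0:1"

Note also the split in the resulting API: Utils.localHostName() still returns the bare address (suitable for binding sockets), while Utils.localHostNameForURI() returns the bracket-wrapped form for embedding in URLs.
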
http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 1bd1f32..a7fe447 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -23,6 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 import com.amazonaws.auth.AWSCredentialsProvider
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
@@ -118,7 +119,7 @@ private[kinesis] class KinesisReceiver(
    *    method.
    */
   override def onStart() {
-    workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+    workerId = Utils.localHostName() + ":" + UUID.randomUUID()
     credentialsProvider = new DefaultAWSCredentialsProviderChain()
     kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
       credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)

http://git-wip-us.apache.org/repos/asf/spark/blob/9d117cee/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 158c225..97b46a0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.{HiveShim, HiveContext}
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.util.Utils
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,7 +38,7 @@ private[hive] object SparkSQLEnv extends Logging {
       val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
 
       sparkConf
-        .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+        .setAppName(s"SparkSQL::${Utils.localHostName()}")
         .set("spark.sql.hive.version", HiveShim.version)
         .set(
           "spark.serializer",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org