You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 09:16:55 UTC
[2/3] flink git commit: [FLINK-2766] [core] Add proper handling of
IPv6 address literals in URLs
[FLINK-2766] [core] Add proper handling of IPv6 address literals in URLs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfde1b73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfde1b73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfde1b73
Branch: refs/heads/master
Commit: bfde1b7379f0a0aef21c836e5ac0842474c7f98f
Parents: a3c0b44
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 29 17:45:06 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:56 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/util/NetUtils.java | 80 +++++++++
.../org/apache/flink/util/NetUtilsTest.java | 97 +++++++++++
.../apache/flink/runtime/client/JobClient.java | 7 +-
.../flink/runtime/net/ConnectionUtils.java | 2 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 42 +++--
.../flink/runtime/jobmanager/JobManager.scala | 20 +--
.../runtime/minicluster/FlinkMiniCluster.scala | 6 +-
.../flink/runtime/taskmanager/TaskManager.scala | 7 +-
.../TaskManagerProcessReapingTest.java | 9 +-
.../flink/runtime/akka/AkkaUtilsTest.scala | 46 +++++-
.../fs/RollingSinkFaultTolerance2ITCase.java | 9 +-
.../fs/RollingSinkFaultToleranceITCase.java | 9 +-
.../connectors/fs/RollingSinkITCase.java | 5 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 27 ++-
.../connectors/kafka/KafkaTestBase.java | 3 +-
.../test/util/ForkableFlinkMiniCluster.scala | 2 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 165 +++++++++++++++++++
17 files changed, 480 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 03721c5..da445ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -15,13 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.util;
+import com.google.common.net.InetAddresses;
import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
+import java.net.UnknownHostException;
public class NetUtils {
@@ -68,6 +75,10 @@ public class NetUtils {
}
}
+ // ------------------------------------------------------------------------
+ // Lookup of to free ports
+ // ------------------------------------------------------------------------
+
/**
* Find a non-occupied port.
*
@@ -86,4 +97,73 @@ public class NetUtils {
throw new RuntimeException("Could not find a free permitted port on the machine.");
}
+
+
+ // ------------------------------------------------------------------------
+ // Encoding of IP addresses for URLs
+ // ------------------------------------------------------------------------
+
+ /**
+ * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
+ * have the proper formatting to be included in URLs.
+ * <p>
+ * This method internally uses Guava's functionality to properly encode IPv6 addresses.
+ *
+ * @param address The IP address to encode.
+ * @return The proper URL string encoded IP address.
+ */
+ public static String ipAddressToUrlString(InetAddress address) {
+ if (address == null) {
+ throw new NullPointerException("address is null");
+ }
+ else if (address instanceof Inet4Address) {
+ return address.getHostAddress();
+ }
+ else if (address instanceof Inet6Address) {
+ return '[' + InetAddresses.toAddrString(address) + ']';
+ }
+ else {
+ throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);
+ }
+ }
+
+ /**
+ * Encodes an IP address and port to be included in URL. in particular, this method makes
+ * sure that IPv6 addresses have the proper formatting to be included in URLs.
+ *
+ * @param address The address to be included in the URL.
+ * @param port The port for the URL address.
+ * @return The proper URL string encoded IP address and port.
+ */
+ public static String ipAddressAndPortToUrlString(InetAddress address, int port) {
+ return ipAddressToUrlString(address) + ':' + port;
+ }
+
+ /**
+ * Encodes an IP address and port to be included in URL. in particular, this method makes
+ * sure that IPv6 addresses have the proper formatting to be included in URLs.
+ *
+ * @param address The socket address with the IP address and port.
+ * @return The proper URL string encoded IP address and port.
+ */
+ public static String socketAddressToUrlString(InetSocketAddress address) {
+ if (address.isUnresolved()) {
+ throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());
+ }
+ return ipAddressAndPortToUrlString(address.getAddress(), address.getPort());
+ }
+
+ /**
+ * Normalizes and encodes a hostname and port to be included in URL.
+ * In particular, this method makes sure that IPv6 address literals have the proper
+ * formatting to be included in URLs.
+ *
+ * @param host The address to be included in the URL.
+ * @param port The port for the URL address.
+ * @return The proper URL string encoded IP address and port.
+ * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.
+ */
+ public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
+ return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
new file mode 100644
index 0000000..cd2c13b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.*;
+
+public class NetUtilsTest {
+
+ @Test
+ public void testIPv4toURL() {
+ try {
+ final String addressString = "192.168.0.1";
+
+ InetAddress address = InetAddress.getByName(addressString);
+ assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIPv6toURL() {
+ try {
+ final String addressString = "2001:01db8:00:0:00:ff00:42:8329";
+ final String normalizedAddress = "[2001:1db8::ff00:42:8329]";
+
+ InetAddress address = InetAddress.getByName(addressString);
+ assertEquals(normalizedAddress, NetUtils.ipAddressToUrlString(address));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIPv4URLEncoding() {
+ try {
+ final String addressString = "10.244.243.12";
+ final int port = 23453;
+
+ InetAddress address = InetAddress.getByName(addressString);
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+
+ assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+ assertEquals(addressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address, port));
+ assertEquals(addressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIPv6URLEncoding() {
+ try {
+ final String addressString = "2001:db8:10:11:12:ff00:42:8329";
+ final String bracketedAddressString = '[' + addressString + ']';
+ final int port = 23453;
+
+ InetAddress address = InetAddress.getByName(addressString);
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+
+ assertEquals(bracketedAddressString, NetUtils.ipAddressToUrlString(address));
+ assertEquals(bracketedAddressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address, port));
+ assertEquals(bracketedAddressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index c51bc7c..a436881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,9 +75,11 @@ public class JobClient {
ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
Address address = system.provider().getDefaultAddress();
- String host = address.host().isDefined() ? address.host().get() : "(unknown)";
+ String hostAddress = address.host().isDefined() ?
+ NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
+ "(unknown)";
int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
- LOG.info("Started JobClient actor system at " + host + ':' + port);
+ LOG.info("Started JobClient actor system at " + hostAddress + ':' + port);
return system;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 0ed9345..542e69e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -365,7 +365,7 @@ public class ConnectionUtils {
boolean logging = elapsedTime >= startLoggingAfter.toMillis();
if (logging) {
- LOG.info("Trying to connect to address {}." + targetAddress);
+ LOG.info("Trying to connect to address {}", targetAddress);
}
do {
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 8007ef6..bf679c9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -19,13 +19,14 @@
package org.apache.flink.runtime.akka
import java.io.IOException
-import java.net.{InetSocketAddress, InetAddress}
+import java.net._
import java.util.concurrent.{TimeUnit, Callable}
import akka.actor._
import akka.pattern.{ask => akkaAsk}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.util.NetUtils
import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
import org.slf4j.LoggerFactory
import scala.concurrent._
@@ -102,6 +103,7 @@ object AkkaUtils {
* then an Akka config for local actor system will be returned
* @return Akka config
*/
+ @throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
listeningAddress: Option[(String, Int)]): Config = {
val defaultConfig = getBasicAkkaConfig(configuration)
@@ -109,8 +111,9 @@ object AkkaUtils {
listeningAddress match {
case Some((hostname, port)) =>
- val ipAddress = "\"" + InetAddress.getByName(hostname).getHostAddress() + "\""
- val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port)
+ val ipAddress = InetAddress.getByName(hostname)
+ val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
+ val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
remoteConfig.withFallback(defaultConfig)
case None =>
@@ -513,23 +516,30 @@ object AkkaUtils {
* the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is
* provided, then an [[Exception]] is thrown.
*
- * @param akkaURL
- * @throws java.lang.Exception
- * @return
+ * @param akkaURL The URL to extract the host and port from.
+ * @throws java.lang.Exception Thrown, if the given string does not represent a proper url
+ * @return The InetSocketAddress with teh extracted host and port.
*/
@throws(classOf[Exception])
def getInetSockeAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
// AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL
- val hostPortRegex = """@([^/:]*):(\d*)""".r
-
- hostPortRegex.findFirstMatchIn(akkaURL) match {
- case Some(m) =>
- val host = m.group(1)
- val port = m.group(2).toInt
-
- new InetSocketAddress(host, port)
- case None => throw new Exception("Could not retrieve InetSocketAddress from " +
- s"Akka URL $akkaURL")
+ try {
+ // we need to manually strip the protocol, because "akka.tcp" is not
+ // a valid protocol for Java's URL class
+ val protocolonPos = akkaURL.indexOf("://")
+ if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) {
+ throw new MalformedURLException()
+ }
+
+ val url = new URL("http://" + akkaURL.substring(protocolonPos + 3))
+ if (url.getHost == null || url.getPort == -1) {
+ throw new MalformedURLException()
+ }
+ new InetSocketAddress(url.getHost, url.getPort)
+ }
+ catch {
+ case _ : MalformedURLException =>
+ throw new Exception(s"Could not retrieve InetSocketAddress from Akka URL $akkaURL")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 07a5977..b7f76ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,12 +44,8 @@ import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-
-import org.apache.flink.runtime.messages.accumulators.{AccumulatorResultsErroneous,
-AccumulatorResultsFound, RequestAccumulatorResults, AccumulatorMessage,
-AccumulatorResultStringsFound, RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
-AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
@@ -67,7 +63,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
+import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil}
import scala.concurrent._
import scala.concurrent.duration._
@@ -1237,7 +1233,8 @@ object JobManager {
LOG.info("Starting JobManager")
// Bring up the job manager actor system first, bind it to the given address.
- LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
+ val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
+ LOG.info(s"Starting JobManager actor system at $hostPortUrl")
val jobManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1451,8 +1448,9 @@ object JobManager {
val executionMode = config.getJobManagerMode
val streamingMode = config.getStreamingMode
-
- LOG.info(s"Starting JobManager on $host:$port with execution mode $executionMode and " +
+ val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
+
+ LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode and " +
s"streaming mode $streamingMode")
(configuration, executionMode, streamingMode, host, port)
@@ -1657,7 +1655,7 @@ object JobManager {
address: InetSocketAddress,
name: Option[String] = None)
: String = {
- val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
+ val hostPort = NetUtils.socketAddressToUrlString(address)
getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 839193b..b3cff51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -76,7 +76,9 @@ abstract class FlinkMiniCluster(
// NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
- val hostname = InetAddress.getByName("localhost").getHostAddress()
+ val hostname = userConfiguration.getString(
+ ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
+ InetAddress.getByName("localhost").getHostAddress())
val configuration = generateConfiguration(userConfiguration)
@@ -243,7 +245,7 @@ abstract class FlinkMiniCluster(
jobManagerActorSystems = Some(jmActorSystems)
jobManagerActors = Some(jmActors)
- val lrs = createLeaderRetrievalService();
+ val lrs = createLeaderRetrievalService()
leaderRetrievalService = Some(lrs)
lrs.start(this)
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7a1bec5..bf23021 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1425,8 +1425,9 @@ object TaskManager {
LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
// Bring up the TaskManager actor system first, bind it to the given address.
-
- LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort))
val taskManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1443,7 +1444,7 @@ object TaskManager {
if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
val cause = t.getCause()
if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
- val address = taskManagerHostname + ":" + actorSystemPort
+ val address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)
throw new IOException("Unable to bind TaskManager actor system to address " +
address + " - " + cause.getMessage(), t)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 1334bcc..ed03ae7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -40,6 +40,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
+import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -78,10 +79,11 @@ public class TaskManagerProcessReapingTest {
tempLogFile.deleteOnExit();
CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+ final InetAddress localhost = InetAddress.getByName("localhost");
final int jobManagerPort = NetUtils.getAvailablePort();
// start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
jmActorSystem = AkkaUtils.createActorSystem(
new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
@@ -109,8 +111,9 @@ public class TaskManagerProcessReapingTest {
// grab the reference to the TaskManager. try multiple times, until the process
// is started and the TaskManager is up
- String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s",
- "127.0.0.1", taskManagerPort, TaskManager.TASK_MANAGER_NAME());
+ String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
+ org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+ TaskManager.TASK_MANAGER_NAME());
ActorRef taskManagerRef = null;
Throwable lastError = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 59477db..4e08857 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.akka
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
import org.apache.flink.runtime.jobmanager.JobManager
import org.junit.runner.RunWith
@@ -64,4 +64,48 @@ class AkkaUtilsTest
result should equal(expected)
}
+ test("getHostFromAkkaURL should handle 'akka.tcp' as protocol") {
+ val url = "akka.tcp://flink@localhost:1234/user/jobmanager"
+ val expected = new InetSocketAddress("localhost", 1234)
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(expected)
+ }
+
+ test("getHostFromAkkaURL should properly handle IPv4 addresses in URLs") {
+ val IPv4AddressString = "192.168.0.1"
+ val port = 1234
+ val address = new InetSocketAddress(IPv4AddressString, port)
+
+ val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(address)
+ }
+
+ test("getHostFromAkkaURL should properly handle IPv6 addresses in URLs") {
+ val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+ val port = 1234
+ val address = new InetSocketAddress(IPv6AddressString, port)
+
+ val url = s"akka://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(address)
+ }
+
+ test("getHostFromAkkaURL should properly handle IPv6 addresses in 'akka.tcp' URLs") {
+ val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+ val port = 1234
+ val address = new InetSocketAddress(IPv6AddressString, port)
+
+ val url = s"akka.tcp://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(address)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
index 9c70ed2..7d127ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -69,7 +70,6 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
private static MiniDFSCluster hdfsCluster;
private static org.apache.hadoop.fs.FileSystem dfs;
- private static String hdfsURI;
private static String outPath;
@@ -87,10 +87,9 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
dfs = hdfsCluster.getFileSystem();
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
- outPath = hdfsURI + "/string-non-rolling-out-no-checkpoint";
-
+ outPath = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/string-non-rolling-out-no-checkpoint";
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index e0592e9..65904d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -64,7 +65,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
private static MiniDFSCluster hdfsCluster;
private static org.apache.hadoop.fs.FileSystem dfs;
- private static String hdfsURI;
private static String outPath;
@@ -82,10 +82,9 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
dfs = hdfsCluster.getFileSystem();
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
- outPath = hdfsURI + "/string-non-rolling-out";
-
+ outPath = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/string-non-rolling-out";
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 008b4b6..9770f41 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -79,7 +80,9 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
dfs = hdfsCluster.getFileSystem();
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+ hdfsURI = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/";
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 5016e7e..e9a5728 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -69,6 +69,7 @@ import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Assert;
@@ -841,7 +842,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while (firstPart.errorCode() != 0);
zkClient.close();
- final String leaderToShutDown = firstPart.leader().get().connectionString();
+ final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
+ final String leaderToShutDownConnection =
+ NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
+
+
final int leaderIdToShutDown = firstPart.leader().get().id();
LOG.info("Leader to shutdown {}", leaderToShutDown);
@@ -863,7 +868,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env
.addSource(kafkaSource)
.map(new PartitionValidatingMapper(parallelism, 1))
- .map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+ .map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
BrokerKillingMapper.killedLeaderBefore = false;
@@ -1068,14 +1073,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// shut down a Kafka broker
KafkaServer toShutDown = null;
for (KafkaServer kafkaServer : brokers) {
- if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+ String connectionUrl =
+ NetUtils.hostAndPortToUrlString(
+ kafkaServer.config().advertisedHostName(),
+ kafkaServer.config().advertisedPort());
+ if (leaderToShutDown.equals(connectionUrl)) {
toShutDown = kafkaServer;
break;
}
}
if (toShutDown == null) {
- throw new Exception("Cannot find broker to shut down");
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer kafkaServer : brokers) {
+ listOfBrokers.append(
+ NetUtils.hostAndPortToUrlString(
+ kafkaServer.config().advertisedHostName(),
+ kafkaServer.config().advertisedPort()));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
+ + " ; available brokers: " + listOfBrokers.toString());
}
else {
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 61f384a..d511796 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -59,6 +59,7 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -148,7 +149,7 @@ public abstract class KafkaTestBase extends TestLogger {
SocketServer socketServer = brokers.get(i).socketServer();
String host = socketServer.host() == null ? "localhost" : socketServer.host();
- brokerConnectionStrings += host+":"+socketServer.port()+",";
+ brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
}
LOG.info("ZK and KafkaServer started.");
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c8b0e0c..5c5a465 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -96,7 +96,7 @@ class ForkableFlinkMiniCluster(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
- if(jobManagerPort > 0) {
+ if (jobManagerPort > 0) {
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
new file mode 100644
index 0000000..af51ed6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.Some;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.util.Enumeration;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class IPv6HostnamesITCase extends TestLogger {
+
+ @Test
+ public void testClusterWithIPv6host() {
+
+ final Inet6Address ipv6address = getLocalIPv6Address();
+ if (ipv6address == null) {
+ System.err.println("--- Cannot find a non-loopback local IPv6 address, skipping IPv6HostnamesITCase");
+ return;
+ }
+
+
+
+ ForkableFlinkMiniCluster flink = null;
+ try {
+ final String addressString = ipv6address.getHostAddress();
+ log.info("Test will use IPv6 address " + addressString + " for connection tests");
+
+ Configuration conf = new Configuration();
+ conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
+ conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
+ conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+
+ flink = new ForkableFlinkMiniCluster(conf, false, StreamingMode.BATCH_ONLY);
+ flink.start();
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());
+
+ // get input data
+ DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+
+ DataSet<Tuple2<String, Integer>> counts =text
+ .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (String token : value.toLowerCase().split("\\W+")) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ })
+ .groupBy(0).sum(1);
+
+ List<Tuple2<String, Integer>> result = counts.collect();
+
+ TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (flink != null) {
+ flink.shutdown();
+ }
+ }
+ }
+
+
+ private Inet6Address getLocalIPv6Address() {
+ try {
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+ while (e.hasMoreElements()) {
+ NetworkInterface netInterface = e.nextElement();
+
+ // for each address of the network interface
+ Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress addr = ee.nextElement();
+
+
+ if (addr instanceof Inet6Address && (!addr.isLoopbackAddress()) && (!addr.isAnyLocalAddress())) {
+ // see if it is possible to bind to the address
+ InetSocketAddress socketAddress = new InetSocketAddress(addr, 0);
+
+ try {
+ log.info("Considering address " + addr);
+
+ // test whether we can bind a socket to that address
+ log.info("Testing whether sockets can bind to " + addr);
+ ServerSocket sock = new ServerSocket();
+ sock.bind(socketAddress);
+ sock.close();
+
+ // test whether Akka's netty can bind to the address
+ log.info("Testing whether Akka can use " + addr);
+ int port = NetUtils.getAvailablePort();
+ ActorSystem as = AkkaUtils.createActorSystem(
+ new Configuration(),
+ new Some<scala.Tuple2<String, Object>>(new scala.Tuple2<String, Object>(addr.getHostAddress(), port)));
+ as.shutdown();
+
+ log.info("Using address " + addr);
+ return (Inet6Address) addr;
+ }
+ catch (IOException ignored) {
+ // fall through the loop
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
+}