You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 09:16:54 UTC
[1/3] flink git commit: [FLINK-2781] [core] Cleanup NetUtils.
Repository: flink
Updated Branches:
refs/heads/master 28d36cc92 -> c04a77042
[FLINK-2781] [core] Cleanup NetUtils.
- The NetUtils class (in flink-core) contains all methods usable without runtime dependency
- The runtime NetUtils class (to find connecting addresses) is now called ConnectionUtils.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3c0b446
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3c0b446
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3c0b446
Branch: refs/heads/master
Commit: a3c0b446383262a6643b0c0888d14b52ac732835
Parents: 28d36cc
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 29 16:36:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:55 2015 +0200
----------------------------------------------------------------------
.../client/program/ClientConnectionTest.java | 2 +-
.../apache/flink/client/program/ClientTest.java | 6 +-
.../contrib/streaming/DataStreamUtils.java | 4 +-
.../java/org/apache/flink/util/NetUtils.java | 21 +
.../flink/runtime/net/ConnectionUtils.java | 456 +++++++++++++++++
.../org/apache/flink/runtime/net/NetUtils.java | 495 -------------------
.../runtime/util/LeaderRetrievalUtils.java | 4 +-
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../io/network/NetworkEnvironmentTest.java | 4 +-
.../runtime/io/network/netty/NettyTestUtil.java | 14 +-
.../jobmanager/JobManagerStartupTest.java | 2 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 4 +-
.../flink/runtime/net/ConnectionUtilsTest.java | 70 +++
.../apache/flink/runtime/net/NetUtilsTest.java | 70 ---
.../TaskManagerProcessReapingTest.java | 4 +-
.../runtime/taskmanager/TaskManagerTest.java | 4 +-
.../jobmanager/JobManagerConnectionTest.scala | 2 +-
.../runtime/testingUtils/TestingCluster.scala | 3 +-
.../connectors/kafka/KafkaTestBase.java | 2 +-
.../streaming/util/SocketOutputTestBase.java | 3 +-
.../streaming/util/SocketProgramITCaseBase.java | 3 +-
.../AbstractProcessFailureRecoveryTest.java | 6 +-
.../recovery/ProcessFailureCancelingITCase.java | 2 +-
.../org/apache/flink/yarn/FlinkYarnCluster.java | 4 +-
24 files changed, 585 insertions(+), 602 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index ef2161e..8be6b03 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
import org.junit.Test;
import java.net.InetAddress;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 621ef63..a620139 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
@@ -44,20 +43,17 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.NetUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collections;
-
-
import java.util.UUID;
import static org.junit.Assert.*;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
index c965aa8..6f4e8d9 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.net.ConnectionUtils;
public final class DataStreamUtils {
@@ -46,7 +46,7 @@ public final class DataStreamUtils {
String host = ((RemoteStreamEnvironment)env).getHost();
int port = ((RemoteStreamEnvironment)env).getPort();
try {
- clientAddress = NetUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+ clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
} catch (IOException e) {
throw new RuntimeException("IOException while trying to connect to the master", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index abbcf3c..03721c5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,7 +18,9 @@
package org.apache.flink.util;
+import java.io.IOException;
import java.net.MalformedURLException;
+import java.net.ServerSocket;
import java.net.URL;
public class NetUtils {
@@ -65,4 +67,23 @@ public class NetUtils {
throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);
}
}
+
+ /**
+ * Find a non-occupied port.
+ *
+ * @return A non-occupied port.
+ */
+ public static int getAvailablePort() {
+ for (int i = 0; i < 50; i++) {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ int port = serverSocket.getLocalPort();
+ if (port != 0) {
+ return port;
+ }
+ }
+ catch (IOException ignored) {}
+ }
+
+ throw new RuntimeException("Could not find a free permitted port on the machine.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
new file mode 100644
index 0000000..0ed9345
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.net;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utilities to determine the network interface and address that should be used to bind the
+ * TaskManager communication to.
+ */
+public class ConnectionUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);
+
+ private static final long MIN_SLEEP_TIME = 50;
+ private static final long MAX_SLEEP_TIME = 20000;
+
+ /**
+ * The states of address detection mechanism.
+ * There is only a state transition if the current state failed to determine the address.
+ */
+ private enum AddressDetectionState {
+ /** Connect from interface returned by InetAddress.getLocalHost() **/
+ LOCAL_HOST(50),
+ /** Detect own IP address based on the target IP address. Look for common prefix */
+ ADDRESS(50),
+ /** Try to connect on all Interfaces and all their addresses with a low timeout */
+ FAST_CONNECT(50),
+ /** Try to connect on all Interfaces and all their addresses with a long timeout */
+ SLOW_CONNECT(1000),
+ /** Choose any non-loopback address */
+ HEURISTIC(0);
+
+ private int timeout;
+
+ AddressDetectionState(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+ }
+
+
+ /**
+ * Finds the local network address from which this machine can connect to the target
+ * address. This method tries to establish a proper network connection to the
+ * given target, so it only succeeds if the target socket address actually accepts
+ * connections. The method tries various strategies multiple times and uses an exponential
+ * backoff timer between tries.
+ * <p>
+ * If no connection attempt was successful after the given maximum time, the method
+ * will choose some address based on heuristics (excluding link-local and loopback addresses.)
+ * <p>
+ * This method will initially not log on info level (to not flood the log while the
+ * backoff time is still very low). It will start logging after a certain time
+ * has passed.
+ *
+ * @param targetAddress The address that the method tries to connect to.
+ * @param maxWaitMillis The maximum time that this method tries to connect, before falling
+ * back to the heuristics.
+ * @param startLoggingAfter The time after which the method will log on INFO level.
+ */
+ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
+ long maxWaitMillis, long startLoggingAfter) throws IOException
+ {
+ if (targetAddress == null) {
+ throw new NullPointerException("targetAddress must not be null");
+ }
+ if (maxWaitMillis <= 0) {
+ throw new IllegalArgumentException("Max wait time must be positive");
+ }
+
+ final long startTime = System.currentTimeMillis();
+
+ long currentSleepTime = MIN_SLEEP_TIME;
+ long elapsedTime = 0;
+
+ // loop while there is time left
+ while (elapsedTime < maxWaitMillis) {
+ AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
+
+ boolean logging = elapsedTime >= startLoggingAfter;
+ if (logging) {
+ LOG.info("Trying to connect to " + targetAddress);
+ }
+ // go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
+ do {
+ InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
+ if (address != null) {
+ return address;
+ }
+
+ // pick the next strategy
+ switch (strategy) {
+ case LOCAL_HOST:
+ strategy = AddressDetectionState.ADDRESS;
+ break;
+ case ADDRESS:
+ strategy = AddressDetectionState.FAST_CONNECT;
+ break;
+ case FAST_CONNECT:
+ strategy = AddressDetectionState.SLOW_CONNECT;
+ break;
+ case SLOW_CONNECT:
+ strategy = null;
+ break;
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ }
+ while (strategy != null);
+
+ // we have made a pass with all strategies over all interfaces
+ // sleep for a while before we make the next pass
+ elapsedTime = System.currentTimeMillis() - startTime;
+
+ long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+ if (toWait > 0) {
+ if (logging) {
+ LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
+ } else {
+ LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
+ }
+
+ try {
+ Thread.sleep(toWait);
+ }
+ catch (InterruptedException e) {
+ throw new IOException("Connection attempts have been interrupted.");
+ }
+ }
+
+ // increase the exponential backoff timer
+ currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+ }
+
+ // our attempts timed out. use the heuristic fallback
+ LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+ InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+ if (heuristic != null) {
+ return heuristic;
+ }
+ else {
+ LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+ return InetAddress.getLocalHost();
+ }
+ }
+
+
+ private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
+ InetSocketAddress targetAddress,
+ boolean logging) throws IOException
+ {
+ // try LOCAL_HOST strategy independent of the network interfaces
+ if(strategy == AddressDetectionState.LOCAL_HOST) {
+ InetAddress localhostName = InetAddress.getLocalHost();
+
+ if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
+ LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
+ return localhostName;
+ } else {
+ return null;
+ }
+ }
+
+ final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
+
+ // for each network interface
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+ while (e.hasMoreElements()) {
+
+ NetworkInterface netInterface = e.nextElement();
+
+ // for each address of the network interface
+ Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress interfaceAddress = ee.nextElement();
+
+ switch (strategy) {
+ case ADDRESS:
+ if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
+ LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
+ targetAddress, interfaceAddress);
+
+ if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+ return interfaceAddress;
+ }
+ }
+ break;
+
+ case FAST_CONNECT:
+ case SLOW_CONNECT:
+ LOG.debug("Trying to connect to {} from local address {} with timeout {}",
+ targetAddress, interfaceAddress, strategy.getTimeout());
+
+ if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+ return interfaceAddress;
+ }
+ break;
+
+ case HEURISTIC:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
+ interfaceAddress, interfaceAddress.isLinkLocalAddress(),
+ interfaceAddress.isLoopbackAddress());
+ }
+ // pick a non-loopback non-link-local address
+ if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
+ !interfaceAddress.isLoopbackAddress())
+ {
+ return interfaceAddress;
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ } // end for each address of the interface
+ } // end for each interface
+
+ return null;
+ }
+
+ /**
+ * Checks if two addresses have a common prefix (first 2 bytes).
+ * Example: 192.168.???.???
+ * Works also with ipv6, but accepts probably too many addresses
+ */
+ private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
+ return address[0] == address2[0] && address[1] == address2[1];
+ }
+
+ /**
+ *
+ * @param fromAddress The address to connect from.
+ * @param toSocket The socket address to connect to.
+ * @param timeout The timeout for the connection.
+ * @param logFailed Flag to indicate whether to log failed attempts on info level
+ * (failed attempts are always logged on DEBUG level).
+ * @return True, if the connection was successful, false otherwise.
+ * @throws IOException Thrown if the socket cleanup fails.
+ */
+ private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
+ int timeout, boolean logFailed) throws IOException
+ {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
+ + " with timeout " + timeout);
+ }
+ try (Socket socket = new Socket()) {
+ // port 0 = let the OS choose the port
+ SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
+ // machine
+ socket.bind(bindP);
+ socket.connect(toSocket, timeout);
+ return true;
+ }
+ catch (Exception ex) {
+ String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(message, ex);
+ } else if (logFailed) {
+ LOG.info(message);
+ }
+ return false;
+ }
+ }
+
+ public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
+
+ private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
+
+ private enum LeaderRetrievalState {
+ NOT_RETRIEVED,
+ RETRIEVED,
+ NEWLY_RETRIEVED
+ }
+
+ final private Object retrievalLock = new Object();
+
+ private String akkaURL;
+ private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
+ private Exception exception;
+
+ public InetAddress findConnectingAddress(
+ FiniteDuration timeout) throws LeaderRetrievalException {
+ return findConnectingAddress(timeout, defaultLoggingDelay);
+ }
+
+ public InetAddress findConnectingAddress(
+ FiniteDuration timeout,
+ FiniteDuration startLoggingAfter)
+ throws LeaderRetrievalException {
+ long startTime = System.currentTimeMillis();
+ long currentSleepTime = MIN_SLEEP_TIME;
+ long elapsedTime = 0;
+ InetSocketAddress targetAddress = null;
+
+ try {
+ while (elapsedTime < timeout.toMillis()) {
+
+ long maxTimeout = timeout.toMillis() - elapsedTime;
+
+ synchronized (retrievalLock) {
+ if (exception != null) {
+ throw exception;
+ }
+
+ if (retrievalState == LeaderRetrievalState.NOT_RETRIEVED) {
+ try {
+ retrievalLock.wait(maxTimeout);
+ } catch (InterruptedException e) {
+ throw new Exception("Finding connecting address was interrupted" +
+ "while waiting for the leader retrieval.");
+ }
+ } else if (retrievalState == LeaderRetrievalState.NEWLY_RETRIEVED) {
+ targetAddress = AkkaUtils.getInetSockeAddressFromAkkaURL(akkaURL);
+
+ LOG.info("Retrieved new target address {}.", targetAddress);
+
+ retrievalState = LeaderRetrievalState.RETRIEVED;
+
+ currentSleepTime = MIN_SLEEP_TIME;
+ } else {
+ currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+ }
+ }
+
+ if (targetAddress != null) {
+ AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
+
+ boolean logging = elapsedTime >= startLoggingAfter.toMillis();
+ if (logging) {
+ LOG.info("Trying to connect to address {}." + targetAddress);
+ }
+
+ do {
+ InetAddress address = ConnectionUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
+ if (address != null) {
+ return address;
+ }
+
+ // pick the next strategy
+ switch (strategy) {
+ case LOCAL_HOST:
+ strategy = AddressDetectionState.ADDRESS;
+ break;
+ case ADDRESS:
+ strategy = AddressDetectionState.FAST_CONNECT;
+ break;
+ case FAST_CONNECT:
+ strategy = AddressDetectionState.SLOW_CONNECT;
+ break;
+ case SLOW_CONNECT:
+ strategy = null;
+ break;
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ }
+ while (strategy != null);
+ }
+
+ elapsedTime = System.currentTimeMillis() - startTime;
+
+ long timeToWait = Math.min(
+ Math.max(timeout.toMillis() - elapsedTime, 0),
+ currentSleepTime);
+
+ if (timeToWait > 0) {
+ synchronized (retrievalLock) {
+ try {
+ retrievalLock.wait(timeToWait);
+ } catch (InterruptedException e) {
+ throw new Exception("Finding connecting address was interrupted while pausing.");
+ }
+ }
+
+ elapsedTime = System.currentTimeMillis() - startTime;
+ }
+ }
+
+ InetAddress heuristic = null;
+
+ if (targetAddress != null) {
+ LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+ heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+ }
+
+ if (heuristic != null) {
+ return heuristic;
+ } else {
+ LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+ return InetAddress.getLocalHost();
+ }
+ } catch (Exception e) {
+ throw new LeaderRetrievalException("Could not retrieve the connecting address to the " +
+ "current leader with the akka URL " + akkaURL + ".", e);
+ }
+ }
+
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ if (leaderAddress != null && !leaderAddress.equals("")) {
+ synchronized (retrievalLock) {
+ akkaURL = leaderAddress;
+ retrievalState = LeaderRetrievalState.NEWLY_RETRIEVED;
+
+ retrievalLock.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ synchronized (retrievalLock) {
+ this.exception = exception;
+ retrievalLock.notifyAll();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
deleted file mode 100644
index 2df0616..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Enumeration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities to determine the network interface and address that should be used to bind the
- * TaskManager communication to.
- */
-public class NetUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
-
- private static final long MIN_SLEEP_TIME = 50;
- private static final long MAX_SLEEP_TIME = 20000;
-
- /**
- * The states of address detection mechanism.
- * There is only a state transition if the current state failed to determine the address.
- */
- private enum AddressDetectionState {
- /** Connect from interface returned by InetAddress.getLocalHost() **/
- LOCAL_HOST(50),
- /** Detect own IP address based on the target IP address. Look for common prefix */
- ADDRESS(50),
- /** Try to connect on all Interfaces and all their addresses with a low timeout */
- FAST_CONNECT(50),
- /** Try to connect on all Interfaces and all their addresses with a long timeout */
- SLOW_CONNECT(1000),
- /** Choose any non-loopback address */
- HEURISTIC(0);
-
- private int timeout;
-
- AddressDetectionState(int timeout) {
- this.timeout = timeout;
- }
-
- public int getTimeout() {
- return timeout;
- }
- }
-
-
- /**
- * Finds the local network address from which this machine can connect to the target
- * address. This method tries to establish a proper network connection to the
- * given target, so it only succeeds if the target socket address actually accepts
- * connections. The method tries various strategies multiple times and uses an exponential
- * backoff timer between tries.
- * <p>
- * If no connection attempt was successful after the given maximum time, the method
- * will choose some address based on heuristics (excluding link-local and loopback addresses.)
- * <p>
- * This method will initially not log on info level (to not flood the log while the
- * backoff time is still very low). It will start logging after a certain time
- * has passes.
- *
- * @param targetAddress The address that the method tries to connect to.
- * @param maxWaitMillis The maximum time that this method tries to connect, before falling
- * back to the heuristics.
- * @param startLoggingAfter The time after which the method will log on INFO level.
- */
- public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
- long maxWaitMillis, long startLoggingAfter) throws IOException
- {
- if (targetAddress == null) {
- throw new NullPointerException("targetAddress must not be null");
- }
- if (maxWaitMillis <= 0) {
- throw new IllegalArgumentException("Max wait time must be positive");
- }
-
- final long startTime = System.currentTimeMillis();
-
- long currentSleepTime = MIN_SLEEP_TIME;
- long elapsedTime = 0;
-
- // loop while there is time left
- while (elapsedTime < maxWaitMillis) {
- AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
-
- boolean logging = elapsedTime >= startLoggingAfter;
- if (logging) {
- LOG.info("Trying to connect to " + targetAddress);
- }
- // go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
- do {
- InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
- if (address != null) {
- return address;
- }
-
- // pick the next strategy
- switch (strategy) {
- case LOCAL_HOST:
- strategy = AddressDetectionState.ADDRESS;
- break;
- case ADDRESS:
- strategy = AddressDetectionState.FAST_CONNECT;
- break;
- case FAST_CONNECT:
- strategy = AddressDetectionState.SLOW_CONNECT;
- break;
- case SLOW_CONNECT:
- strategy = null;
- break;
- default:
- throw new RuntimeException("Unsupported strategy: " + strategy);
- }
- }
- while (strategy != null);
-
- // we have made a pass with all strategies over all interfaces
- // sleep for a while before we make the next pass
- elapsedTime = System.currentTimeMillis() - startTime;
-
- long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
- if (toWait > 0) {
- if (logging) {
- LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
- } else {
- LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
- }
-
- try {
- Thread.sleep(toWait);
- }
- catch (InterruptedException e) {
- throw new IOException("Connection attempts have been interrupted.");
- }
- }
-
- // increase the exponential backoff timer
- currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
- }
-
- // our attempts timed out. use the heuristic fallback
- LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
- InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
- if (heuristic != null) {
- return heuristic;
- }
- else {
- LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
- return InetAddress.getLocalHost();
- }
- }
-
-
- private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
- InetSocketAddress targetAddress,
- boolean logging) throws IOException
- {
- // try LOCAL_HOST strategy independent of the network interfaces
- if(strategy == AddressDetectionState.LOCAL_HOST) {
- InetAddress localhostName = InetAddress.getLocalHost();
-
- if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
- LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
- return localhostName;
- } else {
- return null;
- }
- }
-
- final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
-
- // for each network interface
- Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
- while (e.hasMoreElements()) {
-
- NetworkInterface netInterface = e.nextElement();
-
- // for each address of the network interface
- Enumeration<InetAddress> ee = netInterface.getInetAddresses();
- while (ee.hasMoreElements()) {
- InetAddress interfaceAddress = ee.nextElement();
-
- switch (strategy) {
- case ADDRESS:
- if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
- LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
- targetAddress, interfaceAddress);
-
- if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
- return interfaceAddress;
- }
- }
- break;
-
- case FAST_CONNECT:
- case SLOW_CONNECT:
- LOG.debug("Trying to connect to {} from local address {} with timeout {}",
- targetAddress, interfaceAddress, strategy.getTimeout());
-
- if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
- return interfaceAddress;
- }
- break;
-
- case HEURISTIC:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
- interfaceAddress, interfaceAddress.isLinkLocalAddress(),
- interfaceAddress.isLoopbackAddress());
- }
- // pick a non-loopback non-link-local address
- if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
- !interfaceAddress.isLoopbackAddress())
- {
- return interfaceAddress;
- }
- break;
-
- default:
- throw new RuntimeException("Unsupported strategy: " + strategy);
- }
- } // end for each address of the interface
- } // end for each interface
-
- return null;
- }
-
- /**
- * Checks if two addresses have a common prefix (first 2 bytes).
- * Example: 192.168.???.???
- * Works also with ipv6, but accepts probably too many addresses
- */
- private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
- return address[0] == address2[0] && address[1] == address2[1];
- }
-
- /**
- *
- * @param fromAddress The address to connect from.
- * @param toSocket The socket address to connect to.
- * @param timeout The timeout fr the connection.
- * @param logFailed Flag to indicate whether to log failed attempts on info level
- * (failed attempts are always logged on DEBUG level).
- * @return True, if the connection was successful, false otherwise.
- * @throws IOException Thrown if the socket cleanup fails.
- */
- private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
- int timeout, boolean logFailed) throws IOException
- {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
- + " with timeout " + timeout);
- }
- Socket socket = new Socket();
- try {
- // port 0 = let the OS choose the port
- SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
- // machine
- socket.bind(bindP);
- socket.connect(toSocket, timeout);
- return true;
- }
- catch (Exception ex) {
- String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
- if (LOG.isDebugEnabled()) {
- LOG.debug(message, ex);
- } else if (logFailed) {
- LOG.info(message);
- }
- return false;
- }
- finally {
- socket.close();
- }
- }
-
- /**
- * Find a non-occupied port.
- *
- * @return A non-occupied port.
- */
- public static int getAvailablePort() {
- for (int i = 0; i < 50; i++) {
- ServerSocket serverSocket = null;
- try {
- serverSocket = new ServerSocket(0);
- int port = serverSocket.getLocalPort();
- if (port != 0) {
- return port;
- }
- }
- catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to allocate port " + e.getMessage(), e);
- }
- }
- finally {
- if (serverSocket != null) {
- try {
- serverSocket.close();
- } catch (Throwable t) {
- // ignored
- }
- }
- }
- }
-
- throw new RuntimeException("Could not find a free permitted port on the machine.");
- }
-
- public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
-
- private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
-
- private enum LeaderRetrievalState {
- NOT_RETRIEVED,
- RETRIEVED,
- NEWLY_RETRIEVED
- }
-
- final private Object retrievalLock = new Object();
-
- private String akkaURL;
- private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
- private Exception exception;
-
- public InetAddress findConnectingAddress(
- FiniteDuration timeout) throws LeaderRetrievalException {
- return findConnectingAddress(timeout, defaultLoggingDelay);
- }
-
- public InetAddress findConnectingAddress(
- FiniteDuration timeout,
- FiniteDuration startLoggingAfter)
- throws LeaderRetrievalException {
- long startTime = System.currentTimeMillis();
- long currentSleepTime = MIN_SLEEP_TIME;
- long elapsedTime = 0;
- InetSocketAddress targetAddress = null;
-
- try {
- while (elapsedTime < timeout.toMillis()) {
-
- long maxTimeout = timeout.toMillis() - elapsedTime;
-
- synchronized (retrievalLock) {
- if (exception != null) {
- throw exception;
- }
-
- if (retrievalState == LeaderRetrievalState.NOT_RETRIEVED) {
- try {
- retrievalLock.wait(maxTimeout);
- } catch (InterruptedException e) {
- throw new Exception("Finding connecting address was interrupted" +
- "while waiting for the leader retrieval.");
- }
- } else if (retrievalState == LeaderRetrievalState.NEWLY_RETRIEVED) {
- targetAddress = AkkaUtils.getInetSockeAddressFromAkkaURL(akkaURL);
-
- LOG.info("Retrieved new target address {}.", targetAddress);
-
- retrievalState = LeaderRetrievalState.RETRIEVED;
-
- currentSleepTime = MIN_SLEEP_TIME;
- } else {
- currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
- }
- }
-
- if (targetAddress != null) {
- AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
-
- boolean logging = elapsedTime >= startLoggingAfter.toMillis();
- if (logging) {
- LOG.info("Trying to connect to address {}." + targetAddress);
- }
-
- do {
- InetAddress address = NetUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
- if (address != null) {
- return address;
- }
-
- // pick the next strategy
- switch (strategy) {
- case LOCAL_HOST:
- strategy = AddressDetectionState.ADDRESS;
- break;
- case ADDRESS:
- strategy = AddressDetectionState.FAST_CONNECT;
- break;
- case FAST_CONNECT:
- strategy = AddressDetectionState.SLOW_CONNECT;
- break;
- case SLOW_CONNECT:
- strategy = null;
- break;
- default:
- throw new RuntimeException("Unsupported strategy: " + strategy);
- }
- }
- while (strategy != null);
- }
-
- elapsedTime = System.currentTimeMillis() - startTime;
-
- long timeToWait = Math.min(
- Math.max(timeout.toMillis() - elapsedTime, 0),
- currentSleepTime);
-
- if (timeToWait > 0) {
- synchronized (retrievalLock) {
- try {
- retrievalLock.wait(timeToWait);
- } catch (InterruptedException e) {
- throw new Exception("Finding connecting address was interrupted while pausing.");
- }
- }
-
- elapsedTime = System.currentTimeMillis() - startTime;
- }
- }
-
- InetAddress heuristic = null;
-
- if (targetAddress != null) {
- LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
- heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
- }
-
- if (heuristic != null) {
- return heuristic;
- } else {
- LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
- return InetAddress.getLocalHost();
- }
- } catch (Exception e) {
- throw new LeaderRetrievalException("Could not retrieve the connecting address to the " +
- "current leader with the akka URL " + akkaURL + ".", e);
- }
- }
-
- @Override
- public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
- if (leaderAddress != null && !leaderAddress.equals("")) {
- synchronized (retrievalLock) {
- akkaURL = leaderAddress;
- retrievalState = LeaderRetrievalState.NEWLY_RETRIEVED;
-
- retrievalLock.notifyAll();
- }
- }
- }
-
- @Override
- public void handleError(Exception exception) {
- synchronized (retrievalLock) {
- this.exception = exception;
- retrievalLock.notifyAll();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 704a482..5cf5bff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.net.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -149,7 +149,7 @@ public class LeaderRetrievalUtils {
public static InetAddress findConnectingAddress(
LeaderRetrievalService leaderRetrievalService,
FiniteDuration timeout) throws LeaderRetrievalException {
- NetUtils.LeaderConnectingAddressListener listener = new NetUtils.LeaderConnectingAddressListener();
+ ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener();
try {
leaderRetrievalService.start(listener);
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index aba9c60..7a1bec5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -60,7 +60,7 @@ import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 420199c..bcf8837 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -26,10 +26,12 @@ import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.NetUtils;
+
import org.junit.Test;
+
import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 538901f..bf01422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -19,8 +19,10 @@
package org.apache.flink.runtime.io.network.netty;
import io.netty.channel.Channel;
+
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
+
import scala.Tuple2;
import java.net.InetAddress;
@@ -48,10 +50,7 @@ public class NettyTestUtil {
server.init(protocol);
}
catch (Exception e) {
- if (server != null) {
- server.shutdown();
- }
-
+ server.shutdown();
throw e;
}
@@ -65,10 +64,7 @@ public class NettyTestUtil {
client.init(protocol);
}
catch (Exception e) {
- if (client != null) {
- client.shutdown();
- }
-
+ client.shutdown();
throw e;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 6125479..c52ec2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -30,7 +30,7 @@ import com.google.common.io.Files;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
import org.apache.flink.util.OperatingSystem;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index bba1460..edc1f92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
new file mode 100644
index 0000000..e003634
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the network utilities.
+ */
+public class ConnectionUtilsTest {
+
+ @Test
+ public void testFindConnectableAddress() {
+ int unusedPort;
+ try {
+ unusedPort = org.apache.flink.util.NetUtils.getAvailablePort();
+ }
+ catch (Throwable t) {
+ // if this system cannot find an available port,
+ // skip this test
+ return;
+ }
+
+ try {
+ // create an unreachable target address
+ InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
+
+ final long start = System.currentTimeMillis();
+ InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);
+
+ // check that it did not take forever
+ assertTrue(System.currentTimeMillis() - start < (OperatingSystem.isWindows() ? 30000 : 8000));
+
+ // we should have found a heuristic address
+ assertNotNull(add);
+
+ // these checks are desirable, but will not work on every machine
+ // such as machines with no connected network media, which may
+ // default to a link local address
+ // assertFalse(add.isLinkLocalAddress());
+ // assertFalse(add.isLoopbackAddress());
+ // assertFalse(add.isAnyLocalAddress());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
deleted file mode 100644
index fe4c82c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.net;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * Tests for the network utilities.
- */
-public class NetUtilsTest {
-
- @Test
- public void testFindConnectableAddress() {
- int unusedPort;
- try {
- unusedPort = NetUtils.getAvailablePort();
- }
- catch (Throwable t) {
- // if this system cannot find an available port,
- // skip this test
- return;
- }
-
- try {
- // create an unreachable target address
- InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
-
- final long start = System.currentTimeMillis();
- InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
-
- // check that it did not take forever
- assertTrue(System.currentTimeMillis() - start < (OperatingSystem.isWindows() ? 30000 : 8000));
-
- // we should have found a heuristic address
- assertNotNull(add);
-
- // these checks are desirable, but will not work on every machine
- // such as machines with no connected network media, which may
- // default to a link local address
- // assertFalse(add.isLinkLocalAddress());
- // assertFalse(add.isLoopbackAddress());
- // assertFalse(add.isAnyLocalAddress());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 075c1c2..1334bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
-
import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+
import org.junit.Test;
import scala.Some;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 7b0b8f7..461138c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -57,10 +57,10 @@ import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.NetUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index b0fe695..6013309 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.util.NetUtils
import org.junit.Assert._
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index d762ab4..795ad4e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -28,10 +28,9 @@ import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
-import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
-import org.apache.flink.runtime.webmonitor.WebMonitor
import scala.concurrent.{Await, Future}
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index cfc104b..61f384a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -33,11 +33,11 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.kafka.common.PartitionInfo;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
index a6e1e7e..4ded0fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
@@ -19,10 +19,11 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.NetUtils;
+
import org.junit.Assert;
import java.io.BufferedReader;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
index 37f6958..d1bd64a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
@@ -17,8 +17,9 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.NetUtils;
+
import org.junit.Assert;
import java.io.PrintWriter;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index 8abe52e..affe134 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
+
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -29,13 +30,16 @@ import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f1745ee..ac308dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index bd1698a..a1a5205 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -28,7 +28,7 @@ import akka.util.Timeout;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.hadoop.conf.Configuration;
@@ -143,7 +143,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
LOG.info("Start actor system.");
// find name of own public interface, able to connect to the JM
// try to find address for 2 seconds. log after 400 ms.
- InetAddress ownHostname = NetUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
+ InetAddress ownHostname = ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
actorSystem = AkkaUtils.createActorSystem(flinkConfig,
new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
[2/3] flink git commit: [FLINK-2766] [core] Add proper handling of
IPv6 address literals in URLs
Posted by se...@apache.org.
[FLINK-2766] [core] Add proper handling of IPv6 address literals in URLs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfde1b73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfde1b73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfde1b73
Branch: refs/heads/master
Commit: bfde1b7379f0a0aef21c836e5ac0842474c7f98f
Parents: a3c0b44
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 29 17:45:06 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:56 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/util/NetUtils.java | 80 +++++++++
.../org/apache/flink/util/NetUtilsTest.java | 97 +++++++++++
.../apache/flink/runtime/client/JobClient.java | 7 +-
.../flink/runtime/net/ConnectionUtils.java | 2 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 42 +++--
.../flink/runtime/jobmanager/JobManager.scala | 20 +--
.../runtime/minicluster/FlinkMiniCluster.scala | 6 +-
.../flink/runtime/taskmanager/TaskManager.scala | 7 +-
.../TaskManagerProcessReapingTest.java | 9 +-
.../flink/runtime/akka/AkkaUtilsTest.scala | 46 +++++-
.../fs/RollingSinkFaultTolerance2ITCase.java | 9 +-
.../fs/RollingSinkFaultToleranceITCase.java | 9 +-
.../connectors/fs/RollingSinkITCase.java | 5 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 27 ++-
.../connectors/kafka/KafkaTestBase.java | 3 +-
.../test/util/ForkableFlinkMiniCluster.scala | 2 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 165 +++++++++++++++++++
17 files changed, 480 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 03721c5..da445ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -15,13 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.util;
+import com.google.common.net.InetAddresses;
import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
+import java.net.UnknownHostException;
public class NetUtils {
@@ -68,6 +75,10 @@ public class NetUtils {
}
}
+ // ------------------------------------------------------------------------
+ // Lookup of free ports
+ // ------------------------------------------------------------------------
+
/**
* Find a non-occupied port.
*
@@ -86,4 +97,73 @@ public class NetUtils {
throw new RuntimeException("Could not find a free permitted port on the machine.");
}
+
+
+ // ------------------------------------------------------------------------
+ // Encoding of IP addresses for URLs
+ // ------------------------------------------------------------------------
+
+ /**
+ * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
+ * have the proper formatting to be included in URLs.
+ * <p>
+ * This method internally uses Guava's functionality to properly encode IPv6 addresses.
+ *
+ * @param address The IP address to encode.
+ * @return The proper URL string encoded IP address.
+ */
+ public static String ipAddressToUrlString(InetAddress address) {
+ if (address == null) {
+ throw new NullPointerException("address is null");
+ }
+ else if (address instanceof Inet4Address) {
+ return address.getHostAddress();
+ }
+ else if (address instanceof Inet6Address) {
+ return '[' + InetAddresses.toAddrString(address) + ']';
+ }
+ else {
+ throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);
+ }
+ }
+
+ /**
+ * Encodes an IP address and port to be included in a URL. In particular, this method makes
+ * sure that IPv6 addresses have the proper formatting to be included in URLs.
+ *
+ * @param address The address to be included in the URL.
+ * @param port The port for the URL address.
+ * @return The proper URL string encoded IP address and port.
+ */
+ public static String ipAddressAndPortToUrlString(InetAddress address, int port) {
+ return ipAddressToUrlString(address) + ':' + port;
+ }
+
+ /**
+ * Encodes an IP address and port to be included in a URL. In particular, this method makes
+ * sure that IPv6 addresses have the proper formatting to be included in URLs.
+ *
+ * @param address The socket address with the IP address and port.
+ * @return The proper URL string encoded IP address and port.
+ */
+ public static String socketAddressToUrlString(InetSocketAddress address) {
+ if (address.isUnresolved()) {
+ throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());
+ }
+ return ipAddressAndPortToUrlString(address.getAddress(), address.getPort());
+ }
+
+ /**
+ * Normalizes and encodes a hostname and port to be included in a URL.
+ * In particular, this method makes sure that IPv6 address literals have the proper
+ * formatting to be included in URLs.
+ *
+ * @param host The address to be included in the URL.
+ * @param port The port for the URL address.
+ * @return The proper URL string encoded IP address and port.
+ * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.
+ */
+ public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
+ return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
new file mode 100644
index 0000000..cd2c13b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.*;
+
+public class NetUtilsTest {
+
+ @Test
+ public void testIPv4toURL() {
+ try {
+ final String addressString = "192.168.0.1";
+
+ InetAddress address = InetAddress.getByName(addressString);
+ assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIPv6toURL() {
+ try {
+ final String addressString = "2001:01db8:00:0:00:ff00:42:8329";
+ final String normalizedAddress = "[2001:1db8::ff00:42:8329]";
+
+ InetAddress address = InetAddress.getByName(addressString);
+ assertEquals(normalizedAddress, NetUtils.ipAddressToUrlString(address));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIPv4URLEncoding() {
+ try {
+ final String addressString = "10.244.243.12";
+ final int port = 23453;
+
+ InetAddress address = InetAddress.getByName(addressString);
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+
+ assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+ assertEquals(addressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address, port));
+ assertEquals(addressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testIPv6URLEncoding() {
+ try {
+ final String addressString = "2001:db8:10:11:12:ff00:42:8329";
+ final String bracketedAddressString = '[' + addressString + ']';
+ final int port = 23453;
+
+ InetAddress address = InetAddress.getByName(addressString);
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+
+ assertEquals(bracketedAddressString, NetUtils.ipAddressToUrlString(address));
+ assertEquals(bracketedAddressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address, port));
+ assertEquals(bracketedAddressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index c51bc7c..a436881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,9 +75,11 @@ public class JobClient {
ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
Address address = system.provider().getDefaultAddress();
- String host = address.host().isDefined() ? address.host().get() : "(unknown)";
+ String hostAddress = address.host().isDefined() ?
+ NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
+ "(unknown)";
int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
- LOG.info("Started JobClient actor system at " + host + ':' + port);
+ LOG.info("Started JobClient actor system at " + hostAddress + ':' + port);
return system;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 0ed9345..542e69e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -365,7 +365,7 @@ public class ConnectionUtils {
boolean logging = elapsedTime >= startLoggingAfter.toMillis();
if (logging) {
- LOG.info("Trying to connect to address {}." + targetAddress);
+ LOG.info("Trying to connect to address {}", targetAddress);
}
do {
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 8007ef6..bf679c9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -19,13 +19,14 @@
package org.apache.flink.runtime.akka
import java.io.IOException
-import java.net.{InetSocketAddress, InetAddress}
+import java.net._
import java.util.concurrent.{TimeUnit, Callable}
import akka.actor._
import akka.pattern.{ask => akkaAsk}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.util.NetUtils
import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
import org.slf4j.LoggerFactory
import scala.concurrent._
@@ -102,6 +103,7 @@ object AkkaUtils {
* then an Akka config for local actor system will be returned
* @return Akka config
*/
+ @throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
listeningAddress: Option[(String, Int)]): Config = {
val defaultConfig = getBasicAkkaConfig(configuration)
@@ -109,8 +111,9 @@ object AkkaUtils {
listeningAddress match {
case Some((hostname, port)) =>
- val ipAddress = "\"" + InetAddress.getByName(hostname).getHostAddress() + "\""
- val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port)
+ val ipAddress = InetAddress.getByName(hostname)
+ val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
+ val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
remoteConfig.withFallback(defaultConfig)
case None =>
@@ -513,23 +516,30 @@ object AkkaUtils {
* the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is
* provided, then an [[Exception]] is thrown.
*
- * @param akkaURL
- * @throws java.lang.Exception
- * @return
+ * @param akkaURL The URL to extract the host and port from.
+ * @throws java.lang.Exception Thrown, if the given string does not represent a proper url
+ * @return The InetSocketAddress with the extracted host and port.
*/
@throws(classOf[Exception])
def getInetSockeAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
// AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL
- val hostPortRegex = """@([^/:]*):(\d*)""".r
-
- hostPortRegex.findFirstMatchIn(akkaURL) match {
- case Some(m) =>
- val host = m.group(1)
- val port = m.group(2).toInt
-
- new InetSocketAddress(host, port)
- case None => throw new Exception("Could not retrieve InetSocketAddress from " +
- s"Akka URL $akkaURL")
+ try {
+ // we need to manually strip the protocol, because "akka.tcp" is not
+ // a valid protocol for Java's URL class
+ val protocolonPos = akkaURL.indexOf("://")
+ if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) {
+ throw new MalformedURLException()
+ }
+
+ val url = new URL("http://" + akkaURL.substring(protocolonPos + 3))
+ if (url.getHost == null || url.getPort == -1) {
+ throw new MalformedURLException()
+ }
+ new InetSocketAddress(url.getHost, url.getPort)
+ }
+ catch {
+ case _ : MalformedURLException =>
+ throw new Exception(s"Could not retrieve InetSocketAddress from Akka URL $akkaURL")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 07a5977..b7f76ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,12 +44,8 @@ import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-
-import org.apache.flink.runtime.messages.accumulators.{AccumulatorResultsErroneous,
-AccumulatorResultsFound, RequestAccumulatorResults, AccumulatorMessage,
-AccumulatorResultStringsFound, RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
-AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
@@ -67,7 +63,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
+import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil}
import scala.concurrent._
import scala.concurrent.duration._
@@ -1237,7 +1233,8 @@ object JobManager {
LOG.info("Starting JobManager")
// Bring up the job manager actor system first, bind it to the given address.
- LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
+ val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
+ LOG.info(s"Starting JobManager actor system at $hostPortUrl")
val jobManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1451,8 +1448,9 @@ object JobManager {
val executionMode = config.getJobManagerMode
val streamingMode = config.getStreamingMode
-
- LOG.info(s"Starting JobManager on $host:$port with execution mode $executionMode and " +
+ val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
+
+ LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode and " +
s"streaming mode $streamingMode")
(configuration, executionMode, streamingMode, host, port)
@@ -1657,7 +1655,7 @@ object JobManager {
address: InetSocketAddress,
name: Option[String] = None)
: String = {
- val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
+ val hostPort = NetUtils.socketAddressToUrlString(address)
getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 839193b..b3cff51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -76,7 +76,9 @@ abstract class FlinkMiniCluster(
// NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
- val hostname = InetAddress.getByName("localhost").getHostAddress()
+ val hostname = userConfiguration.getString(
+ ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
+ InetAddress.getByName("localhost").getHostAddress())
val configuration = generateConfiguration(userConfiguration)
@@ -243,7 +245,7 @@ abstract class FlinkMiniCluster(
jobManagerActorSystems = Some(jmActorSystems)
jobManagerActors = Some(jmActors)
- val lrs = createLeaderRetrievalService();
+ val lrs = createLeaderRetrievalService()
leaderRetrievalService = Some(lrs)
lrs.start(this)
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7a1bec5..bf23021 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1425,8 +1425,9 @@ object TaskManager {
LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
// Bring up the TaskManager actor system first, bind it to the given address.
-
- LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort))
val taskManagerSystem = try {
val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1443,7 +1444,7 @@ object TaskManager {
if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
val cause = t.getCause()
if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
- val address = taskManagerHostname + ":" + actorSystemPort
+ val address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)
throw new IOException("Unable to bind TaskManager actor system to address " +
address + " - " + cause.getMessage(), t)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 1334bcc..ed03ae7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -40,6 +40,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
+import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -78,10 +79,11 @@ public class TaskManagerProcessReapingTest {
tempLogFile.deleteOnExit();
CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+ final InetAddress localhost = InetAddress.getByName("localhost");
final int jobManagerPort = NetUtils.getAvailablePort();
// start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
jmActorSystem = AkkaUtils.createActorSystem(
new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
@@ -109,8 +111,9 @@ public class TaskManagerProcessReapingTest {
// grab the reference to the TaskManager. try multiple times, until the process
// is started and the TaskManager is up
- String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s",
- "127.0.0.1", taskManagerPort, TaskManager.TASK_MANAGER_NAME());
+ String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
+ org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+ TaskManager.TASK_MANAGER_NAME());
ActorRef taskManagerRef = null;
Throwable lastError = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 59477db..4e08857 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.akka
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
import org.apache.flink.runtime.jobmanager.JobManager
import org.junit.runner.RunWith
@@ -64,4 +64,48 @@ class AkkaUtilsTest
result should equal(expected)
}
+ test("getHostFromAkkaURL should handle 'akka.tcp' as protocol") {
+ val url = "akka.tcp://flink@localhost:1234/user/jobmanager"
+ val expected = new InetSocketAddress("localhost", 1234)
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(expected)
+ }
+
+ test("getHostFromAkkaURL should properly handle IPv4 addresses in URLs") {
+ val IPv4AddressString = "192.168.0.1"
+ val port = 1234
+ val address = new InetSocketAddress(IPv4AddressString, port)
+
+ val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(address)
+ }
+
+ test("getHostFromAkkaURL should properly handle IPv6 addresses in URLs") {
+ val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+ val port = 1234
+ val address = new InetSocketAddress(IPv6AddressString, port)
+
+ val url = s"akka://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(address)
+ }
+
+ test("getHostFromAkkaURL should properly handle IPv6 addresses in 'akka.tcp' URLs") {
+ val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+ val port = 1234
+ val address = new InetSocketAddress(IPv6AddressString, port)
+
+ val url = s"akka.tcp://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+ val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+ result should equal(address)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
index 9c70ed2..7d127ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -69,7 +70,6 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
private static MiniDFSCluster hdfsCluster;
private static org.apache.hadoop.fs.FileSystem dfs;
- private static String hdfsURI;
private static String outPath;
@@ -87,10 +87,9 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
dfs = hdfsCluster.getFileSystem();
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
- outPath = hdfsURI + "/string-non-rolling-out-no-checkpoint";
-
+ outPath = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/string-non-rolling-out-no-checkpoint";
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index e0592e9..65904d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -64,7 +65,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
private static MiniDFSCluster hdfsCluster;
private static org.apache.hadoop.fs.FileSystem dfs;
- private static String hdfsURI;
private static String outPath;
@@ -82,10 +82,9 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
dfs = hdfsCluster.getFileSystem();
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
- outPath = hdfsURI + "/string-non-rolling-out";
-
+ outPath = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/string-non-rolling-out";
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 008b4b6..9770f41 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -79,7 +80,9 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
dfs = hdfsCluster.getFileSystem();
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+ hdfsURI = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/";
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 5016e7e..e9a5728 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -69,6 +69,7 @@ import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Assert;
@@ -841,7 +842,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while (firstPart.errorCode() != 0);
zkClient.close();
- final String leaderToShutDown = firstPart.leader().get().connectionString();
+ final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
+ final String leaderToShutDownConnection =
+ NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
+
+
final int leaderIdToShutDown = firstPart.leader().get().id();
LOG.info("Leader to shutdown {}", leaderToShutDown);
@@ -863,7 +868,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env
.addSource(kafkaSource)
.map(new PartitionValidatingMapper(parallelism, 1))
- .map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+ .map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
BrokerKillingMapper.killedLeaderBefore = false;
@@ -1068,14 +1073,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// shut down a Kafka broker
KafkaServer toShutDown = null;
for (KafkaServer kafkaServer : brokers) {
- if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+ String connectionUrl =
+ NetUtils.hostAndPortToUrlString(
+ kafkaServer.config().advertisedHostName(),
+ kafkaServer.config().advertisedPort());
+ if (leaderToShutDown.equals(connectionUrl)) {
toShutDown = kafkaServer;
break;
}
}
if (toShutDown == null) {
- throw new Exception("Cannot find broker to shut down");
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer kafkaServer : brokers) {
+ listOfBrokers.append(
+ NetUtils.hostAndPortToUrlString(
+ kafkaServer.config().advertisedHostName(),
+ kafkaServer.config().advertisedPort()));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
+ + " ; available brokers: " + listOfBrokers.toString());
}
else {
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 61f384a..d511796 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -59,6 +59,7 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -148,7 +149,7 @@ public abstract class KafkaTestBase extends TestLogger {
SocketServer socketServer = brokers.get(i).socketServer();
String host = socketServer.host() == null ? "localhost" : socketServer.host();
- brokerConnectionStrings += host+":"+socketServer.port()+",";
+ brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
}
LOG.info("ZK and KafkaServer started.");
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c8b0e0c..5c5a465 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -96,7 +96,7 @@ class ForkableFlinkMiniCluster(
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
- if(jobManagerPort > 0) {
+ if (jobManagerPort > 0) {
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
new file mode 100644
index 0000000..af51ed6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.Some;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.util.Enumeration;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration test that starts a {@link ForkableFlinkMiniCluster} whose JobManager address and
+ * TaskManager hostname are configured as a literal IPv6 address, and then runs a word count
+ * against it through a remote execution environment connected to that same address.
+ *
+ * <p>The test is skipped (it returns without asserting anything) when no usable non-loopback
+ * IPv6 address is found on the local machine.
+ */
+@SuppressWarnings("serial")
+public class IPv6HostnamesITCase extends TestLogger {
+
+	/**
+	 * Runs a word count on a mini cluster that is configured with, and connected to via,
+	 * a local IPv6 address, and compares the result with the expected counts.
+	 */
+	@Test
+	public void testClusterWithIPv6host() {
+
+		final Inet6Address ipv6address = getLocalIPv6Address();
+		if (ipv6address == null) {
+			System.err.println("--- Cannot find a non-loopback local IPv6 address, skipping IPv6HostnamesITCase");
+			return;
+		}
+
+		ForkableFlinkMiniCluster flink = null;
+		try {
+			final String addressString = ipv6address.getHostAddress();
+			log.info("Test will use IPv6 address " + addressString + " for connection tests");
+
+			// both the JobManager and the TaskManagers are told to use the IPv6 address literal
+			Configuration conf = new Configuration();
+			conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
+			conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
+			conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+
+			flink = new ForkableFlinkMiniCluster(conf, false, StreamingMode.BATCH_ONLY);
+			flink.start();
+
+			// connect to the cluster through the same IPv6 address literal
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());
+
+			// get input data
+			DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+
+			DataSet<Tuple2<String, Integer>> counts = text
+					.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+						@Override
+						public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+							for (String token : value.toLowerCase().split("\\W+")) {
+								if (token.length() > 0) {
+									out.collect(new Tuple2<String, Integer>(token, 1));
+								}
+							}
+						}
+					})
+					.groupBy(0).sum(1);
+
+			List<Tuple2<String, Integer>> result = counts.collect();
+
+			TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (flink != null) {
+				flink.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Searches all local network interfaces for an IPv6 address that is neither a loopback nor
+	 * a wildcard address and that passes the probes in {@link #isUsableAddress(InetAddress)}.
+	 *
+	 * @return The first usable IPv6 address, or {@code null} if none was found or the
+	 *         network interfaces could not be enumerated.
+	 */
+	private Inet6Address getLocalIPv6Address() {
+		try {
+			Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+			while (interfaces.hasMoreElements()) {
+				NetworkInterface netInterface = interfaces.nextElement();
+
+				// for each address of the network interface
+				Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
+				while (addresses.hasMoreElements()) {
+					InetAddress addr = addresses.nextElement();
+
+					boolean candidate = addr instanceof Inet6Address
+							&& !addr.isLoopbackAddress()
+							&& !addr.isAnyLocalAddress();
+
+					if (candidate) {
+						log.info("Considering address " + addr);
+						if (isUsableAddress(addr)) {
+							log.info("Using address " + addr);
+							return (Inet6Address) addr;
+						}
+					}
+				}
+			}
+
+			return null;
+		}
+		catch (Exception e) {
+			// the test treats "no address" as "skip", so only record why the search failed
+			log.info("Searching for a local IPv6 address failed: " + e);
+			return null;
+		}
+	}
+
+	/**
+	 * Probes whether a plain server socket and an Akka actor system can both be started on
+	 * the given address.
+	 *
+	 * <p>Any failure only disqualifies this one address, so that the caller can continue with
+	 * the next candidate instead of giving up on the whole search.
+	 *
+	 * @param addr The address to probe.
+	 * @return {@code true} if both probes succeeded, {@code false} otherwise.
+	 */
+	private boolean isUsableAddress(InetAddress addr) {
+		try {
+			// test whether we can bind a socket to that address
+			log.info("Testing whether sockets can bind to " + addr);
+			ServerSocket sock = new ServerSocket();
+			try {
+				sock.bind(new InetSocketAddress(addr, 0));
+			}
+			finally {
+				// release the socket also when the bind attempt fails
+				sock.close();
+			}
+
+			// test whether Akka's netty can bind to the address
+			log.info("Testing whether Akka can use " + addr);
+			int port = NetUtils.getAvailablePort();
+			ActorSystem as = AkkaUtils.createActorSystem(
+					new Configuration(),
+					new Some<scala.Tuple2<String, Object>>(new scala.Tuple2<String, Object>(addr.getHostAddress(), port)));
+			as.shutdown();
+
+			return true;
+		}
+		catch (Exception e) {
+			// not only IOExceptions: a failed probe of any kind means "try the next address"
+			log.info("Cannot use address " + addr + ": " + e);
+			return false;
+		}
+	}
+}
[3/3] flink git commit: [hotfix] [build] Add joda-convert as a
non-optional dependency, to fix Scala interoperability
Posted by se...@apache.org.
[hotfix] [build] Add joda-convert as a non-optional dependency, to fix Scala interoperability
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c04a7704
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c04a7704
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c04a7704
Branch: refs/heads/master
Commit: c04a770421c6e13273307d6aaf9de462c4398360
Parents: bfde1b7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 30 14:51:07 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:56 2015 +0200
----------------------------------------------------------------------
flink-java/pom.xml | 5 +++++
pom.xml | 6 ++++++
2 files changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c04a7704/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 49e6099..c8f88f4 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -87,6 +87,11 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.joda</groupId>
+ <artifactId>joda-convert</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
http://git-wip-us.apache.org/repos/asf/flink/blob/c04a7704/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 573e49c..6cdf7fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -251,6 +251,12 @@ under the License.
<version>2.5</version>
</dependency>
+ <dependency>
+ <groupId>org.joda</groupId>
+ <artifactId>joda-convert</artifactId>
+ <version>1.7</version>
+ </dependency>
+
<!-- stax is pulled in different versions by different transitive dependencies-->
<dependency>
<groupId>stax</groupId>