You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/24 15:57:53 UTC
flink git commit: [FLINK-2996] Introduce configuration parameter for
BlobServer port
Repository: flink
Updated Branches:
refs/heads/master d2b4391f4 -> 5558e7688
[FLINK-2996] Introduce configuration parameter for BlobServer port
This closes #1394
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5558e768
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5558e768
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5558e768
Branch: refs/heads/master
Commit: 5558e7688e777853a23d94edc31971aa1e230d1e
Parents: d2b4391
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Nov 23 16:55:50 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 24 15:56:36 2015 +0100
----------------------------------------------------------------------
docs/setup/config.md | 6 ++
.../flink/configuration/ConfigConstants.java | 13 +++
.../java/org/apache/flink/util/NetUtils.java | 30 ++++++
.../org/apache/flink/util/NetUtilsTest.java | 53 ++++++++++
.../apache/flink/runtime/blob/BlobServer.java | 30 +++++-
.../flink/runtime/blob/BlobServerRangeTest.java | 102 +++++++++++++++++++
6 files changed, 229 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 299b371..8abcc03 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -218,6 +218,12 @@ Note: State backend must be accessible from the JobManager, use file:// only for
- `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
+- `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers.
+By default the port is set to 0, which means that the operating system is picking an ephemeral port.
+Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both.
+It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running
+on the same machine.
+
- `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values
have to be specified as strings with a unit.
http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d331548..251ea9c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -82,6 +82,14 @@ public final class ConfigConstants {
public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";
/**
+ * The config parameter defining the server port of the blob service.
+ * The port can either be a port, such as "9123",
+ * a range of ports: "50100-50200"
+ * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+ */
+ public static final String BLOB_SERVER_PORT = "blob.server.port";
+
+ /**
* The config parameter defining the cleanup interval of the library cache manager.
*/
public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
@@ -503,6 +511,11 @@ public final class ConfigConstants {
public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
/**
+ * Default BLOB server port. 0 means ephemeral port.
+ */
+ public static final String DEFAULT_BLOB_SERVER_PORT = "0";
+
+ /**
* The default network port the task manager expects incoming IPC connections. The {@code 0} means that
* the TaskManager searches for a free port.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index da445ec..0ba8820 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -29,6 +29,8 @@ import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Set;
public class NetUtils {
@@ -166,4 +168,32 @@ public class NetUtils {
public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
}
+
+ /**
+ * Returns a set of available ports defined by the range definition.
+ *
+ * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
+ * @return Set of ports from the range definition
+ * @throws NumberFormatException If an invalid string is passed.
+ */
+ public static Set<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
+ Set<Integer> finalSet = new HashSet<>();
+ final String[] ranges = rangeDefinition.trim().split(",");
+ for(String rawRange: ranges) {
+ String range = rawRange.trim();
+ int dashIdx = range.indexOf('-');
+ if (dashIdx == -1) {
+ // only one port in range:
+ finalSet.add(Integer.valueOf(range));
+ } else {
+ // evaluate range
+ int start = Integer.valueOf(range.substring(0, dashIdx));
+ int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
+ for(int i = start; i <= end; i++) {
+ finalSet.add(i);
+ }
+ }
+ }
+ return finalSet;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index cd2c13b..13a59fa 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,11 +18,15 @@
package org.apache.flink.util;
+import org.junit.Assert;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Set;
+import static org.hamcrest.core.IsCollectionContaining.hasItems;
+import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.*;
public class NetUtilsTest {
@@ -94,4 +98,53 @@ public class NetUtilsTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testFreePortRangeUtility() {
+ // inspired by Hadoop's example for "yarn.app.mapreduce.am.job.client.port-range"
+ String rangeDefinition = "50000-50050, 50100-50200,51234 "; // this also contains some whitespaces
+ Set<Integer> ports = NetUtils.getPortRangeFromString(rangeDefinition);
+ Assert.assertEquals(51+101+1, ports.size());
+ // check first range
+ Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050));
+ // check second range and last point
+ Assert.assertThat(ports, hasItems(50100, 50101, 50110, 50200, 51234));
+ // check that only ranges are included
+ Assert.assertThat(ports, not(hasItems(50051, 50052, 1337, 50201, 49999, 50099)));
+
+
+ // test single port "range":
+ ports = NetUtils.getPortRangeFromString(" 51234");
+ Assert.assertEquals(1, ports.size());
+ Assert.assertEquals(51234, (int)ports.iterator().next());
+
+ // test port list
+ ports = NetUtils.getPortRangeFromString("5,1,2,3,4");
+ Assert.assertEquals(5, ports.size());
+ Assert.assertThat(ports, hasItems(1,2,3,4,5));
+
+
+ Throwable error = null;
+
+ // try some wrong values: String
+ try { NetUtils.getPortRangeFromString("localhost"); } catch(Throwable t) { error = t; }
+ Assert.assertTrue(error instanceof NumberFormatException);
+ error = null;
+
+ // incomplete range
+ try { NetUtils.getPortRangeFromString("5-"); } catch(Throwable t) { error = t; }
+ Assert.assertTrue(error instanceof NumberFormatException);
+ error = null;
+
+ // incomplete range
+ try { NetUtils.getPortRangeFromString("-5"); } catch(Throwable t) { error = t; }
+ Assert.assertTrue(error instanceof NumberFormatException);
+ error = null;
+
+ // empty range
+ try { NetUtils.getPortRangeFromString(",5"); } catch(Throwable t) { error = t; }
+ Assert.assertTrue(error instanceof NumberFormatException);
+ error = null;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 851cff4..f4b6c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -137,12 +139,30 @@ public class BlobServer extends Thread implements BlobService {
this.shutdownHook = null;
}
- // start the server
- try {
- this.serverSocket = new ServerSocket(0, backlog);
+ // ----------------------- start the server -------------------
+
+ String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
+ Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange).iterator();
+
+ ServerSocket socketAttempt = null;
+ while(ports.hasNext()) {
+ int port = ports.next();
+ LOG.debug("Trying to open socket on port {}", port);
+ try {
+ socketAttempt = new ServerSocket(port, backlog);
+ break; // we were able to use the port.
+ } catch (IOException | IllegalArgumentException e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Unable to allocate socket on port", e);
+ } else {
+ LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
+ }
+ }
}
- catch (IOException e) {
- throw new IOException("Could not create BlobServer with automatic port choice.", e);
+ if(socketAttempt == null) {
+ throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
+ } else {
+ this.serverSocket = socketAttempt;
}
// start the server thread
http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
new file mode 100644
index 0000000..36ae8cc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * Tests to ensure that the BlobServer properly starts on a specified range of available ports.
+ */
+public class BlobServerRangeTest extends TestLogger {
+ /**
+ * Start blob server on 0 = pick an ephemeral port
+ */
+ @Test
+ public void testOnEphemeralPort() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
+ BlobServer srv = new BlobServer(conf);
+ }
+
+ /**
+ * Try allocating on an unavailable port
+ * @throws IOException
+ */
+ @Test(expected = IOException.class)
+ public void testPortUnavailable() throws IOException {
+ // allocate on an ephemeral port
+ ServerSocket socket = null;
+ try {
+ socket = new ServerSocket(0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("An exception was thrown while preparing the test " + e.getMessage());
+ }
+
+ Configuration conf = new Configuration();
+ conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort()));
+
+ // this thing is going to throw an exception
+ try {
+ BlobServer srv = new BlobServer(conf);
+ } finally {
+ socket.close();
+ }
+ }
+
+ /**
+ * Give the BlobServer a choice of three ports, where two of them
+ * are allocated
+ */
+ @Test
+ public void testOnePortAvailable() throws IOException {
+ int numAllocated = 2;
+ ServerSocket[] sockets = new ServerSocket[numAllocated];
+ for(int i = 0; i < numAllocated; i++) {
+ ServerSocket socket = null;
+ try {
+ sockets[i] = new ServerSocket(0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("An exception was thrown while preparing the test " + e.getMessage());
+ }
+ }
+ int availablePort = NetUtils.getAvailablePort();
+ Configuration conf = new Configuration();
+ conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
+
+ // this thing is going to throw an exception
+ try {
+ BlobServer srv = new BlobServer(conf);
+ Assert.assertEquals(availablePort, srv.getPort());
+ srv.shutdown();
+ } finally {
+ sockets[0].close();
+ sockets[1].close();
+ }
+ }
+}