You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/24 15:57:53 UTC

flink git commit: [FLINK-2996] Introduce configuration parameter for BlobServer port

Repository: flink
Updated Branches:
  refs/heads/master d2b4391f4 -> 5558e7688


[FLINK-2996] Introduce configuration parameter for BlobServer port

This closes #1394


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5558e768
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5558e768
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5558e768

Branch: refs/heads/master
Commit: 5558e7688e777853a23d94edc31971aa1e230d1e
Parents: d2b4391
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Nov 23 16:55:50 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 24 15:56:36 2015 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   6 ++
 .../flink/configuration/ConfigConstants.java    |  13 +++
 .../java/org/apache/flink/util/NetUtils.java    |  30 ++++++
 .../org/apache/flink/util/NetUtilsTest.java     |  53 ++++++++++
 .../apache/flink/runtime/blob/BlobServer.java   |  30 +++++-
 .../flink/runtime/blob/BlobServerRangeTest.java | 102 +++++++++++++++++++
 6 files changed, 229 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 299b371..8abcc03 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -218,6 +218,12 @@ Note: State backend must be accessible from the JobManager, use file:// only for
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
 
+- `blob.server.port`: Port definition for the blob server (serving user JARs) on the TaskManagers.
+By default the port is set to 0, which means that the operating system picks an ephemeral port.
+Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both.
+It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running
+on the same machine.
+
 - `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values
 have to be specified as strings with a unit.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d331548..251ea9c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -82,6 +82,14 @@ public final class ConfigConstants {
 	public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";
 
 	/**
+	 * The config parameter defining the server port of the blob service.
+	 * The value can be a single port, such as "9123",
+	 * a range of ports: "50100-50200",
+	 * or a list of ranges and/or single ports: "50100-50200,50300-50400,51234".
+	 */
+	public static final String BLOB_SERVER_PORT = "blob.server.port";
+
+	/**
 	 * The config parameter defining the cleanup interval of the library cache manager.
 	 */
 	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
@@ -503,6 +511,11 @@ public final class ConfigConstants {
 	public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
 
 	/**
+	 * Default BLOB server port. 0 means ephemeral port.
+	 */
+	public static final String DEFAULT_BLOB_SERVER_PORT = "0";
+
+	/**
 	 * The default network port the task manager expects incoming IPC connections. The {@code 0} means that
 	 * the TaskManager searches for a free port.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index da445ec..0ba8820 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -29,6 +29,8 @@ import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Set;
 
 public class NetUtils {
 	
@@ -166,4 +168,32 @@ public class NetUtils {
 	public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
 		return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
 	}
+
+	/**
+	 * Returns the set of ports described by the range definition (availability is not checked).
+	 *
+	 * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
+	 * @return Set of ports from the range definition
+	 * @throws NumberFormatException If an invalid string is passed.
+	 */
+	public static Set<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
+		Set<Integer> finalSet = new HashSet<>();
+		final String[] ranges = rangeDefinition.trim().split(",");
+		for(String rawRange: ranges) {
+			String range = rawRange.trim();
+			int dashIdx = range.indexOf('-');
+			if (dashIdx == -1) {
+				// only one port in range:
+				finalSet.add(Integer.valueOf(range));
+			} else {
+				// evaluate range
+				int start = Integer.valueOf(range.substring(0, dashIdx));
+				int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
+				for(int i = start; i <= end; i++) {
+					finalSet.add(i);
+				}
+			}
+		}
+		return finalSet;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index cd2c13b..13a59fa 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.util;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Set;
 
+import static org.hamcrest.core.IsCollectionContaining.hasItems;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.*;
 
 public class NetUtilsTest {
@@ -94,4 +98,53 @@ public class NetUtilsTest {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testFreePortRangeUtility() {
+		// inspired by Hadoop's example for "yarn.app.mapreduce.am.job.client.port-range"
+		String rangeDefinition = "50000-50050, 50100-50200,51234 "; // this also contains some whitespaces
+		Set<Integer> ports = NetUtils.getPortRangeFromString(rangeDefinition);
+		Assert.assertEquals(51+101+1, ports.size());
+		// check first range
+		Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050));
+		// check second range and last point
+		Assert.assertThat(ports, hasItems(50100, 50101, 50110, 50200, 51234));
+		// check that only ranges are included
+		Assert.assertThat(ports, not(hasItems(50051, 50052, 1337, 50201, 49999, 50099)));
+
+
+		// test single port "range":
+		ports = NetUtils.getPortRangeFromString(" 51234");
+		Assert.assertEquals(1, ports.size());
+		Assert.assertEquals(51234, (int)ports.iterator().next());
+
+		// test port list
+		ports = NetUtils.getPortRangeFromString("5,1,2,3,4");
+		Assert.assertEquals(5, ports.size());
+		Assert.assertThat(ports, hasItems(1,2,3,4,5));
+
+
+		Throwable error = null;
+
+		// try some wrong values: String
+		try { NetUtils.getPortRangeFromString("localhost"); } catch(Throwable t) { error = t; }
+		Assert.assertTrue(error instanceof NumberFormatException);
+		error = null;
+
+		// incomplete range
+		try { NetUtils.getPortRangeFromString("5-"); } catch(Throwable t) { error = t; }
+		Assert.assertTrue(error instanceof NumberFormatException);
+		error = null;
+
+		// incomplete range
+		try { NetUtils.getPortRangeFromString("-5"); } catch(Throwable t) { error = t; }
+		Assert.assertTrue(error instanceof NumberFormatException);
+		error = null;
+
+		// empty range
+		try { NetUtils.getPortRangeFromString(",5"); } catch(Throwable t) { error = t; }
+		Assert.assertTrue(error instanceof NumberFormatException);
+		error = null;
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 851cff4..f4b6c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import java.net.ServerSocket;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -137,12 +139,30 @@ public class BlobServer extends Thread implements BlobService {
 			this.shutdownHook = null;
 		}
 
-		// start the server
-		try {
-			this.serverSocket = new ServerSocket(0, backlog);
+		//  ----------------------- start the server -------------------
+
+		String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
+		Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange).iterator();
+
+		ServerSocket socketAttempt = null;
+		while(ports.hasNext()) {
+			int port = ports.next();
+			LOG.debug("Trying to open socket on port {}", port);
+			try {
+				socketAttempt = new ServerSocket(port, backlog);
+				break; // we were able to use the port.
+			} catch (IOException | IllegalArgumentException e) {
+				if(LOG.isDebugEnabled()) {
+					LOG.debug("Unable to allocate socket on port", e);
+				} else {
+					LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
+				}
+			}
 		}
-		catch (IOException e) {
-			throw new IOException("Could not create BlobServer with automatic port choice.", e);
+		if(socketAttempt == null) {
+			throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
+		} else {
+			this.serverSocket = socketAttempt;
 		}
 
 		// start the server thread

http://git-wip-us.apache.org/repos/asf/flink/blob/5558e768/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
new file mode 100644
index 0000000..36ae8cc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * Tests to ensure that the BlobServer properly starts on a specified range of available ports.
+ */
+public class BlobServerRangeTest extends TestLogger {
+	/**
+	 * Start blob server on 0 = pick an ephemeral port
+	 */
+	@Test
+	public void testOnEphemeralPort() throws IOException {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
+		BlobServer srv = new BlobServer(conf);
+	}
+
+	/**
+	 * Try allocating on an unavailable port
+	 * @throws IOException
+	 */
+	@Test(expected = IOException.class)
+	public void testPortUnavailable() throws IOException {
+		// allocate on an ephemeral port
+		ServerSocket socket = null;
+		try {
+			socket = new ServerSocket(0);
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("An exception was thrown while preparing the test " + e.getMessage());
+		}
+
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort()));
+
+		// this thing is going to throw an exception
+		try {
+			BlobServer srv = new BlobServer(conf);
+		} finally {
+			socket.close();
+		}
+	}
+
+	/**
+	 * Give the BlobServer a choice of three ports, where two of them
+	 * are allocated
+	 */
+	@Test
+	public void testOnePortAvailable() throws IOException {
+		int numAllocated = 2;
+		ServerSocket[] sockets = new ServerSocket[numAllocated];
+		for(int i = 0; i < numAllocated; i++) {
+			ServerSocket socket = null;
+			try {
+				sockets[i] = new ServerSocket(0);
+			} catch (IOException e) {
+				e.printStackTrace();
+				Assert.fail("An exception was thrown while preparing the test " + e.getMessage());
+			}
+		}
+		int availablePort = NetUtils.getAvailablePort();
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
+
+		// the server must skip the two occupied ports and bind to the free one
+		try {
+			BlobServer srv = new BlobServer(conf);
+			Assert.assertEquals(availablePort, srv.getPort());
+			srv.shutdown();
+		} finally {
+			sockets[0].close();
+			sockets[1].close();
+		}
+	}
+}