You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 12:10:33 UTC

[10/14] flink git commit: [FLINK-6518] Port blobserver config parameters to ConfigOptions

[FLINK-6518] Port blobserver config parameters to ConfigOptions

This closes #3865.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e61a01b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e61a01b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e61a01b

Branch: refs/heads/release-1.3
Commit: 5e61a01bae6d05bf1d5c76bc48f0ba90bbdef752
Parents: 9fe6135
Author: zentol <ch...@apache.org>
Authored: Wed May 10 10:26:19 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 14:07:26 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/BlobServerOptions.java  | 76 ++++++++++++++++++++
 .../flink/configuration/ConfigConstants.java    | 40 ++++++-----
 .../apache/flink/runtime/blob/BlobCache.java    |  9 ++-
 .../apache/flink/runtime/blob/BlobClient.java   |  5 +-
 .../apache/flink/runtime/blob/BlobServer.java   | 22 +++---
 .../flink/runtime/blob/BlobClientSslTest.java   |  5 +-
 .../flink/runtime/blob/BlobServerRangeTest.java |  8 +--
 .../jobmanager/JobManagerStartupTest.java       |  4 +-
 8 files changed, 125 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
new file mode 100644
index 0000000..e27c29f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for the BlobServer.
+ */
+@PublicEvolving
+public class BlobServerOptions {
+
+	/**
+	 * The config parameter defining the storage directory to be used by the blob server.
+	 */
+	public static final ConfigOption<String> STORAGE_DIRECTORY =
+		key("blob.storage.directory")
+			.noDefaultValue();
+
+	/**
+	 * The config parameter defining number of retires for failed BLOB fetches.
+	 */
+	public static final ConfigOption<Integer> FETCH_RETRIES =
+		key("blob.fetch.retries")
+			.defaultValue(5);
+
+	/**
+	 * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
+	 */
+	public static final ConfigOption<Integer> FETCH_CONCURRENT =
+		key("blob.fetch.num-concurrent")
+			.defaultValue(50);
+
+	/**
+	 * The config parameter defining the backlog of BLOB fetches on the JobManager.
+	 */
+	public static final ConfigOption<Integer> FETCH_BACKLOG =
+		key("blob.fetch.backlog")
+			.defaultValue(1000);
+
+	/**
+	 * The config parameter defining the server port of the blob service.
+	 * The port can either be a port, such as "9123",
+	 * a range of ports: "50100-50200"
+	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+	 *
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final ConfigOption<String> PORT =
+		key("blob.server.port")
+			.defaultValue("0");
+
+	/**
+	 * Flag to override ssl support for the blob service transport.
+	 */
+	public static final ConfigOption<Boolean> SSL_ENABLED =
+		key("blob.service.ssl.enabled")
+			.defaultValue(true);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c3704be..b5b5486 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -139,36 +139,39 @@ public final class ConfigConstants {
 	public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";
 
 	/**
-	 * The config parameter defining the storage directory to be used by the blob server.
+	 * @deprecated use {@link BlobServerOptions#STORAGE_DIRECTORY} instead
 	 */
+	@Deprecated
 	public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";
 
 	/**
-	 * The config parameter defining number of retires for failed BLOB fetches.
+	 * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
 	 */
+	@Deprecated
 	public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries";
 
 	/**
-	 * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
+	 * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
 	 */
+	@Deprecated
 	public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent";
 
 	/**
-	 * The config parameter defining the backlog of BLOB fetches on the JobManager
+	 * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
 	 */
+	@Deprecated
 	public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";
 
 	/**
-	 * The config parameter defining the server port of the blob service.
-	 * The port can either be a port, such as "9123",
-	 * a range of ports: "50100-50200"
-	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
-	 *
-	 * Setting the port to 0 will let the OS choose an available port.
+	 * @deprecated use {@link BlobServerOptions#PORT} instead
 	 */
+	@Deprecated
 	public static final String BLOB_SERVER_PORT = "blob.server.port";
 
-	/** Flag to override ssl support for the blob service transport */
+	/**
+	 * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
+	 */
+	@Deprecated
 	public static final String BLOB_SERVICE_SSL_ENABLED = "blob.service.ssl.enabled";
 
 	/**
@@ -1094,28 +1097,33 @@ public final class ConfigConstants {
 	public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
 	/**
-	 * The default value to override ssl support for blob service transport
+	 * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
 	 */
+	@Deprecated
 	public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true;
 
 	/**
-	 * Default number of retries for failed BLOB fetches.
+	 * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
 	 */
+	@Deprecated
 	public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
 
 	/**
-	 * Default number of concurrent BLOB fetch operations.
+	 * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
 	 */
+	@Deprecated
 	public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;
 
 	/**
-	 * Default BLOB fetch connection backlog.
+	 * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
 	 */
+	@Deprecated
 	public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
 
 	/**
-	 * Default BLOB server port. 0 means ephemeral port.
+	 * @deprecated use {@link BlobServerOptions#PORT} instead
 	 */
+	@Deprecated
 	public static final String DEFAULT_BLOB_SERVER_PORT = "0";
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 2587b15..23c7e63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
@@ -129,19 +129,18 @@ public final class BlobCache implements BlobService {
 		this.blobStore = blobStore;
 
 		// configure and create the storage directory
-		String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB cache storage directory " + storageDir);
 
 		// configure the number of fetch retries
-		final int fetchRetries = blobClientConfig.getInteger(
-			ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
+		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
 		if (fetchRetries >= 0) {
 			this.numFetchRetries = fetchRetries;
 		}
 		else {
 			LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.",
-				ConfigConstants.BLOB_FETCH_RETRIES_KEY);
+				BlobServerOptions.FETCH_RETRIES.key());
 			this.numFetchRetries = 0;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ea90f54..49e54a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -92,8 +92,7 @@ public final class BlobClient implements Closeable {
 			// Check if ssl is enabled
 			SSLContext clientSSLContext = null;
 			if (clientConfig != null &&
-				clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
-						ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+				clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 
 				clientSSLContext = SSLUtils.createSSLClientContext(clientConfig);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 8a70559..0e15777 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -111,34 +111,32 @@ public class BlobServer extends Thread implements BlobService {
 		this.blobStore = checkNotNull(blobStore);
 
 		// configure and create the storage directory
-		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+		String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
 		// configure the maximum number of concurrent connections
-		final int maxConnections = config.getInteger(
-				ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+		final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
 		if (maxConnections >= 1) {
 			this.maxConnections = maxConnections;
 		}
 		else {
 			LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
-					maxConnections, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
-			this.maxConnections = ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT;
+					maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
+			this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
 		}
 
 		// configure the backlog of connections
-		int backlog = config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+		int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
 		if (backlog < 1) {
 			LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
-					backlog, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
-			backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
+					backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
+			backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
 		}
 
 		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
-		if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
-				ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+		if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 			try {
 				serverSSLContext = SSLUtils.createSSLServerContext(config);
 			} catch (Exception e) {
@@ -148,7 +146,7 @@ public class BlobServer extends Thread implements BlobService {
 
 		//  ----------------------- start the server -------------------
 
-		String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
+		String serverPortRange = config.getString(BlobServerOptions.PORT);
 
 		Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..27603d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -31,6 +31,7 @@ import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
@@ -91,7 +92,7 @@ public class BlobClientSslTest {
 		try {
 			Configuration config = new Configuration();
 			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+			config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
 			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
 			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
 			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
@@ -104,7 +105,7 @@ public class BlobClientSslTest {
 
 		clientConfig = new Configuration();
 		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-		clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+		clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
 		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
 		clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index ea0eb94..c3762aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -38,7 +38,7 @@ public class BlobServerRangeTest extends TestLogger {
 	@Test
 	public void testOnEphemeralPort() throws IOException {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
+		conf.setString(BlobServerOptions.PORT, "0");
 		BlobServer srv = new BlobServer(conf);
 		srv.shutdown();
 	}
@@ -59,7 +59,7 @@ public class BlobServerRangeTest extends TestLogger {
 		}
 
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort()));
+		conf.setString(BlobServerOptions.PORT, String.valueOf(socket.getLocalPort()));
 
 		// this thing is going to throw an exception
 		try {
@@ -88,7 +88,7 @@ public class BlobServerRangeTest extends TestLogger {
 		}
 		int availablePort = NetUtils.getAvailablePort();
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
+		conf.setString(BlobServerOptions.PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
 
 		// this thing is going to throw an exception
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 9ac6873..a906d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -30,7 +30,7 @@ import java.util.List;
 
 import com.google.common.io.Files;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
@@ -130,7 +130,7 @@ public class JobManagerStartupTest extends TestLogger {
 		}
 		Configuration failConfig = new Configuration();
 		String nonExistDirectory = new File(blobStorageDirectory, DOES_NOT_EXISTS_NO_SIR).getAbsolutePath();
-		failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
+		failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory);
 
 		try {
 			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);