You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 12:10:33 UTC
[10/14] flink git commit: [FLINK-6518] Port blobserver config
parameters to ConfigOptions
[FLINK-6518] Port blobserver config parameters to ConfigOptions
This closes #3865.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e61a01b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e61a01b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e61a01b
Branch: refs/heads/release-1.3
Commit: 5e61a01bae6d05bf1d5c76bc48f0ba90bbdef752
Parents: 9fe6135
Author: zentol <ch...@apache.org>
Authored: Wed May 10 10:26:19 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 14:07:26 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/BlobServerOptions.java | 76 ++++++++++++++++++++
.../flink/configuration/ConfigConstants.java | 40 ++++++-----
.../apache/flink/runtime/blob/BlobCache.java | 9 ++-
.../apache/flink/runtime/blob/BlobClient.java | 5 +-
.../apache/flink/runtime/blob/BlobServer.java | 22 +++---
.../flink/runtime/blob/BlobClientSslTest.java | 5 +-
.../flink/runtime/blob/BlobServerRangeTest.java | 8 +--
.../jobmanager/JobManagerStartupTest.java | 4 +-
8 files changed, 125 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
new file mode 100644
index 0000000..e27c29f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for the BlobServer.
+ */
+@PublicEvolving
+public class BlobServerOptions {
+
+ /**
+ * The config parameter defining the storage directory to be used by the blob server.
+ */
+ public static final ConfigOption<String> STORAGE_DIRECTORY =
+ key("blob.storage.directory")
+ .noDefaultValue();
+
+ /**
+ * The config parameter defining number of retires for failed BLOB fetches.
+ */
+ public static final ConfigOption<Integer> FETCH_RETRIES =
+ key("blob.fetch.retries")
+ .defaultValue(5);
+
+ /**
+ * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
+ */
+ public static final ConfigOption<Integer> FETCH_CONCURRENT =
+ key("blob.fetch.num-concurrent")
+ .defaultValue(50);
+
+ /**
+ * The config parameter defining the backlog of BLOB fetches on the JobManager.
+ */
+ public static final ConfigOption<Integer> FETCH_BACKLOG =
+ key("blob.fetch.backlog")
+ .defaultValue(1000);
+
+ /**
+ * The config parameter defining the server port of the blob service.
+ * The port can either be a port, such as "9123",
+ * a range of ports: "50100-50200"
+ * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+ *
+ * Setting the port to 0 will let the OS choose an available port.
+ */
+ public static final ConfigOption<String> PORT =
+ key("blob.server.port")
+ .defaultValue("0");
+
+ /**
+ * Flag to override ssl support for the blob service transport.
+ */
+ public static final ConfigOption<Boolean> SSL_ENABLED =
+ key("blob.service.ssl.enabled")
+ .defaultValue(true);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c3704be..b5b5486 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -139,36 +139,39 @@ public final class ConfigConstants {
public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";
/**
- * The config parameter defining the storage directory to be used by the blob server.
+ * @deprecated use {@link BlobServerOptions#STORAGE_DIRECTORY} instead
*/
+ @Deprecated
public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";
/**
- * The config parameter defining number of retires for failed BLOB fetches.
+ * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
*/
+ @Deprecated
public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries";
/**
- * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
+ * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
*/
+ @Deprecated
public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent";
/**
- * The config parameter defining the backlog of BLOB fetches on the JobManager
+ * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
*/
+ @Deprecated
public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";
/**
- * The config parameter defining the server port of the blob service.
- * The port can either be a port, such as "9123",
- * a range of ports: "50100-50200"
- * or a list of ranges and or points: "50100-50200,50300-50400,51234"
- *
- * Setting the port to 0 will let the OS choose an available port.
+ * @deprecated use {@link BlobServerOptions#PORT} instead
*/
+ @Deprecated
public static final String BLOB_SERVER_PORT = "blob.server.port";
- /** Flag to override ssl support for the blob service transport */
+ /**
+ * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
+ */
+ @Deprecated
public static final String BLOB_SERVICE_SSL_ENABLED = "blob.service.ssl.enabled";
/**
@@ -1094,28 +1097,33 @@ public final class ConfigConstants {
public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
/**
- * The default value to override ssl support for blob service transport
+ * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
*/
+ @Deprecated
public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true;
/**
- * Default number of retries for failed BLOB fetches.
+ * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
*/
+ @Deprecated
public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
/**
- * Default number of concurrent BLOB fetch operations.
+ * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
*/
+ @Deprecated
public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;
/**
- * Default BLOB fetch connection backlog.
+ * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
*/
+ @Deprecated
public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
/**
- * Default BLOB server port. 0 means ephemeral port.
+ * @deprecated use {@link BlobServerOptions#PORT} instead
*/
+ @Deprecated
public static final String DEFAULT_BLOB_SERVER_PORT = "0";
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 2587b15..23c7e63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.blob;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.util.FileUtils;
@@ -129,19 +129,18 @@ public final class BlobCache implements BlobService {
this.blobStore = blobStore;
// configure and create the storage directory
- String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+ String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB cache storage directory " + storageDir);
// configure the number of fetch retries
- final int fetchRetries = blobClientConfig.getInteger(
- ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
+ final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
if (fetchRetries >= 0) {
this.numFetchRetries = fetchRetries;
}
else {
LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.",
- ConfigConstants.BLOB_FETCH_RETRIES_KEY);
+ BlobServerOptions.FETCH_RETRIES.key());
this.numFetchRetries = 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ea90f54..49e54a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -92,8 +92,7 @@ public final class BlobClient implements Closeable {
// Check if ssl is enabled
SSLContext clientSSLContext = null;
if (clientConfig != null &&
- clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
- ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+ clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
clientSSLContext = SSLUtils.createSSLClientContext(clientConfig);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 8a70559..0e15777 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -111,34 +111,32 @@ public class BlobServer extends Thread implements BlobService {
this.blobStore = checkNotNull(blobStore);
// configure and create the storage directory
- String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+ String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
// configure the maximum number of concurrent connections
- final int maxConnections = config.getInteger(
- ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+ final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
if (maxConnections >= 1) {
this.maxConnections = maxConnections;
}
else {
LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
- maxConnections, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
- this.maxConnections = ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT;
+ maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
+ this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
}
// configure the backlog of connections
- int backlog = config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+ int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
if (backlog < 1) {
LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
- backlog, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
- backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
+ backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
+ backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
}
this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
- if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
- ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+ if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
try {
serverSSLContext = SSLUtils.createSSLServerContext(config);
} catch (Exception e) {
@@ -148,7 +146,7 @@ public class BlobServer extends Thread implements BlobService {
// ----------------------- start the server -------------------
- String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
+ String serverPortRange = config.getString(BlobServerOptions.PORT);
Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..27603d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -31,6 +31,7 @@ import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
@@ -91,7 +92,7 @@ public class BlobClientSslTest {
try {
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
- config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+ config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
@@ -104,7 +105,7 @@ public class BlobClientSslTest {
clientConfig = new Configuration();
clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
- clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+ clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index ea0eb94..c3762aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.blob;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
@@ -38,7 +38,7 @@ public class BlobServerRangeTest extends TestLogger {
@Test
public void testOnEphemeralPort() throws IOException {
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
+ conf.setString(BlobServerOptions.PORT, "0");
BlobServer srv = new BlobServer(conf);
srv.shutdown();
}
@@ -59,7 +59,7 @@ public class BlobServerRangeTest extends TestLogger {
}
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort()));
+ conf.setString(BlobServerOptions.PORT, String.valueOf(socket.getLocalPort()));
// this thing is going to throw an exception
try {
@@ -88,7 +88,7 @@ public class BlobServerRangeTest extends TestLogger {
}
int availablePort = NetUtils.getAvailablePort();
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
+ conf.setString(BlobServerOptions.PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort);
// this thing is going to throw an exception
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/5e61a01b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 9ac6873..a906d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -30,7 +30,7 @@ import java.util.List;
import com.google.common.io.Files;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.StartupUtils;
import org.apache.flink.util.NetUtils;
@@ -130,7 +130,7 @@ public class JobManagerStartupTest extends TestLogger {
}
Configuration failConfig = new Configuration();
String nonExistDirectory = new File(blobStorageDirectory, DOES_NOT_EXISTS_NO_SIR).getAbsolutePath();
- failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
+ failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory);
try {
JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);