You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/13 15:40:14 UTC
[4/8] flink git commit: [FLINK-4768] [core] Migrate high-availability
configuration parameters to ConfigOptions
[FLINK-4768] [core] Migrate high-availability configuration parameters to ConfigOptions
This closes #2607
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8dc074a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8dc074a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8dc074a
Branch: refs/heads/flip-6
Commit: c8dc074a1899fa0f7d6ce7c6377c5e3d30159c18
Parents: d71a09c
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 8 01:41:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/cli/DefaultCLI.java | 5 +-
.../configuration/HighAvailabilityOptions.java | 139 +++++++++++++++++++
.../webmonitor/WebRuntimeMonitorITCase.java | 7 +-
.../flink/runtime/blob/FileSystemBlobStore.java | 22 ++-
.../jobmanager/HighAvailabilityMode.java | 8 +-
.../flink/runtime/security/SecurityContext.java | 11 +-
.../flink/runtime/util/ZooKeeperUtils.java | 68 +++------
.../zookeeper/FlinkZooKeeperQuorumPeer.java | 46 +++---
.../flink/runtime/jobmanager/JobManager.scala | 14 +-
.../flink/runtime/blob/BlobRecoveryITCase.java | 5 +-
.../BlobLibraryCacheRecoveryITCase.java | 5 +-
.../jobmanager/HighAvailabilityModeTest.java | 13 +-
.../jobmanager/JobManagerHARecoveryTest.java | 5 +-
.../ZooKeeperLeaderElectionTest.java | 25 ++--
.../ZooKeeperLeaderRetrievalTest.java | 15 +-
.../runtime/testutils/ZooKeeperTestUtils.java | 13 +-
.../flink/runtime/util/ZooKeeperUtilTest.java | 3 +-
.../zookeeper/ZooKeeperTestEnvironment.java | 10 +-
.../runtime/testingUtils/TestingUtils.scala | 13 +-
.../connectors/fs/RollingSinkSecuredITCase.java | 5 +-
.../flink/test/util/SecureTestEnvironment.java | 3 +-
.../apache/flink/test/util/TestBaseUtils.java | 3 +-
.../flink/test/recovery/ChaosMonkeyITCase.java | 3 +-
...agerHAProcessFailureBatchRecoveryITCase.java | 5 +-
...CliFrontendYarnAddressConfigurationTest.java | 11 +-
.../flink/yarn/YARNHighAvailabilityITCase.java | 3 +-
.../yarn/AbstractYarnClusterDescriptor.java | 5 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 3 +-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 6 +-
29 files changed, 302 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 18fa323..8f79403 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -19,11 +19,12 @@ package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
+
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import java.net.InetSocketAddress;
@@ -64,7 +65,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
- config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+ config.setString(HighAvailabilityOptions.HA_CLUSTER_ID.key(), zkNamespace);
}
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
new file mode 100644
index 0000000..1ee988a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to high-availability settings.
+ */
+@PublicEvolving
+public class HighAvailabilityOptions {
+
+ // ------------------------------------------------------------------------
+ // Required High Availability Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * Defines the high-availability mode used for the cluster execution.
+ * A value of "NONE" signals no highly available setup.
+ * To enable high-availability, set this mode to "ZOOKEEPER".
+ */
+ public static final ConfigOption<String> HA_MODE =
+ key("high-availability")
+ .defaultValue("NONE")
+ .withDeprecatedKeys("recovery.mode");
+
+ /**
+ * The ID of the Flink cluster, used to separate multiple Flink clusters.
+ * Needs to be set for standalone clusters, but is automatically inferred in YARN and Mesos.
+ */
+ public static final ConfigOption<String> HA_CLUSTER_ID =
+ key("high-availability.cluster-id")
+ .defaultValue("/default")
+ .withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace");
+
+ /**
+ * File system path (URI) where Flink persists metadata in high-availability setups.
+ */
+ public static final ConfigOption<String> HA_STORAGE_PATH =
+ key("high-availability.storageDir")
+ .noDefaultValue()
+ .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");
+
+ /**
+ * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
+ */
+ public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
+ key("high-availability.zookeeper.quorum")
+ .noDefaultValue()
+ .withDeprecatedKeys("recovery.zookeeper.quorum");
+
+
+ // ------------------------------------------------------------------------
+ // Recovery Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * Optional port (range) used by the job manager in high-availability mode.
+ */
+ public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
+ key("high-availability.jobmanager.port")
+ .defaultValue("0")
+ .withDeprecatedKeys("recovery.jobmanager.port");
+
+ /**
+ * The time a JobManager waits after a failover before it recovers the current jobs.
+ */
+ public static final ConfigOption<String> HA_JOB_DELAY =
+ key("high-availability.job.delay")
+ .noDefaultValue()
+ .withDeprecatedKeys("recovery.job.delay");
+
+ // ------------------------------------------------------------------------
+ // ZooKeeper Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * The root path under which Flink stores its entries in ZooKeeper.
+ */
+ public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
+ key("high-availability.zookeeper.path.root")
+ .defaultValue("/flink")
+ .withDeprecatedKeys("recovery.zookeeper.path.root");
+
+ // ------------------------------------------------------------------------
+ // ZooKeeper Client Settings
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
+ key("high-availability.zookeeper.client.session-timeout")
+ .defaultValue(60000)
+ .withDeprecatedKeys("recovery.zookeeper.client.session-timeout");
+
+ public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
+ key("high-availability.zookeeper.client.connection-timeout")
+ .defaultValue(15000)
+ .withDeprecatedKeys("recovery.zookeeper.client.connection-timeout");
+
+ public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
+ key("high-availability.zookeeper.client.retry-wait")
+ .defaultValue(5000)
+ .withDeprecatedKeys("recovery.zookeeper.client.retry-wait");
+
+ public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
+ key("high-availability.zookeeper.client.max-retry-attempts")
+ .defaultValue(3)
+ .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
+
+ public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
+ key("zookeeper.sasl.disable")
+ .defaultValue(true);
+
+ public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
+ key("zookeeper.sasl.service-name")
+ .noDefaultValue();
+
+ // ------------------------------------------------------------------------
+
+ /** Not intended to be instantiated. */
+ private HighAvailabilityOptions() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 54c5e76..1ae776c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -237,7 +238,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
followingClient.sendGetRequest("index.html", deadline.timeLeft());
response = followingClient.getNextResponse(deadline.timeLeft());
assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, response.getStatus());
- assertTrue(response.getLocation().contains("" + leadingWebMonitor.getServerPort()));
+ assertTrue(response.getLocation().contains(String.valueOf(leadingWebMonitor.getServerPort())));
// Kill the leader
leadingSystem.shutdown();
@@ -296,8 +297,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
- config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString());
actorSystem = AkkaUtils.createDefaultActorSystem();
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index ee189d4..deba738 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -19,14 +19,17 @@
package org.apache.flink.runtime.blob;
import com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.ConfigurationUtil;
import org.apache.flink.util.IOUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,16 +55,11 @@ class FileSystemBlobStore implements BlobStore {
private final String basePath;
FileSystemBlobStore(Configuration config) throws IOException {
- String storagePath = ConfigurationUtil.getStringWithDeprecatedKeys(
- config,
- ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
- null,
- ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
-
- if (storagePath == null) {
- throw new IllegalConfigurationException(String.format("Missing configuration for " +
- "ZooKeeper file system path. Please specify via " +
- "'%s' key.", ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH));
+ String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+
+ if (storagePath == null || StringUtils.isBlank(storagePath)) {
+ throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
+ " Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
}
this.basePath = storagePath + "/blob";
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 087ad3b..fa2db48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.ConfigurationUtil;
+import org.apache.flink.configuration.HighAvailabilityOptions;
/**
* High availability mode for Flink's cluster execution. Currently supported modes are:
@@ -43,11 +43,7 @@ public enum HighAvailabilityMode {
* configured.
*/
public static HighAvailabilityMode fromConfig(Configuration config) {
- String haMode = ConfigurationUtil.getStringWithDeprecatedKeys(
- config,
- ConfigConstants.HA_MODE,
- null,
- ConfigConstants.RECOVERY_MODE);
+ String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);
if (haMode == null) {
return HighAvailabilityMode.NONE;
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index be6611f..67dd78c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
@@ -182,9 +183,9 @@ public class SecurityContext {
//with pseudo JAAS configuration file if SASL auth is enabled for ZK
System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
- boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
- ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
- if(disableSaslClient) {
+ boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+
+ if (disableSaslClient) {
LOG.info("SASL client auth for ZK will be disabled");
//SASL auth is disabled by default but will be enabled if specified in configuration
System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
@@ -212,8 +213,8 @@ public class SecurityContext {
System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
- String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
- if(!StringUtils.isBlank(zkSaslServiceName)) {
+ String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+ if (!StringUtils.isBlank(zkSaslServiceName)) {
LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 5e69875..137a85b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -18,12 +18,14 @@
package org.apache.flink.runtime.util;
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -57,53 +59,25 @@ public class ZooKeeperUtils {
* @return {@link CuratorFramework} instance
*/
public static CuratorFramework startCuratorFramework(Configuration configuration) {
- String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
- null,
- ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+ String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
- if (zkQuorum == null || zkQuorum.equals("")) {
+ if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
"You can specify the quorum via the configuration key '" +
- ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'.");
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "'.");
}
- int sessionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
- ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT,
- ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT);
+ int sessionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT);
- int connectionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
- ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT,
- ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT);
+ int connectionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT);
- int retryWait = ConfigurationUtil.getIntegerWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
- ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT,
- ConfigConstants.ZOOKEEPER_RETRY_WAIT);
+ int retryWait = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_RETRY_WAIT);
- int maxRetryAttempts = ConfigurationUtil.getIntegerWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
- ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
- ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+ int maxRetryAttempts = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
- String root = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
- ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY,
- ConfigConstants.ZOOKEEPER_DIR_KEY);
+ String root = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
- String namespace = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
- ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY,
- ConfigConstants.ZOOKEEPER_NAMESPACE_KEY);
+ String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
String rootWithNamespace = generateZookeeperPath(root, namespace);
@@ -138,13 +112,9 @@ public class ZooKeeperUtils {
public static String getZooKeeperEnsemble(Configuration flinkConf)
throws IllegalConfigurationException {
- String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
- flinkConf,
- ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
- "",
- ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+ String zkQuorum = flinkConf.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
- if (zkQuorum == null || zkQuorum.equals("")) {
+ if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
}
@@ -367,15 +337,11 @@ public class ZooKeeperUtils {
Configuration configuration,
String prefix) throws IOException {
- String rootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
- "",
- ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
+ String rootPath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
- if (rootPath.equals("")) {
- throw new IllegalConfigurationException("Missing recovery path. Specify via " +
- "configuration key '" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "'.");
+ if (rootPath == null || StringUtils.isBlank(rootPath)) {
+ throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
+ " Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
} else {
return new FileSystemStateStorageHelper<T>(rootPath, prefix);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
index 9fba529..c4140c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.zookeeper;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.util.EnvironmentInformation;
+
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
@@ -47,8 +47,25 @@ import java.util.UUID;
*/
public class FlinkZooKeeperQuorumPeer {
+ /** ZooKeeper default client port. */
+ public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
+
+ /** ZooKeeper default init limit. */
+ public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+
+ /** ZooKeeper default sync limit. */
+ public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+
+ /** ZooKeeper default peer port. */
+ public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
+
+ /** ZooKeeper default leader port. */
+ public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
+
private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+ // ------------------------------------------------------------------------
+
public static void main(String[] args) {
try {
// startup checks and logging
@@ -67,6 +84,8 @@ public class FlinkZooKeeperQuorumPeer {
}
}
+ // ------------------------------------------------------------------------
+
/**
* Runs a ZooKeeper {@link QuorumPeer} if further peers are configured or a single
* {@link ZooKeeperServer} if no further peers are configured.
@@ -120,26 +139,23 @@ public class FlinkZooKeeperQuorumPeer {
private static void setRequiredProperties(Properties zkProps) {
// Set default client port
if (zkProps.getProperty("clientPort") == null) {
- int clientPort = ConfigConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
- zkProps.setProperty("clientPort", String.valueOf(clientPort));
+ zkProps.setProperty("clientPort", String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_PORT));
- LOG.warn("No 'clientPort' configured. Set to '{}'.", clientPort);
+ LOG.warn("No 'clientPort' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_CLIENT_PORT);
}
// Set default init limit
if (zkProps.getProperty("initLimit") == null) {
- int initLimit = ConfigConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT;
- zkProps.setProperty("initLimit", String.valueOf(initLimit));
+ zkProps.setProperty("initLimit", String.valueOf(DEFAULT_ZOOKEEPER_INIT_LIMIT));
- LOG.warn("No 'initLimit' configured. Set to '{}'.", initLimit);
+ LOG.warn("No 'initLimit' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_INIT_LIMIT);
}
// Set default sync limit
if (zkProps.getProperty("syncLimit") == null) {
- int syncLimit = ConfigConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT;
- zkProps.setProperty("syncLimit", String.valueOf(syncLimit));
+ zkProps.setProperty("syncLimit", String.valueOf(DEFAULT_ZOOKEEPER_SYNC_LIMIT));
- LOG.warn("No 'syncLimit' configured. Set to '{}'.", syncLimit);
+ LOG.warn("No 'syncLimit' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_SYNC_LIMIT);
}
// Set default data dir
@@ -152,8 +168,8 @@ public class FlinkZooKeeperQuorumPeer {
LOG.warn("No 'dataDir' configured. Set to '{}'.", dataDir);
}
- int peerPort = ConfigConstants.DEFAULT_ZOOKEEPER_PEER_PORT;
- int leaderPort = ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PORT;
+ int peerPort = DEFAULT_ZOOKEEPER_PEER_PORT;
+ int leaderPort = DEFAULT_ZOOKEEPER_LEADER_PORT;
// Set peer and leader ports if none given, because ZooKeeper complains if multiple
// servers are configured, but no ports are given.
@@ -220,12 +236,8 @@ public class FlinkZooKeeperQuorumPeer {
// Write myid to file. We use a File Writer, because that properly propagates errors,
// while the PrintWriter swallows errors
- FileWriter writer = new FileWriter(new File(dataDir, "myid"));
- try {
+ try (FileWriter writer = new FileWriter(new File(dataDir, "myid"))) {
writer.write(String.valueOf(id));
}
- finally {
- writer.close();
- }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e90f2d2..be820ae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,7 +31,7 @@ import akka.pattern.ask
import grizzled.slf4j.Logger
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.metrics.{Gauge, MetricGroup}
@@ -2367,9 +2367,7 @@ object JobManager {
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
// The port range of allowed job manager ports or 0 for random
- configuration.getString(
- ConfigConstants.RECOVERY_JOB_MANAGER_PORT,
- ConfigConstants.DEFAULT_HA_JOB_MANAGER_PORT)
+ configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE)
}
else {
LOG.info("Starting JobManager without high-availability")
@@ -2501,11 +2499,7 @@ object JobManager {
val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
- val jobRecoveryTimeoutStr = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_JOB_DELAY,
- null,
- ConfigConstants.RECOVERY_JOB_DELAY)
+ val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
timeout
@@ -2515,7 +2509,7 @@ object JobManager {
} catch {
case n: NumberFormatException =>
throw new Exception(
- s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " +
+ s"Invalid config value for ${HighAvailabilityOptions.HA_JOB_DELAY.key()}: " +
s"$jobRecoveryTimeoutStr. Value must be a valid duration (such as '10 s' or '1 min')")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 8464d68..8ba20c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.junit.After;
import org.junit.Before;
@@ -68,9 +69,9 @@ public class BlobRecoveryITCase {
try {
Configuration config = new Configuration();
- config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, recoveryDir.getPath());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f6bed56..f6cdf09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
@@ -63,9 +64,9 @@ public class BlobLibraryCacheRecoveryITCase {
try {
Configuration config = new Configuration();
- config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 04c0e48..91fb514 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -42,7 +43,7 @@ public class HighAvailabilityModeTest {
assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
// Check not equals default
- config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+ config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
}
@@ -54,16 +55,16 @@ public class HighAvailabilityModeTest {
Configuration config = new Configuration();
// Check mapping of old default to new default
- config.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE);
+ config.setString("recovery.mode", ConfigConstants.DEFAULT_RECOVERY_MODE);
assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
// Check deprecated config
- config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+ config.setString("recovery.mode", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
// Check precedence over deprecated config
- config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.NONE.name().toLowerCase());
- config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+ config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase());
+ config.setString("recovery.mode", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 360588d..5b12eee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -131,8 +132,8 @@ public class JobManagerHARecoveryTest {
ActorRef jobManager = null;
ActorRef taskManager = null;
- flinkConfiguration.setString(ConfigConstants.HA_MODE, "zookeeper");
- flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.newFolder().toString());
+ flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index e20985b..1f1eb62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
@@ -89,8 +90,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testZooKeeperLeaderElectionRetrieval() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
ZooKeeperLeaderElectionService leaderElectionService = null;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -134,8 +135,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testZooKeeperReelection() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
@@ -217,8 +218,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testZooKeeperReelectionWithReplacement() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
int num = 3;
int numTries = 30;
@@ -295,8 +296,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
final String leaderPath = "/leader";
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -379,8 +380,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testExceptionForwarding() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
ZooKeeperLeaderElectionService leaderElectionService = null;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -448,8 +449,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testEphemeralZooKeeperNodes() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
ZooKeeperLeaderElectionService leaderElectionService;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0fe0644..70b1da0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
+
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
@@ -82,8 +85,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
long sleepingTime = 1000;
- config.setString(ConfigConstants.HA_MODE, "zookeeper");
- config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
LeaderElectionService leaderElectionService = null;
LeaderElectionService faultyLeaderElectionService;
@@ -179,8 +182,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
@Test
public void testTimeoutOfFindConnectingAddress() throws Exception {
Configuration config = new Configuration();
- config.setString(ConfigConstants.HA_MODE, "zookeeper");
- config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -190,7 +193,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
assertEquals(InetAddress.getLocalHost(), result);
}
- class FindConnectingAddress implements Runnable {
+ static class FindConnectingAddress implements Runnable {
private final Configuration config;
private final FiniteDuration timeout;
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 7dd7067..07cec32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -66,8 +67,8 @@ public class ZooKeeperTestUtils {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
// ZooKeeper recovery mode
- config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
int connTimeout = 5000;
if (System.getenv().containsKey("CI")) {
@@ -75,20 +76,20 @@ public class ZooKeeperTestUtils {
connTimeout = 30000;
}
- config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
- config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+ config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+ config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
// File system state backend
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
- config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, fsStateHandlePath + "/recovery");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
// Akka failure detection and execution retries
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
- config.setString(ConfigConstants.HA_JOB_DELAY, "10 s");
+ config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");
return config;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
index daed4a4..d5895ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.util;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -71,7 +72,7 @@ public class ZooKeeperUtilTest extends TestLogger {
}
private Configuration setQuorum(Configuration conf, String quorum) {
- conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum);
+ conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, quorum);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index bd58515..66c4fac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -22,9 +22,11 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.ZKPaths;
-import org.apache.flink.configuration.ConfigConstants;
+
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+
import org.apache.zookeeper.KeeperException;
import java.util.List;
@@ -58,7 +60,7 @@ public class ZooKeeperTestEnvironment {
zooKeeperServer = new TestingServer(true);
zooKeeperCluster = null;
- conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+ conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperServer.getConnectString());
}
else {
@@ -67,7 +69,7 @@ public class ZooKeeperTestEnvironment {
zooKeeperCluster.start();
- conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+ conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperCluster.getConnectString());
}
@@ -127,7 +129,7 @@ public class ZooKeeperTestEnvironment {
*/
public CuratorFramework createClient() {
Configuration config = new Configuration();
- config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, getConnectString());
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, getConnectString());
return ZooKeeperUtils.startCuratorFramework(config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index e878097..97016e4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,12 +28,12 @@ import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logger
import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{HighAvailabilityOptions, ConfigConstants, Configuration}
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.{HighAvailabilityMode, MemoryArchivist, JobManager}
import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.runtime.util.LeaderRetrievalUtils
import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor}
@@ -412,8 +412,7 @@ object TestingUtils {
* @param configuration Configuration to use
* @param jobManagerClass JobManager class to instantiate
* @param prefix The prefix to use for the Actor names
- *
- * @return
+ * @return
*/
def createJobManager(
actorSystem: ActorSystem,
@@ -422,7 +421,8 @@ object TestingUtils {
prefix: String)
: ActorGateway = {
- configuration.setString(ConfigConstants.HA_MODE,
+ configuration.setString(
+ HighAvailabilityOptions.HA_MODE,
ConfigConstants.DEFAULT_HA_MODE)
val (actor, _) = JobManager.startJobManagerActors(
@@ -502,7 +502,8 @@ object TestingUtils {
configuration: Configuration)
: ActorGateway = {
- configuration.setString(ConfigConstants.HA_MODE,
+ configuration.setString(
+ HighAvailabilityOptions.HA_MODE,
ConfigConstants.DEFAULT_HA_MODE)
val actor = FlinkResourceManager.startResourceManagerActors(
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 051175a..c005814 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
@@ -215,10 +216,10 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
- config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
- config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + "/flink/recovery");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
SecureTestEnvironment.populateFlinkSecureConfigurations(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index b5e622b..0250c16 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.hadoop.minikdc.MiniKdc;
import org.junit.rules.TemporaryFolder;
@@ -115,7 +116,7 @@ public class SecureTestEnvironment {
Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
- flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+ flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
ctx.setFlinkConfiguration(flinkConfig);
TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b774f97..aa5e7d3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
@@ -121,7 +122,7 @@ public class TestBaseUtils extends TestLogger {
if (startZooKeeper) {
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
- config.setString(ConfigConstants.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
}
return startCluster(config, singleActorSystem);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index cc8ab80..4d10bf1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -564,7 +565,7 @@ public class ChaosMonkeyITCase extends TestLogger {
fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
}
- File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath());
+ File fsRecovery = new File(new URI(config.getString(HighAvailabilityOptions.HA_STORAGE_PATH)).getPath());
LOG.info("Checking " + fsRecovery);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 9b0d9de..a51f88b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -149,8 +150,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
*/
public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
Configuration config = new Configuration();
- config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"leader", 1, config);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 48ad7f5..4bcde16 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.yarn;
import org.apache.commons.cli.CommandLine;
+
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
@@ -27,7 +28,7 @@ import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -38,20 +39,20 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
+
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
-import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
@@ -202,7 +203,7 @@ public class CliFrontendYarnAddressConfigurationTest {
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
frontend.retrieveClient(options);
- String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
+ String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
}
@@ -216,7 +217,7 @@ public class CliFrontendYarnAddressConfigurationTest {
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
frontend.retrieveClient(options);
- String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
+ String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
Assert.assertEquals(overrideZkNamespace, zkNs);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9d6ff85..79f790f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -119,7 +120,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
- "@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery");
+ "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
ClusterClient yarnCluster = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 848013c..9481c24 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.security.SecurityContext;
@@ -539,11 +540,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
- zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
+ zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}
- flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+ flinkConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
// activate re-execution of failed applications
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index b27876b..10e229e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner {
// override zookeeper namespace with user cli argument (if provided)
String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
+ configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
}
// if a web monitor shall be started, set the port to random binding
http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d09340c..e4da140 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
@@ -60,7 +61,6 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
/**
* Class handling the command line interface to the YARN session.
@@ -513,8 +513,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
if(null != applicationID) {
String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
- : config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID);
- config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+ : config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
+ config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
yarnDescriptor.setFlinkConfiguration(config);