You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/20 09:40:56 UTC
[flink] 01/05: [FLINK-8660][ha] Enable user to provide custom
HAServices implementation
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3bc7bc2e80af1e277ffb4906e6aca416a9de954b
Author: Krzysztof Białek <kb...@users.noreply.github.com>
AuthorDate: Tue Sep 18 11:18:03 2018 +0200
[FLINK-8660][ha] Enable user to provide custom HAServices implementation
Create BlobStorage for any HA backend
HighAvailabilityServicesFactory may throw exceptions
Docs
Use ha mode config property to specify factory class FQN
Update docs
---
docs/_includes/generated/common_section.html | 2 +-
.../generated/high_availability_configuration.html | 2 +-
docs/ops/jobmanager_high_availability.md | 1 +
.../configuration/HighAvailabilityOptions.java | 4 +-
.../org/apache/flink/runtime/blob/BlobUtils.java | 8 +--
.../HighAvailabilityServicesFactory.java | 39 ++++++++++
.../HighAvailabilityServicesUtils.java | 28 +++++++-
.../runtime/jobmanager/HighAvailabilityMode.java | 29 ++++----
.../HighAvailabilityServicesUtilsTest.java | 83 ++++++++++++++++++++++
.../jobmanager/HighAvailabilityModeTest.java | 33 ++++++++-
10 files changed, 203 insertions(+), 26 deletions(-)
diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
index ea881e0..65804db 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -45,7 +45,7 @@
<tr>
<td><h5>high-availability</h5></td>
<td style="word-wrap: break-word;">"NONE"</td>
- <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+ <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
</tr>
<tr>
<td><h5>high-availability.storageDir</h5></td>
diff --git a/docs/_includes/generated/high_availability_configuration.html b/docs/_includes/generated/high_availability_configuration.html
index b1e2ea9..398379d 100644
--- a/docs/_includes/generated/high_availability_configuration.html
+++ b/docs/_includes/generated/high_availability_configuration.html
@@ -10,7 +10,7 @@
<tr>
<td><h5>high-availability</h5></td>
<td style="word-wrap: break-word;">"NONE"</td>
- <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+ <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
</tr>
<tr>
<td><h5>high-availability.cluster-id</h5></td>
diff --git a/docs/ops/jobmanager_high_availability.md b/docs/ops/jobmanager_high_availability.md
index 320f9d6..2c2f1a6 100644
--- a/docs/ops/jobmanager_high_availability.md
+++ b/docs/ops/jobmanager_high_availability.md
@@ -65,6 +65,7 @@ By default, the job manager will pick a *random port* for inter process communic
In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+Alternatively, this option can be set to the fully qualified name (FQN) of the factory class that Flink should use to create the HighAvailabilityServices instance.
<pre>high-availability: zookeeper</pre>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 787efff..5881533 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -43,6 +43,7 @@ public class HighAvailabilityOptions {
* Defines high-availability mode used for the cluster execution.
* A value of "NONE" signals no highly available setup.
* To enable high-availability, set this mode to "ZOOKEEPER".
+ * Can also be set to the fully qualified name of a HighAvailabilityServicesFactory implementation.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_MODE =
@@ -50,7 +51,7 @@ public class HighAvailabilityOptions {
.defaultValue("NONE")
.withDeprecatedKeys("recovery.mode")
.withDescription("Defines high-availability mode used for the cluster execution." +
- " To enable high-availability, set this mode to \"ZOOKEEPER\".");
+ " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
/**
* The ID of the Flink cluster, used to separate multiple Flink clusters
@@ -73,7 +74,6 @@ public class HighAvailabilityOptions {
.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");
-
// ------------------------------------------------------------------------
// Recovery Options
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index a61d679..e1caa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -88,14 +88,10 @@ public class BlobUtils {
* thrown if the (distributed) file storage cannot be created
*/
public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {
- HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
-
- if (highAvailabilityMode == HighAvailabilityMode.NONE) {
- return new VoidBlobStore();
- } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+ if (HighAvailabilityMode.isHighAvailabilityModeActivated(config)) { // true for ZOOKEEPER and FACTORY_CLASS modes
return createFileSystemBlobStore(config);
} else {
- throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
+ return new VoidBlobStore(); // HA not activated (mode NONE)
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
new file mode 100644
index 0000000..0abc6de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory for {@link HighAvailabilityServices}, selected by setting {@code high-availability} to its fully qualified
+ * class name. It is instantiated reflectively, so it should be public with a public no-argument constructor.
+ */
+public interface HighAvailabilityServicesFactory {
+	/**
+	 * Creates a {@link HighAvailabilityServices} instance.
+	 *
+	 * @param configuration Flink configuration
+	 * @param executor background task executor
+	 * @return instance of {@link HighAvailabilityServices}
+	 * @throws Exception when HAServices cannot be created
+	 */
+	HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 778d2db..78484d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.BlobStoreService;
@@ -64,11 +65,14 @@ public class HighAvailabilityServicesUtils {
config,
blobStoreService);
+ case FACTORY_CLASS:
+ return createCustomHAServices(config, executor);
+
default:
throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
}
}
-
+
public static HighAvailabilityServices createHighAvailabilityServices(
Configuration configuration,
Executor executor,
@@ -76,7 +80,7 @@ public class HighAvailabilityServicesUtils {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
- switch(highAvailabilityMode) {
+ switch (highAvailabilityMode) {
case NONE:
final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
@@ -119,6 +123,10 @@ public class HighAvailabilityServicesUtils {
executor,
configuration,
blobStoreService);
+
+ case FACTORY_CLASS:
+ return createCustomHAServices(configuration, executor);
+
default:
throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
}
@@ -151,6 +159,22 @@ public class HighAvailabilityServicesUtils {
return Tuple2.of(hostname, port);
}
+ private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception {
+ Class<HighAvailabilityServicesFactory> factoryClass;
+ try {
+ factoryClass = config.getClass(
+ HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new Exception("Custom HA FactoryClass not found");
+ }
+
+ if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
+ return factoryClass.newInstance().createHAServices(config, executor);
+ } else {
+ throw new Exception("Custom HA FactoryClass is not valid.");
+ }
+ }
+
public enum AddressResolution {
TRY_ADDRESS_RESOLUTION,
NO_ADDRESS_RESOLUTION
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 7dc13c2..65c202a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -30,10 +30,19 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
* ZooKeeper is used to select a leader among a group of JobManager. This JobManager
* is responsible for the job execution. Upon failure of the leader a new leader is elected
* which will take over the responsibilities of the old leader
+ * - FACTORY_CLASS: Use the {@link org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory}
+ * implementation whose fully qualified class name is set in the configuration property high-availability.
*/
public enum HighAvailabilityMode {
- NONE,
- ZOOKEEPER;
+ NONE(false),
+ ZOOKEEPER(true),
+ FACTORY_CLASS(true);
+
+ private final boolean haActive;
+
+ HighAvailabilityMode(boolean haActive) {
+ this.haActive = haActive;
+ }
/**
* Return the configured {@link HighAvailabilityMode}.
@@ -51,7 +60,11 @@ public enum HighAvailabilityMode {
// Map old default to new default
return HighAvailabilityMode.NONE;
} else {
- return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+ try {
+ return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ return FACTORY_CLASS;
+ }
}
}
@@ -63,14 +76,6 @@ public enum HighAvailabilityMode {
*/
public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
HighAvailabilityMode mode = fromConfig(configuration);
- switch (mode) {
- case NONE:
- return false;
- case ZOOKEEPER:
- return true;
- default:
- return false;
- }
-
+ return mode.haActive; // per-constant flag: NONE -> false, ZOOKEEPER and FACTORY_CLASS -> true
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
new file mode 100644
index 0000000..e9063ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for the {@link HighAvailabilityServicesUtils} class.
+ */
+public class HighAvailabilityServicesUtilsTest extends TestLogger {
+
+	@Test
+	public void testCreateCustomHAServices() throws Exception {
+		Configuration config = new Configuration();
+
+		HighAvailabilityServices haServices = Mockito.mock(HighAvailabilityServices.class);
+		TestHAFactory.haServices = haServices;
+
+		Executor executor = Mockito.mock(Executor.class);
+
+		config.setString(HighAvailabilityOptions.HA_MODE, TestHAFactory.class.getName());
+
+		// when
+		HighAvailabilityServices actualHaServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
+
+		// then
+		assertSame(haServices, actualHaServices);
+
+		// when
+		actualHaServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(config, executor,
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+		// then
+		assertSame(haServices, actualHaServices);
+	}
+
+	@Test(expected = Exception.class)
+	public void testCustomHAServicesFactoryNotDefined() throws Exception {
+		Configuration config = new Configuration();
+
+		Executor executor = Mockito.mock(Executor.class);
+		// "factory_class" selects FACTORY_CLASS mode but names no loadable class
+		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+
+		// expect
+		HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
+	}
+
+	/** Testing factory; it must be public so that it can be instantiated reflectively. */
+	public static class TestHAFactory implements HighAvailabilityServicesFactory {
+		static HighAvailabilityServices haServices;
+
+		@Override
+		public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
+			return haServices;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 91fb514..e708c10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -21,15 +21,20 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-public class HighAvailabilityModeTest {
+/**
+ * Tests for the {@link HighAvailabilityMode}.
+ */
+public class HighAvailabilityModeTest extends TestLogger {
// Default HA mode
- private final static HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
+ private static final HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
ConfigConstants.DEFAULT_HA_MODE.toUpperCase());
/**
@@ -45,6 +50,10 @@ public class HighAvailabilityModeTest {
// Check not equals default
config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+
+ // Check factory class
+ config.setString(HighAvailabilityOptions.HA_MODE, "factory.class.FQN");
+ assertEquals(HighAvailabilityMode.FACTORY_CLASS, HighAvailabilityMode.fromConfig(config));
}
/**
@@ -69,4 +78,24 @@ public class HighAvailabilityModeTest {
assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
}
+	@Test
+	public void testCheckHighAvailabilityModeActivated() throws Exception {
+		Configuration config = new Configuration();
+
+		// check defaults: HA is not activated
+		assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check NONE
+		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.NONE.name().toLowerCase());
+		assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check ZOOKEEPER
+		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check FACTORY_CLASS: a value that is not a known mode is interpreted as a factory class FQN
+		config.setString(HighAvailabilityOptions.HA_MODE, "factory.class.FQN");
+		assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+	}
+
}