You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/20 09:40:56 UTC

[flink] 01/05: [FLINK-8660][ha] Enable user to provide custom HAServices implementation

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bc7bc2e80af1e277ffb4906e6aca416a9de954b
Author: Krzysztof Białek <kb...@users.noreply.github.com>
AuthorDate: Tue Sep 18 11:18:03 2018 +0200

    [FLINK-8660][ha] Enable user to provide custom HAServices implementation
    
    Create BlobStorage for any HA backend
    
    HighAvailabilityServicesFactory may throw exceptions
    
    Docs
    
    Use ha mode config property to specify factory class FQN
    
    Update docs
---
 docs/_includes/generated/common_section.html       |  2 +-
 .../generated/high_availability_configuration.html |  2 +-
 docs/ops/jobmanager_high_availability.md           |  1 +
 .../configuration/HighAvailabilityOptions.java     |  4 +-
 .../org/apache/flink/runtime/blob/BlobUtils.java   |  8 +--
 .../HighAvailabilityServicesFactory.java           | 39 ++++++++++
 .../HighAvailabilityServicesUtils.java             | 28 +++++++-
 .../runtime/jobmanager/HighAvailabilityMode.java   | 29 ++++----
 .../HighAvailabilityServicesUtilsTest.java         | 83 ++++++++++++++++++++++
 .../jobmanager/HighAvailabilityModeTest.java       | 33 ++++++++-
 10 files changed, 203 insertions(+), 26 deletions(-)

diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
index ea881e0..65804db 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -45,7 +45,7 @@
         <tr>
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
-            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.storageDir</h5></td>
diff --git a/docs/_includes/generated/high_availability_configuration.html b/docs/_includes/generated/high_availability_configuration.html
index b1e2ea9..398379d 100644
--- a/docs/_includes/generated/high_availability_configuration.html
+++ b/docs/_includes/generated/high_availability_configuration.html
@@ -10,7 +10,7 @@
         <tr>
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
-            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.cluster-id</h5></td>
diff --git a/docs/ops/jobmanager_high_availability.md b/docs/ops/jobmanager_high_availability.md
index 320f9d6..2c2f1a6 100644
--- a/docs/ops/jobmanager_high_availability.md
+++ b/docs/ops/jobmanager_high_availability.md
@@ -65,6 +65,7 @@ By default, the job manager will pick a *random port* for inter process communic
 In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
 
 - **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. 
 
   <pre>high-availability: zookeeper</pre>
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 787efff..5881533 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -43,6 +43,7 @@ public class HighAvailabilityOptions {
 	 * Defines high-availability mode used for the cluster execution.
 	 * A value of "NONE" signals no highly available setup.
 	 * To enable high-availability, set this mode to "ZOOKEEPER".
+	 * Can also be set to FQN of HighAvailability factory class.
 	 */
 	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_MODE =
@@ -50,7 +51,7 @@ public class HighAvailabilityOptions {
 			.defaultValue("NONE")
 			.withDeprecatedKeys("recovery.mode")
 			.withDescription("Defines high-availability mode used for the cluster execution." +
-				" To enable high-availability, set this mode to \"ZOOKEEPER\".");
+				" To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
 
 	/**
 	 * The ID of the Flink cluster, used to separate multiple Flink clusters
@@ -73,7 +74,6 @@ public class HighAvailabilityOptions {
 			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
 			.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");
 
-
 	// ------------------------------------------------------------------------
 	//  Recovery Options
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index a61d679..e1caa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -88,14 +88,10 @@ public class BlobUtils {
 	 * 		thrown if the (distributed) file storage cannot be created
 	 */
 	public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {
-		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
-
-		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-			return new VoidBlobStore();
-		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+		if (HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
 			return createFileSystemBlobStore(config);
 		} else {
-			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
+			return new VoidBlobStore();
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
new file mode 100644
index 0000000..0abc6de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory interface for {@link HighAvailabilityServices}.
+ */
+public interface HighAvailabilityServicesFactory {
+
+	/**
+	 * Creates an {@link HighAvailabilityServices} instance.
+	 *
+	 * @param configuration Flink configuration
+	 * @param executor background task executor
+	 * @return instance of {@link HighAvailabilityServices}
+	 * @throws Exception when HAServices cannot be created
+	 */
+	HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 778d2db..78484d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
@@ -64,11 +65,14 @@ public class HighAvailabilityServicesUtils {
 					config,
 					blobStoreService);
 
+			case FACTORY_CLASS:
+				return createCustomHAServices(config, executor);
+
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
-	
+
 	public static HighAvailabilityServices createHighAvailabilityServices(
 		Configuration configuration,
 		Executor executor,
@@ -76,7 +80,7 @@ public class HighAvailabilityServicesUtils {
 
 		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
 
-		switch(highAvailabilityMode) {
+		switch (highAvailabilityMode) {
 			case NONE:
 				final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
 
@@ -119,6 +123,10 @@ public class HighAvailabilityServicesUtils {
 					executor,
 					configuration,
 					blobStoreService);
+
+			case FACTORY_CLASS:
+				return createCustomHAServices(configuration, executor);
+
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
@@ -151,6 +159,22 @@ public class HighAvailabilityServicesUtils {
 		return Tuple2.of(hostname, port);
 	}
 
+	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception {
+		Class<HighAvailabilityServicesFactory> factoryClass;
+		try {
+			factoryClass = config.getClass(
+				HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new Exception("Custom HA FactoryClass not found");
+		}
+
+		if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
+			return factoryClass.newInstance().createHAServices(config, executor);
+		} else {
+			throw new Exception("Custom HA FactoryClass is not valid.");
+		}
+	}
+
 	public enum AddressResolution {
 		TRY_ADDRESS_RESOLUTION,
 		NO_ADDRESS_RESOLUTION
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 7dc13c2..65c202a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -30,10 +30,19 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
  * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
  * is responsible for the job execution. Upon failure of the leader a new leader is elected
  * which will take over the responsibilities of the old leader
+ * - FACTORY_CLASS: Use implementation of {@link org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory}
+ * specified in configuration property high-availability
  */
 public enum HighAvailabilityMode {
-	NONE,
-	ZOOKEEPER;
+	NONE(false),
+	ZOOKEEPER(true),
+	FACTORY_CLASS(true);
+
+	private final boolean haActive;
+
+	HighAvailabilityMode(boolean haActive) {
+		this.haActive = haActive;
+	}
 
 	/**
 	 * Return the configured {@link HighAvailabilityMode}.
@@ -51,7 +60,11 @@ public enum HighAvailabilityMode {
 			// Map old default to new default
 			return HighAvailabilityMode.NONE;
 		} else {
-			return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+			try {
+				return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+			} catch (IllegalArgumentException e) {
+				return FACTORY_CLASS;
+			}
 		}
 	}
 
@@ -63,14 +76,6 @@ public enum HighAvailabilityMode {
 	 */
 	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
 		HighAvailabilityMode mode = fromConfig(configuration);
-		switch (mode) {
-			case NONE:
-				return false;
-			case ZOOKEEPER:
-				return true;
-			default:
-				return false;
-		}
-
+		return mode.haActive;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
new file mode 100644
index 0000000..e9063ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for the {@link HighAvailabilityServicesUtils} class.
+ */
+public class HighAvailabilityServicesUtilsTest extends TestLogger {
+
+	@Test
+	public void testCreateCustomHAServices() throws Exception {
+		Configuration config = new Configuration();
+
+		HighAvailabilityServices haServices = Mockito.mock(HighAvailabilityServices.class);
+		TestHAFactory.haServices = haServices;
+
+		Executor executor = Mockito.mock(Executor.class);
+
+		config.setString(HighAvailabilityOptions.HA_MODE, TestHAFactory.class.getName());
+
+		// when
+		HighAvailabilityServices actualHaServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
+
+		// then
+		assertSame(haServices, actualHaServices);
+
+		// when
+		actualHaServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(config, executor,
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+		// then
+		assertSame(haServices, actualHaServices);
+	}
+
+	@Test(expected = Exception.class)
+	public void testCustomHAServicesFactoryNotDefined() throws Exception {
+		Configuration config = new Configuration();
+
+		Executor executor = Mockito.mock(Executor.class);
+
+		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+
+		// expect
+		HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
+	}
+
+	private static class TestHAFactory implements HighAvailabilityServicesFactory {
+
+		static HighAvailabilityServices haServices;
+
+		@Override
+		public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
+			return haServices;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 91fb514..e708c10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -21,15 +21,20 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class HighAvailabilityModeTest {
+/**
+ * Tests for the {@link HighAvailabilityMode}.
+ */
+public class HighAvailabilityModeTest extends TestLogger {
 
 	// Default HA mode
-	private final static HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
+	private static final HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
 			ConfigConstants.DEFAULT_HA_MODE.toUpperCase());
 
 	/**
@@ -45,6 +50,10 @@ public class HighAvailabilityModeTest {
 		// Check not equals default
 		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
 		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+
+		// Check factory class
+		config.setString(HighAvailabilityOptions.HA_MODE, "factory.class.FQN");
+		assertEquals(HighAvailabilityMode.FACTORY_CLASS, HighAvailabilityMode.fromConfig(config));
 	}
 
 	/**
@@ -69,4 +78,24 @@ public class HighAvailabilityModeTest {
 		assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
 	}
 
+	@Test
+	public void testCheckHighAvailabilityModeActivated() throws Exception {
+		Configuration config = new Configuration();
+
+		// check defaults
+		assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check NONE
+		config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase());
+		assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check ZOOKEEPER
+		config.setString("high-availability", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check FACTORY_CLASS
+		config.setString("high-availability", HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+		assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+	}
+
 }