You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/20 09:40:55 UTC

[flink] branch master updated (26bac51 -> 6522f17)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 26bac51  [FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb streams
     new 3bc7bc2  [FLINK-8660][ha] Enable user to provide custom HAServices implementation
     new 324d7be  [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name
     new ab3aabf  [hotfix] Add JavaDoc to HighAvailabilityServicesUtils#AddressResolution
     new 5d29196  [hotfix] Add ExceptionUtils#stripException
     new 6522f17  [hotfix] Unstrip UndeclaredThrowableExceptions from entrypoints

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/_includes/generated/common_section.html       |  2 +-
 .../generated/high_availability_configuration.html |  2 +-
 docs/ops/jobmanager_high_availability.md           |  1 +
 .../org/apache/flink/client/cli/CliFrontend.java   |  6 +-
 .../configuration/HighAvailabilityOptions.java     |  4 +-
 .../java/org/apache/flink/util/ExceptionUtils.java | 24 ++++--
 .../org/apache/flink/util/InstantiationUtil.java   | 19 +++++
 .../org/apache/flink/util/ExceptionUtilsTest.java  | 18 +++++
 .../mesos/entrypoint/MesosTaskExecutorRunner.java  | 15 ++--
 .../runtime/webmonitor/history/HistoryServer.java  | 13 ++--
 .../org/apache/flink/runtime/blob/BlobUtils.java   |  8 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  6 +-
 .../HighAvailabilityServicesFactory.java           | 26 +++----
 .../HighAvailabilityServicesUtils.java             | 48 +++++++++++-
 .../runtime/jobmanager/HighAvailabilityMode.java   | 29 +++++---
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../HighAvailabilityServicesUtilsTest.java         | 86 ++++++++++++++++++++++
 .../jobmanager/HighAvailabilityModeTest.java       | 33 ++++++++-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  | 14 ++--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 13 ++--
 20 files changed, 291 insertions(+), 80 deletions(-)
 copy flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java => flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java (59%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java


[flink] 05/05: [hotfix] Unstrip UndeclaredThrowableExceptions from entrypoints

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6522f17217e5f81829e5e51c39837bc3ce6b5ff4
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 19 17:34:42 2018 +0200

    [hotfix] Unstrip UndeclaredThrowableExceptions from entrypoints
    
    In order to better report errors while starting the cluster, we unstrip all
    UndeclaredThrowableExceptions from the entrypoints. This should give a better
    user experience.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java     |  6 ++++--
 .../flink/mesos/entrypoint/MesosTaskExecutorRunner.java   | 15 +++++++--------
 .../flink/runtime/webmonitor/history/HistoryServer.java   | 13 +++++--------
 .../flink/runtime/entrypoint/ClusterEntrypoint.java       |  6 ++++--
 .../flink/runtime/taskexecutor/TaskManagerRunner.java     |  4 +++-
 .../org/apache/flink/yarn/YarnTaskExecutorRunner.java     | 14 +++++++-------
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    | 13 ++++++++-----
 7 files changed, 38 insertions(+), 33 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index e2a260c..f8258b1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -69,6 +69,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.text.SimpleDateFormat;
@@ -1121,8 +1122,9 @@ public class CliFrontend {
 			System.exit(retCode);
 		}
 		catch (Throwable t) {
-			LOG.error("Fatal error while running command line interface.", t);
-			t.printStackTrace();
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+			LOG.error("Fatal error while running command line interface.", strippedThrowable);
+			strippedThrowable.printStackTrace();
 			System.exit(31);
 		}
 	}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index 11a4130..cc1289f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -40,8 +41,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 /**
  * The entry point for running a TaskManager in a Mesos container.
@@ -104,17 +105,15 @@ public class MesosTaskExecutorRunner {
 		SecurityUtils.install(sc);
 
 		try {
-			SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
-				@Override
-				public Integer call() throws Exception {
-					TaskManagerRunner.runTaskManager(configuration, resourceId);
+			SecurityUtils.getInstalledContext().runSecured(() -> {
+				TaskManagerRunner.runTaskManager(configuration, resourceId);
 
-					return 0;
-				}
+				return 0;
 			});
 		}
 		catch (Throwable t) {
-			LOG.error("Error while starting the TaskManager", t);
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+			LOG.error("Error while starting the TaskManager", strippedThrowable);
 			System.exit(INIT_ERROR_EXIT_CODE);
 		}
 	}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 0891426..53c5a83 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -121,14 +122,10 @@ public class HistoryServer {
 				}
 			});
 			System.exit(0);
-		} catch (UndeclaredThrowableException ute) {
-			Throwable cause = ute.getUndeclaredThrowable();
-			LOG.error("Failed to run HistoryServer.", cause);
-			cause.printStackTrace();
-			System.exit(1);
-		} catch (Exception e) {
-			LOG.error("Failed to run HistoryServer.", e);
-			e.printStackTrace();
+		} catch (Throwable t) {
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+			LOG.error("Failed to run HistoryServer.", strippedThrowable);
+			strippedThrowable.printStackTrace();
 			System.exit(1);
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 0fd4389..1a8c058 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -84,6 +84,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -192,12 +193,13 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				return null;
 			});
 		} catch (Throwable t) {
-			LOG.error("Cluster initialization failed.", t);
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+			LOG.error("Cluster initialization failed.", strippedThrowable);
 
 			shutDownAndTerminate(
 				STARTUP_FAILURE_RETURN_CODE,
 				ApplicationStatus.FAILED,
-				t.getMessage(),
+				strippedThrowable.getMessage(),
 				false);
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 5c1f420..40e628a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -301,7 +302,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 				}
 			});
 		} catch (Throwable t) {
-			LOG.error("TaskManager initialization failed.", t);
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+			LOG.error("TaskManager initialization failed.", strippedThrowable);
 			System.exit(STARTUP_FAILURE_RETURN_CODE);
 		}
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 0e70de9..a419bb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.security.UserGroupInformation;
@@ -40,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -136,17 +138,15 @@ public class YarnTaskExecutorRunner {
 
 			SecurityUtils.install(sc);
 
-			SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
-					return null;
-				}
+			SecurityUtils.getInstalledContext().runSecured((Callable<Void>) () -> {
+				TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
+				return null;
 			});
 		}
 		catch (Throwable t) {
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
 			// make sure that everything whatever ends up in the log
-			LOG.error("YARN TaskManager initialization failed.", t);
+			LOG.error("YARN TaskManager initialization failed.", strippedThrowable);
 			System.exit(INIT_ERROR_EXIT_CODE);
 		}
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index c0180a8..90aca89 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -71,6 +72,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -811,8 +813,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
 		} catch (CliArgsException e) {
 			retCode = handleCliArgsException(e);
-		} catch (Exception e) {
-			retCode = handleError(e);
+		} catch (Throwable t) {
+			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+			retCode = handleError(strippedThrowable);
 		}
 
 		System.exit(retCode);
@@ -949,15 +952,15 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		return 1;
 	}
 
-	private static int handleError(Exception e) {
-		LOG.error("Error while running the Flink Yarn session.", e);
+	private static int handleError(Throwable t) {
+		LOG.error("Error while running the Flink Yarn session.", t);
 
 		System.err.println();
 		System.err.println("------------------------------------------------------------");
 		System.err.println(" The program finished with the following exception:");
 		System.err.println();
 
-		e.printStackTrace();
+		t.printStackTrace();
 		return 1;
 	}
 


[flink] 01/05: [FLINK-8660][ha] Enable user to provide custom HAServices implementation

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bc7bc2e80af1e277ffb4906e6aca416a9de954b
Author: Krzysztof BiaƂek <kb...@users.noreply.github.com>
AuthorDate: Tue Sep 18 11:18:03 2018 +0200

    [FLINK-8660][ha] Enable user to provide custom HAServices implementation
    
    Create BlobStorage for any HA backend
    
    HighAvailabilityServicesFactory may throw exceptions
    
    Docs
    
    Use ha mode config property to specify factory class FQN
    
    Update docs
---
 docs/_includes/generated/common_section.html       |  2 +-
 .../generated/high_availability_configuration.html |  2 +-
 docs/ops/jobmanager_high_availability.md           |  1 +
 .../configuration/HighAvailabilityOptions.java     |  4 +-
 .../org/apache/flink/runtime/blob/BlobUtils.java   |  8 +--
 .../HighAvailabilityServicesFactory.java           | 39 ++++++++++
 .../HighAvailabilityServicesUtils.java             | 28 +++++++-
 .../runtime/jobmanager/HighAvailabilityMode.java   | 29 ++++----
 .../HighAvailabilityServicesUtilsTest.java         | 83 ++++++++++++++++++++++
 .../jobmanager/HighAvailabilityModeTest.java       | 33 ++++++++-
 10 files changed, 203 insertions(+), 26 deletions(-)

diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
index ea881e0..65804db 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -45,7 +45,7 @@
         <tr>
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
-            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.storageDir</h5></td>
diff --git a/docs/_includes/generated/high_availability_configuration.html b/docs/_includes/generated/high_availability_configuration.html
index b1e2ea9..398379d 100644
--- a/docs/_includes/generated/high_availability_configuration.html
+++ b/docs/_includes/generated/high_availability_configuration.html
@@ -10,7 +10,7 @@
         <tr>
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
-            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.cluster-id</h5></td>
diff --git a/docs/ops/jobmanager_high_availability.md b/docs/ops/jobmanager_high_availability.md
index 320f9d6..2c2f1a6 100644
--- a/docs/ops/jobmanager_high_availability.md
+++ b/docs/ops/jobmanager_high_availability.md
@@ -65,6 +65,7 @@ By default, the job manager will pick a *random port* for inter process communic
 In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
 
 - **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. 
 
   <pre>high-availability: zookeeper</pre>
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 787efff..5881533 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -43,6 +43,7 @@ public class HighAvailabilityOptions {
 	 * Defines high-availability mode used for the cluster execution.
 	 * A value of "NONE" signals no highly available setup.
 	 * To enable high-availability, set this mode to "ZOOKEEPER".
+	 * Can also be set to FQN of HighAvailability factory class.
 	 */
 	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_MODE =
@@ -50,7 +51,7 @@ public class HighAvailabilityOptions {
 			.defaultValue("NONE")
 			.withDeprecatedKeys("recovery.mode")
 			.withDescription("Defines high-availability mode used for the cluster execution." +
-				" To enable high-availability, set this mode to \"ZOOKEEPER\".");
+				" To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
 
 	/**
 	 * The ID of the Flink cluster, used to separate multiple Flink clusters
@@ -73,7 +74,6 @@ public class HighAvailabilityOptions {
 			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
 			.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");
 
-
 	// ------------------------------------------------------------------------
 	//  Recovery Options
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index a61d679..e1caa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -88,14 +88,10 @@ public class BlobUtils {
 	 * 		thrown if the (distributed) file storage cannot be created
 	 */
 	public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {
-		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
-
-		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-			return new VoidBlobStore();
-		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+		if (HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
 			return createFileSystemBlobStore(config);
 		} else {
-			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
+			return new VoidBlobStore();
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
new file mode 100644
index 0000000..0abc6de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory interface for {@link HighAvailabilityServices}.
+ */
+public interface HighAvailabilityServicesFactory {
+
+	/**
+	 * Creates an {@link HighAvailabilityServices} instance.
+	 *
+	 * @param configuration Flink configuration
+	 * @param executor background task executor
+	 * @return instance of {@link HighAvailabilityServices}
+	 * @throws Exception when HAServices cannot be created
+	 */
+	HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 778d2db..78484d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
@@ -64,11 +65,14 @@ public class HighAvailabilityServicesUtils {
 					config,
 					blobStoreService);
 
+			case FACTORY_CLASS:
+				return createCustomHAServices(config, executor);
+
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
-	
+
 	public static HighAvailabilityServices createHighAvailabilityServices(
 		Configuration configuration,
 		Executor executor,
@@ -76,7 +80,7 @@ public class HighAvailabilityServicesUtils {
 
 		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
 
-		switch(highAvailabilityMode) {
+		switch (highAvailabilityMode) {
 			case NONE:
 				final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
 
@@ -119,6 +123,10 @@ public class HighAvailabilityServicesUtils {
 					executor,
 					configuration,
 					blobStoreService);
+
+			case FACTORY_CLASS:
+				return createCustomHAServices(configuration, executor);
+
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
@@ -151,6 +159,22 @@ public class HighAvailabilityServicesUtils {
 		return Tuple2.of(hostname, port);
 	}
 
+	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception {
+		Class<HighAvailabilityServicesFactory> factoryClass;
+		try {
+			factoryClass = config.getClass(
+				HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new Exception("Custom HA FactoryClass not found");
+		}
+
+		if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
+			return factoryClass.newInstance().createHAServices(config, executor);
+		} else {
+			throw new Exception("Custom HA FactoryClass is not valid.");
+		}
+	}
+
 	public enum AddressResolution {
 		TRY_ADDRESS_RESOLUTION,
 		NO_ADDRESS_RESOLUTION
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 7dc13c2..65c202a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -30,10 +30,19 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
  * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
  * is responsible for the job execution. Upon failure of the leader a new leader is elected
  * which will take over the responsibilities of the old leader
+ * - FACTORY_CLASS: Use implementation of {@link org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory}
+ * specified in configuration property high-availability
  */
 public enum HighAvailabilityMode {
-	NONE,
-	ZOOKEEPER;
+	NONE(false),
+	ZOOKEEPER(true),
+	FACTORY_CLASS(true);
+
+	private final boolean haActive;
+
+	HighAvailabilityMode(boolean haActive) {
+		this.haActive = haActive;
+	}
 
 	/**
 	 * Return the configured {@link HighAvailabilityMode}.
@@ -51,7 +60,11 @@ public enum HighAvailabilityMode {
 			// Map old default to new default
 			return HighAvailabilityMode.NONE;
 		} else {
-			return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+			try {
+				return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+			} catch (IllegalArgumentException e) {
+				return FACTORY_CLASS;
+			}
 		}
 	}
 
@@ -63,14 +76,6 @@ public enum HighAvailabilityMode {
 	 */
 	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
 		HighAvailabilityMode mode = fromConfig(configuration);
-		switch (mode) {
-			case NONE:
-				return false;
-			case ZOOKEEPER:
-				return true;
-			default:
-				return false;
-		}
-
+		return mode.haActive;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
new file mode 100644
index 0000000..e9063ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for the {@link HighAvailabilityServicesUtils} class.
+ */
+public class HighAvailabilityServicesUtilsTest extends TestLogger {
+
+	@Test
+	public void testCreateCustomHAServices() throws Exception {
+		Configuration config = new Configuration();
+
+		HighAvailabilityServices haServices = Mockito.mock(HighAvailabilityServices.class);
+		TestHAFactory.haServices = haServices;
+
+		Executor executor = Mockito.mock(Executor.class);
+
+		config.setString(HighAvailabilityOptions.HA_MODE, TestHAFactory.class.getName());
+
+		// when
+		HighAvailabilityServices actualHaServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
+
+		// then
+		assertSame(haServices, actualHaServices);
+
+		// when
+		actualHaServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(config, executor,
+			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+		// then
+		assertSame(haServices, actualHaServices);
+	}
+
+	@Test(expected = Exception.class)
+	public void testCustomHAServicesFactoryNotDefined() throws Exception {
+		Configuration config = new Configuration();
+
+		Executor executor = Mockito.mock(Executor.class);
+
+		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+
+		// expect
+		HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
+	}
+
+	private static class TestHAFactory implements HighAvailabilityServicesFactory {
+
+		static HighAvailabilityServices haServices;
+
+		@Override
+		public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
+			return haServices;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 91fb514..e708c10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -21,15 +21,20 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class HighAvailabilityModeTest {
+/**
+ * Tests for the {@link HighAvailabilityMode}.
+ */
+public class HighAvailabilityModeTest extends TestLogger {
 
 	// Default HA mode
-	private final static HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
+	private static final HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
 			ConfigConstants.DEFAULT_HA_MODE.toUpperCase());
 
 	/**
@@ -45,6 +50,10 @@ public class HighAvailabilityModeTest {
 		// Check not equals default
 		config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
 		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+
+		// Check factory class
+		config.setString(HighAvailabilityOptions.HA_MODE, "factory.class.FQN");
+		assertEquals(HighAvailabilityMode.FACTORY_CLASS, HighAvailabilityMode.fromConfig(config));
 	}
 
 	/**
@@ -69,4 +78,24 @@ public class HighAvailabilityModeTest {
 		assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
 	}
 
+	@Test
+	public void testCheckHighAvailabilityModeActivated() throws Exception {
+		Configuration config = new Configuration();
+
+		// check defaults
+		assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check NONE
+		config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase());
+		assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check ZOOKEEPER
+		config.setString("high-availability", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+		// check FACTORY_CLASS
+		config.setString("high-availability", HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+		assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+	}
+
 }


[flink] 03/05: [hotfix] Add JavaDoc to HighAvailabilityServicesUtils#AddressResolution

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab3aabfac58ccea97ae12f9fdf9b0f11f6a39f97
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 18 16:20:41 2018 +0200

    [hotfix] Add JavaDoc to HighAvailabilityServicesUtils#AddressResolution
---
 .../flink/runtime/highavailability/HighAvailabilityServicesUtils.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 05f96ce..ee919c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -191,6 +191,10 @@ public class HighAvailabilityServicesUtils {
 		}
 	}
 
+	/**
+	 * Enum specifying whether address resolution should be tried or not when creating the
+	 * {@link HighAvailabilityServices}.
+	 */
 	public enum AddressResolution {
 		TRY_ADDRESS_RESOLUTION,
 		NO_ADDRESS_RESOLUTION


[flink] 04/05: [hotfix] Add ExceptionUtils#stripException

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d291961604be26f8be1228e2bf68ab39eec30ed
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 19 17:28:19 2018 +0200

    [hotfix] Add ExceptionUtils#stripException
    
    stripException strips a given throwable from a specified exception type. This
    is useful to unwrap exceptions.
---
 .../java/org/apache/flink/util/ExceptionUtils.java | 24 ++++++++++++++--------
 .../org/apache/flink/util/ExceptionUtilsTest.java  | 18 ++++++++++++++++
 2 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 601a252..0ea43ee 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -382,11 +382,7 @@ public final class ExceptionUtils {
 	 * @return Cause of ExecutionException or given Throwable
 	 */
 	public static Throwable stripExecutionException(Throwable throwable) {
-		while (throwable instanceof ExecutionException && throwable.getCause() != null) {
-			throwable = throwable.getCause();
-		}
-
-		return throwable;
+		return stripException(throwable, ExecutionException.class);
 	}
 
 	/**
@@ -397,11 +393,23 @@ public final class ExceptionUtils {
 	 * @return Cause of CompletionException or given Throwable
 	 */
 	public static Throwable stripCompletionException(Throwable throwable) {
-		while (throwable instanceof CompletionException && throwable.getCause() != null) {
-			throwable = throwable.getCause();
+		return stripException(throwable, CompletionException.class);
+	}
+
+	/**
+	 * Unpacks an specified exception and returns its cause. Otherwise the given
+	 * {@link Throwable} is returned.
+	 *
+	 * @param throwableToStrip to strip
+	 * @param typeToStrip type to strip
+	 * @return Unpacked cause or given Throwable if not packed
+	 */
+	public static Throwable stripException(Throwable throwableToStrip, Class<? extends Throwable> typeToStrip) {
+		while (typeToStrip.isAssignableFrom(throwableToStrip.getClass()) && throwableToStrip.getCause() != null) {
+			throwableToStrip = throwableToStrip.getCause();
 		}
 
-		return throwable;
+		return throwableToStrip;
 	}
 
 	/**
diff --git a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
index 07978a5..fa275d0 100644
--- a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.util;
 
 import org.junit.Test;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -69,4 +72,19 @@ public class ExceptionUtilsTest extends TestLogger {
 			IllegalStateException.class).isPresent());
 	}
 
+	@Test
+	public void testExceptionStripping() {
+		final FlinkException expectedException = new FlinkException("test exception");
+		final Throwable strippedException = ExceptionUtils.stripException(new RuntimeException(new RuntimeException(expectedException)), RuntimeException.class);
+
+		assertThat(strippedException, is(equalTo(expectedException)));
+	}
+
+	@Test
+	public void testInvalidExceptionStripping() {
+		final FlinkException expectedException = new FlinkException(new RuntimeException(new FlinkException("inner exception")));
+		final Throwable strippedException = ExceptionUtils.stripException(expectedException, RuntimeException.class);
+
+		assertThat(strippedException, is(equalTo(expectedException)));
+	}
 }


[flink] 02/05: [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 324d7bed226cb707512879842c33788f7f715ff2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 18 16:10:13 2018 +0200

    [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name
    
    InstantiationUtil#instantiate takes a class name, a target type and a class loader to load
    a class of the given class name and create an instance of it.
---
 .../org/apache/flink/util/InstantiationUtil.java   | 19 ++++++++++++
 .../HighAvailabilityServicesUtils.java             | 36 ++++++++++++++++------
 .../HighAvailabilityServicesUtilsTest.java         |  5 ++-
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 2370c7c..a36560e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -287,6 +287,25 @@ public final class InstantiationUtil {
 	}
 
 	/**
+	 * Creates a new instance of the given class name and type using the provided {@link ClassLoader}.
+	 *
+	 * @param className of the class to load
+	 * @param targetType type of the instantiated class
+	 * @param classLoader to use for loading the class
+	 * @param <T> type of the instantiated class
+	 * @return Instance of the given class name
+	 * @throws ClassNotFoundException if the class could not be found
+	 */
+	public static <T> T instantiate(final String className, final Class<T> targetType, final ClassLoader classLoader) throws ClassNotFoundException {
+		final Class<? extends T> clazz = Class.forName(
+			className,
+			false,
+			classLoader).asSubclass(targetType);
+
+		return instantiate(clazz);
+	}
+
+	/**
 	 * Creates a new instance of the given class.
 	 *
 	 * @param <T> The generic type of the class.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 78484d3..05f96ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.util.concurrent.Executor;
 
@@ -159,19 +161,33 @@ public class HighAvailabilityServicesUtils {
 		return Tuple2.of(hostname, port);
 	}
 
-	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception {
-		Class<HighAvailabilityServicesFactory> factoryClass;
+	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
+		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+		final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
+
+		final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
+
 		try {
-			factoryClass = config.getClass(
-				HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader());
-		} catch (ClassNotFoundException e) {
-			throw new Exception("Custom HA FactoryClass not found");
+			highAvailabilityServicesFactory = InstantiationUtil.instantiate(
+				haServicesClassName,
+				HighAvailabilityServicesFactory.class,
+				classLoader);
+		} catch (Exception e) {
+			throw new FlinkException(
+				String.format(
+					"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
+					haServicesClassName),
+				e);
 		}
 
-		if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
-			return factoryClass.newInstance().createHAServices(config, executor);
-		} else {
-			throw new Exception("Custom HA FactoryClass is not valid.");
+		try {
+			return highAvailabilityServicesFactory.createHAServices(config, executor);
+		} catch (Exception e) {
+			throw new FlinkException(
+				String.format(
+					"Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
+					haServicesClassName),
+				e);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
index e9063ac..c4f6473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -71,7 +71,10 @@ public class HighAvailabilityServicesUtilsTest extends TestLogger {
 		HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
 	}
 
-	private static class TestHAFactory implements HighAvailabilityServicesFactory {
+	/**
+	 * Testing class which needs to be public in order to be instantiatable.
+	 */
+	public static class TestHAFactory implements HighAvailabilityServicesFactory {
 
 		static HighAvailabilityServices haServices;