You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/20 09:40:57 UTC

[flink] 02/05: [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 324d7bed226cb707512879842c33788f7f715ff2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 18 16:10:13 2018 +0200

    [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name
    
    InstantiationUtil#instantiate takes a class name, a target type and a class loader to load
    a class of the given class name and create an instance of it.
---
 .../org/apache/flink/util/InstantiationUtil.java   | 19 ++++++++++++
 .../HighAvailabilityServicesUtils.java             | 36 ++++++++++++++++------
 .../HighAvailabilityServicesUtilsTest.java         |  5 ++-
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 2370c7c..a36560e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -287,6 +287,25 @@ public final class InstantiationUtil {
 	}
 
 	/**
+	 * Creates a new instance of the given class name and type using the provided {@link ClassLoader}.
+	 *
+	 * @param className of the class to load
+	 * @param targetType type of the instantiated class
+	 * @param classLoader to use for loading the class
+	 * @param <T> type of the instantiated class
+	 * @return Instance of the given class name
+	 * @throws ClassNotFoundException if the class could not be found
+	 */
+	public static <T> T instantiate(final String className, final Class<T> targetType, final ClassLoader classLoader) throws ClassNotFoundException {
+		final Class<? extends T> clazz = Class.forName(
+			className,
+			false,
+			classLoader).asSubclass(targetType);
+
+		return instantiate(clazz);
+	}
+
+	/**
 	 * Creates a new instance of the given class.
 	 *
 	 * @param <T> The generic type of the class.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 78484d3..05f96ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.util.concurrent.Executor;
 
@@ -159,19 +161,33 @@ public class HighAvailabilityServicesUtils {
 		return Tuple2.of(hostname, port);
 	}
 
-	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception {
-		Class<HighAvailabilityServicesFactory> factoryClass;
+	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
+		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+		final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
+
+		final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
+
 		try {
-			factoryClass = config.getClass(
-				HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader());
-		} catch (ClassNotFoundException e) {
-			throw new Exception("Custom HA FactoryClass not found");
+			highAvailabilityServicesFactory = InstantiationUtil.instantiate(
+				haServicesClassName,
+				HighAvailabilityServicesFactory.class,
+				classLoader);
+		} catch (Exception e) {
+			throw new FlinkException(
+				String.format(
+					"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
+					haServicesClassName),
+				e);
 		}
 
-		if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
-			return factoryClass.newInstance().createHAServices(config, executor);
-		} else {
-			throw new Exception("Custom HA FactoryClass is not valid.");
+		try {
+			return highAvailabilityServicesFactory.createHAServices(config, executor);
+		} catch (Exception e) {
+			throw new FlinkException(
+				String.format(
+					"Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
+					haServicesClassName),
+				e);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
index e9063ac..c4f6473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -71,7 +71,10 @@ public class HighAvailabilityServicesUtilsTest extends TestLogger {
 		HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
 	}
 
-	private static class TestHAFactory implements HighAvailabilityServicesFactory {
+	/**
+	 * Testing class which needs to be public in order to be instantiatable.
+	 */
+	public static class TestHAFactory implements HighAvailabilityServicesFactory {
 
 		static HighAvailabilityServices haServices;