You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/20 09:40:57 UTC
[flink] 02/05: [FLINK-8660][ha] Add InstantiationUtil#instantiate
to create instance from class name
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 324d7bed226cb707512879842c33788f7f715ff2
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 18 16:10:13 2018 +0200
[FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name
InstantiationUtil#instantiate takes a class name, a target type and a class loader to load
a class of the given class name and create an instance of it.
---
.../org/apache/flink/util/InstantiationUtil.java | 19 ++++++++++++
.../HighAvailabilityServicesUtils.java | 36 ++++++++++++++++------
.../HighAvailabilityServicesUtilsTest.java | 5 ++-
3 files changed, 49 insertions(+), 11 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 2370c7c..a36560e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -287,6 +287,25 @@ public final class InstantiationUtil {
}
/**
+ * Creates a new instance of the given class name and type using the provided {@link ClassLoader}.
+ *
+ * @param className of the class to load
+ * @param targetType type of the instantiated class
+ * @param classLoader to use for loading the class
+ * @param <T> type of the instantiated class
+ * @return Instance of the given class name
+ * @throws ClassNotFoundException if the class could not be found
+ */
+ public static <T> T instantiate(final String className, final Class<T> targetType, final ClassLoader classLoader) throws ClassNotFoundException {
+ final Class<? extends T> clazz = Class.forName(
+ className,
+ false,
+ classLoader).asSubclass(targetType);
+
+ return instantiate(clazz);
+ }
+
+ /**
* Creates a new instance of the given class.
*
* @param <T> The generic type of the class.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 78484d3..05f96ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import java.util.concurrent.Executor;
@@ -159,19 +161,33 @@ public class HighAvailabilityServicesUtils {
return Tuple2.of(hostname, port);
}
- private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception {
- Class<HighAvailabilityServicesFactory> factoryClass;
+ private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
+
+ final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
+
try {
- factoryClass = config.getClass(
- HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader());
- } catch (ClassNotFoundException e) {
- throw new Exception("Custom HA FactoryClass not found");
+ highAvailabilityServicesFactory = InstantiationUtil.instantiate(
+ haServicesClassName,
+ HighAvailabilityServicesFactory.class,
+ classLoader);
+ } catch (Exception e) {
+ throw new FlinkException(
+ String.format(
+ "Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
+ haServicesClassName),
+ e);
}
- if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
- return factoryClass.newInstance().createHAServices(config, executor);
- } else {
- throw new Exception("Custom HA FactoryClass is not valid.");
+ try {
+ return highAvailabilityServicesFactory.createHAServices(config, executor);
+ } catch (Exception e) {
+ throw new FlinkException(
+ String.format(
+ "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
+ haServicesClassName),
+ e);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
index e9063ac..c4f6473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -71,7 +71,10 @@ public class HighAvailabilityServicesUtilsTest extends TestLogger {
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor);
}
- private static class TestHAFactory implements HighAvailabilityServicesFactory {
+ /**
+ * Testing class which needs to be public in order to be instantiatable.
+ */
+ public static class TestHAFactory implements HighAvailabilityServicesFactory {
static HighAvailabilityServices haServices;