You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/10/25 14:47:54 UTC

[flink] 04/08: [FLINK-14501] Add the ClusterClientFactory and make it discoverable

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74e35f00b2daa7afd73675b2a53ce5cd09979b70
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:22:36 2019 +0200

    [FLINK-14501] Add the ClusterClientFactory and make it discoverable
---
 .../client/deployment/ClusterClientFactory.java    | 63 ++++++++++++++++++
 .../deployment/ClusterClientServiceLoader.java     | 36 +++++++++++
 .../DefaultClusterClientServiceLoader.java         | 75 ++++++++++++++++++++++
 3 files changed, 174 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
new file mode 100644
index 0000000..36647b6
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory with all the information needed to create clients to Flink clusters identified by a {@code ClusterID}.
+ */
+public interface ClusterClientFactory<ClusterID> {
+
+	/**
+	 * Returns {@code true} if this {@link ClusterClientFactory} is compatible with the provided configuration,
+	 * {@code false} otherwise.
+	 */
+	boolean isCompatibleWith(Configuration configuration);
+
+	/**
+	 * Creates a {@link ClusterDescriptor} from the given configuration.
+	 *
+	 * @param configuration containing the configuration options relevant for the {@link ClusterDescriptor}
+	 * @return the corresponding {@link ClusterDescriptor}
+	 */
+	ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
+
+	/**
+	 * Returns the cluster id if one is specified in the provided configuration, otherwise returns {@code null}.
+	 *
+	 * <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink cluster running on Yarn.
+	 *
+	 * @param configuration containing the configuration options relevant for the cluster id retrieval
+	 * @return the cluster id identifying the cluster to deploy jobs to, or {@code null} if none is specified
+	 */
+	@Nullable
+	ClusterID getClusterId(Configuration configuration);
+
+	/**
+	 * Returns the {@link ClusterSpecification} described by the provided configuration. This specification can be
+	 * used to deploy a new Flink cluster.
+	 *
+	 * @param configuration containing the configuration options relevant for the {@link ClusterSpecification}
+	 * @return the corresponding {@link ClusterSpecification} for a new Flink cluster
+	 */
+	ClusterSpecification getClusterSpecification(Configuration configuration);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
new file mode 100644
index 0000000..51eef13
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An interface used to discover the appropriate {@link ClusterClientFactory cluster client factory} based on the
+ * provided {@link Configuration}.
+ */
+public interface ClusterClientServiceLoader {
+
+	/**
+	 * Discovers the appropriate {@link ClusterClientFactory} based on the provided configuration.
+	 *
+	 * @param configuration the configuration used to select the appropriate factory.
+	 * @return the appropriate factory; {@link DefaultClusterClientServiceLoader} returns {@code null} if none matches.
+	 */
+	<ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
new file mode 100644
index 0000000..574aeaf
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A service provider for {@link ClusterClientFactory cluster client factories}, discovered via {@link ServiceLoader}.
+ */
+public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
+
+	private static final ServiceLoader<ClusterClientFactory> DEFAULT_LOADER = ServiceLoader.load(ClusterClientFactory.class);
+
+	/**
+	 * Returns the only discovered factory compatible with {@code configuration}, or {@code null} if there is none.
+	 * @throws IllegalStateException if more than one discovered factory is compatible with the configuration.
+	 */
+	@Override
+	@SuppressWarnings("unchecked") // factories are discovered as raw types; the caller chooses the ClusterID to bind
+	public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
+		checkNotNull(configuration);
+		final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
+		for (final Iterator<ClusterClientFactory> factories = DEFAULT_LOADER.iterator(); factories.hasNext(); ) {
+			try {
+				final ClusterClientFactory factory = factories.next();
+				if (factory != null && factory.isCompatibleWith(configuration)) {
+					compatibleFactories.add(factory);
+				}
+			} catch (Throwable e) {
+				if (!(e.getCause() instanceof NoClassDefFoundError)) { // only missing-class failures are tolerated
+					throw e;
+				}
+				LOG.info("Could not load factory due to missing dependencies.");
+				LOG.debug("Skipped a cluster client factory that could not be loaded.", e);
+			}
+		}
+
+		if (compatibleFactories.size() > 1) {
+			final String configStr = configuration.toMap().entrySet().stream()
+					.map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.joining("\n"));
+			throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
+		}
+
+		return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
+	}
+}