You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/10/25 14:47:54 UTC
[flink] 04/08: [FLINK-14501] Add the ClusterClientFactory and make
it discoverable
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 74e35f00b2daa7afd73675b2a53ce5cd09979b70
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:22:36 2019 +0200
[FLINK-14501] Add the ClusterClientFactory and make it discoverable
---
.../client/deployment/ClusterClientFactory.java | 63 ++++++++++++++++++
.../deployment/ClusterClientServiceLoader.java | 36 +++++++++++
.../DefaultClusterClientServiceLoader.java | 75 ++++++++++++++++++++++
3 files changed, 174 insertions(+)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
new file mode 100644
index 0000000..36647b6
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory containing all the necessary information for creating clients to Flink clusters identified by a {@code ClusterID}.
+ */
+public interface ClusterClientFactory<ClusterID> {
+
+ /**
+ * Returns {@code true} if this {@link ClusterClientFactory} is compatible with the provided configuration,
+ * {@code false} otherwise.
+ */
+ boolean isCompatibleWith(Configuration configuration);
+
+ /**
+ * Creates a {@link ClusterDescriptor} from the given configuration.
+ *
+ * @param configuration containing the configuration options relevant for the {@link ClusterDescriptor}
+ * @return the corresponding {@link ClusterDescriptor}.
+ */
+ ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
+
+ /**
+ * Returns the cluster id if a cluster id is specified in the provided configuration, otherwise it returns {@code null}.
+ *
+ * <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink cluster running on Yarn.
+ *
+ * @param configuration containing the configuration options relevant for the cluster id retrieval
+ * @return the cluster id identifying the cluster to deploy jobs to, or {@code null} if the configuration specifies none
+ */
+ @Nullable
+ ClusterID getClusterId(Configuration configuration);
+
+ /**
+ * Returns the {@link ClusterSpecification} specified by the provided configuration. This
+ * specification can be used to deploy a new Flink cluster.
+ *
+ * @param configuration containing the configuration options relevant for the {@link ClusterSpecification}
+ * @return the corresponding {@link ClusterSpecification} for a new Flink cluster
+ */
+ ClusterSpecification getClusterSpecification(Configuration configuration);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
new file mode 100644
index 0000000..51eef13
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An interface used to discover the appropriate {@link ClusterClientFactory cluster client factory} based on the
+ * provided {@link Configuration}.
+ */
+public interface ClusterClientServiceLoader {
+
+ /**
+ * Discovers the appropriate {@link ClusterClientFactory} based on the provided configuration.
+ *
+ * @param configuration the configuration based on which the appropriate factory is going to be selected.
+ * @return the appropriate {@link ClusterClientFactory}; note that {@link DefaultClusterClientServiceLoader} returns {@code null} if no factory is compatible.
+ */
+ <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
new file mode 100644
index 0000000..574aeaf
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A service provider for {@link ClusterClientFactory cluster client factories}.
+ */
+public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
+
+    /**
+     * Returns the single discovered factory that is compatible with {@code configuration}, or {@code null} if none is.
+     *
+     * @throws IllegalStateException if more than one discovered factory is compatible
+     */
+    @Override
+    public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
+        checkNotNull(configuration);
+        // A ServiceLoader is not safe for concurrent use, so every call discovers through its own instance.
+        final Iterator<ClusterClientFactory> factories = ServiceLoader.load(ClusterClientFactory.class).iterator();
+        final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
+        while (factories.hasNext()) {
+            try {
+                final ClusterClientFactory factory = factories.next();
+                if (factory != null && factory.isCompatibleWith(configuration)) {
+                    compatibleFactories.add(factory);
+                }
+            } catch (Throwable e) {
+                // Only failures whose cause is a NoClassDefFoundError are tolerated; anything else propagates.
+                if (!(e.getCause() instanceof NoClassDefFoundError)) {
+                    throw e;
+                }
+                LOG.info("Could not load factory due to missing dependencies.");
+            }
+        }
+
+        if (compatibleFactories.size() > 1) {
+            final String configStr = configuration.toMap().entrySet().stream()
+                .map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("\n"));
+            throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
+        }
+
+        return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
+    }
+}