You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/10/25 14:47:55 UTC
[flink] 05/08: [FLINK-14501] Add Standalone and Yarn
ClusterClientFactories
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7777994df82c6fbae7fe101551a742f13822de00
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:38:28 2019 +0200
[FLINK-14501] Add Standalone and Yarn ClusterClientFactories
---
.../client/deployment/StandaloneClientFactory.java | 59 +++++++
...he.flink.client.deployment.ClusterClientFactory | 16 ++
.../deployment/ClusterClientServiceLoaderTest.java | 167 ++++++++++++++++++++
...he.flink.client.deployment.ClusterClientFactory | 18 +++
.../flink/yarn/YarnClusterClientFactory.java | 173 +++++++++++++++++++++
...he.flink.client.deployment.ClusterClientFactory | 16 ++
.../flink/yarn/YarnClusterClientFactoryTest.java | 47 ++++++
7 files changed, 496 insertions(+)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
new file mode 100644
index 0000000..b441a63
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ClusterClientFactory} for a standalone cluster, i.e. Flink on bare-metal.
+ *
+ * <p>This factory is selected when {@link DeploymentOptions#TARGET} is set to {@link #ID}.
+ */
+public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
+
+	/** Value of {@link DeploymentOptions#TARGET} that selects this factory. */
+	public static final String ID = "default";
+
+	/** Returns {@code true} iff the configured deployment target equals {@link #ID}. */
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		final String target = checkNotNull(configuration).getString(DeploymentOptions.TARGET);
+		return ID.equals(target);
+	}
+
+	/** Creates a {@link StandaloneClusterDescriptor} wrapping the given (non-null) configuration. */
+	@Override
+	public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		return new StandaloneClusterDescriptor(checkNotNull(configuration));
+	}
+
+	/**
+	 * Returns {@link StandaloneClusterId#getInstance()} regardless of the configuration contents;
+	 * the configuration is only null-checked.
+	 */
+	@Override
+	@Nullable
+	public StandaloneClusterId getClusterId(Configuration configuration) {
+		checkNotNull(configuration);
+		return StandaloneClusterId.getInstance();
+	}
+
+	/**
+	 * Returns a {@link ClusterSpecification} built with the builder's defaults; nothing is read
+	 * from the (non-null) configuration.
+	 */
+	@Override
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		checkNotNull(configuration);
+		final ClusterSpecification.ClusterSpecificationBuilder defaults =
+			new ClusterSpecification.ClusterSpecificationBuilder();
+		return defaults.createClusterSpecification();
+	}
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..fd9e4fa
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.StandaloneClientFactory
\ No newline at end of file
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
new file mode 100644
index 0000000..45157a2
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultClusterClientServiceLoader}.
+ */
+public class ClusterClientServiceLoaderTest {
+
+ private static final String VALID_TARGET = "existing";
+ private static final String AMBIGUOUS_TARGET = "duplicate";
+ private static final String NON_EXISTING_TARGET = "non-existing";
+
+ private static final int VALID_ID = 42;
+
+ private ClusterClientServiceLoader serviceLoaderUnderTest;
+
+ @Before
+ public void init() {
+ serviceLoaderUnderTest = new DefaultClusterClientServiceLoader();
+ }
+
+ @Test
+ public void testStandaloneClusterClientFactoryDiscovery() {
+ final Configuration config = new Configuration();
+ config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID);
+
+ ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+ assertTrue(factory instanceof StandaloneClientFactory);
+ }
+
+ @Test
+ public void testFactoryDiscovery() {
+ final Configuration config = new Configuration();
+ config.setString(DeploymentOptions.TARGET, VALID_TARGET);
+
+ final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+ assertNotNull(factory);
+
+ final Integer id = factory.getClusterId(config);
+ assertThat(id, allOf(is(notNullValue()), equalTo(VALID_ID)));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMoreThanOneCompatibleFactoriesException() {
+ final Configuration config = new Configuration();
+ config.setString(DeploymentOptions.TARGET, AMBIGUOUS_TARGET);
+
+ serviceLoaderUnderTest.getClusterClientFactory(config);
+ fail();
+ }
+
+ @Test
+ public void testNoFactoriesFound() {
+ final Configuration config = new Configuration();
+ config.setString(DeploymentOptions.TARGET, NON_EXISTING_TARGET);
+
+ final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+ assertNull(factory);
+ }
+
+ /**
+ * Test {@link ClusterClientFactory} that is successfully discovered.
+ */
+ public static class ValidClusterClientFactory extends DummyClusterClientFactory {
+
+ public static final String ID = VALID_TARGET;
+
+ @Override
+ public boolean isCompatibleWith(Configuration configuration) {
+ return configuration.getString(DeploymentOptions.TARGET).equals(VALID_TARGET);
+ }
+
+ @Nullable
+ @Override
+ public Integer getClusterId(Configuration configuration) {
+ return VALID_ID;
+ }
+ }
+
+ /**
+ * Test {@link ClusterClientFactory} that has a duplicate.
+ */
+ public static class FirstCollidingClusterClientFactory extends DummyClusterClientFactory {
+
+ public static final String ID = AMBIGUOUS_TARGET;
+
+ @Override
+ public boolean isCompatibleWith(Configuration configuration) {
+ return configuration.getString(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET);
+ }
+ }
+
+ /**
+ * Test {@link ClusterClientFactory} that has a duplicate.
+ */
+ public static class SecondCollidingClusterClientFactory extends DummyClusterClientFactory {
+
+ public static final String ID = AMBIGUOUS_TARGET;
+
+ @Override
+ public boolean isCompatibleWith(Configuration configuration) {
+ return configuration.getString(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET);
+ }
+ }
+
+ /**
+ * A base test {@link ClusterClientFactory} that supports no operation and is meant to be extended.
+ */
+ public static class DummyClusterClientFactory implements ClusterClientFactory<Integer> {
+
+ @Override
+ public boolean isCompatibleWith(Configuration configuration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterDescriptor<Integer> createClusterDescriptor(Configuration configuration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Integer getClusterId(Configuration configuration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterSpecification getClusterSpecification(Configuration configuration) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..930cea9
--- /dev/null
+++ b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$ValidClusterClientFactory
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$FirstCollidingClusterClientFactory
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$SecondCollidingClusterClientFactory
\ No newline at end of file
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
new file mode 100644
index 0000000..a0d9ab2
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.yarn.cli.YarnConfigUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ClusterClientFactory} for a YARN cluster.
+ *
+ * <p>This factory is selected when {@link DeploymentOptions#TARGET} is set to {@link #ID}.
+ */
+public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientFactory.class);
+
+	/** Value of {@link DeploymentOptions#TARGET} that selects this factory. */
+	public static final String ID = "yarn-cluster";
+
+	/** Returns {@code true} iff the configured deployment target equals {@link #ID}. */
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		checkNotNull(configuration);
+		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+	}
+
+	/**
+	 * Creates a {@link YarnClusterDescriptor} backed by a newly started {@link YarnClient} and
+	 * configures it from the given Flink configuration: detached mode (inverse of
+	 * {@link DeploymentOptions#ATTACHED}), the local Flink dist jar, the directories to ship,
+	 * queue, dynamic properties, application name/type, node label and HA cluster id.
+	 * Options that are not set are not applied to the descriptor.
+	 */
+	@Override
+	public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		checkNotNull(configuration);
+
+		final YarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(configuration);
+		yarnClusterDescriptor.setDetachedMode(!configuration.getBoolean(DeploymentOptions.ATTACHED));
+
+		getLocalFlinkDistPath(configuration, yarnClusterDescriptor)
+			.ifPresent(yarnClusterDescriptor::setLocalJarPath);
+
+		decodeDirsToShipToCluster(configuration)
+			.ifPresent(yarnClusterDescriptor::addShipFiles);
+
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_QUEUE, yarnClusterDescriptor::setQueue);
+		handleConfigOption(configuration, YarnConfigOptions.DYNAMIC_PROPERTIES, yarnClusterDescriptor::setDynamicPropertiesEncoded);
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_NAME, yarnClusterDescriptor::setName);
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_TYPE, yarnClusterDescriptor::setApplicationType);
+		handleConfigOption(configuration, YarnConfigOptions.NODE_LABEL, yarnClusterDescriptor::setNodeLabel);
+		handleConfigOption(configuration, HighAvailabilityOptions.HA_CLUSTER_ID, yarnClusterDescriptor::setZookeeperNamespace);
+		return yarnClusterDescriptor;
+	}
+
+	/**
+	 * Returns the {@link ApplicationId} parsed from {@link YarnConfigOptions#APPLICATION_ID},
+	 * or {@code null} if that option is not set.
+	 */
+	@Nullable
+	@Override
+	public ApplicationId getClusterId(Configuration configuration) {
+		checkNotNull(configuration);
+		final String clusterId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
+		return clusterId != null ? ConverterUtils.toApplicationId(clusterId) : null;
+	}
+
+	/**
+	 * Builds a {@link ClusterSpecification} from the configured JobManager and TaskManager heap
+	 * memory (in MiB) and {@link TaskManagerOptions#NUM_TASK_SLOTS}.
+	 */
+	@Override
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		checkNotNull(configuration);
+
+		// JobManager Memory
+		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
+
+		// Task Managers memory
+		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+
+		final int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+
+		return new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(jobManagerMemoryMB)
+			.setTaskManagerMemoryMB(taskManagerMemoryMB)
+			.setSlotsPerTaskManager(slotsPerTaskManager)
+			.createClusterSpecification();
+	}
+
+	/**
+	 * Returns the directories listed under {@link YarnConfigOptions#SHIP_DIRECTORIES},
+	 * or {@link Optional#empty()} if the list is empty.
+	 */
+	private Optional<List<File>> decodeDirsToShipToCluster(final Configuration configuration) {
+		checkNotNull(configuration);
+
+		final List<File> files = YarnConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
+		return files.isEmpty() ? Optional.empty() : Optional.of(files);
+	}
+
+	/**
+	 * Resolves the local Flink dist jar: the path in {@link YarnConfigOptions#FLINK_DIST_JAR} if set,
+	 * otherwise the code-source location of the descriptor's class, but only if that location ends
+	 * with {@code .jar}; otherwise {@link Optional#empty()} is returned.
+	 */
+	private Optional<Path> getLocalFlinkDistPath(final Configuration configuration, final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
+		if (localJarPath != null) {
+			return Optional.of(new Path(localJarPath));
+		}
+
+		LOG.info("No path for the flink jar passed. Using the location of {} to locate the jar", yarnClusterDescriptor.getClass());
+
+		// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
+		final String decodedPath = getDecodedJarPath(yarnClusterDescriptor);
+		return decodedPath.endsWith(".jar")
+			? Optional.of(new Path(new File(decodedPath).toURI()))
+			: Optional.empty();
+	}
+
+	/**
+	 * Returns the URL-decoded (platform default charset) path of the code source that loaded the
+	 * descriptor's class.
+	 *
+	 * @throws RuntimeException if the default charset is not supported for decoding; the
+	 *     original {@link UnsupportedEncodingException} is attached as the cause.
+	 */
+	private String getDecodedJarPath(final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String encodedJarPath = yarnClusterDescriptor
+			.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+		try {
+			return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+				" You can supply a path manually via the command line.", e);
+		}
+	}
+
+	/** Passes the value of {@code option} to {@code consumer} if the option is set (non-null). */
+	private void handleConfigOption(final Configuration configuration, final ConfigOption<String> option, final Consumer<String> consumer) {
+		checkNotNull(configuration);
+		checkNotNull(option);
+		checkNotNull(consumer);
+
+		final String value = configuration.getString(option);
+		if (value != null) {
+			consumer.accept(value);
+		}
+	}
+
+	/**
+	 * Starts a {@link YarnClient} on a default {@link YarnConfiguration} and wraps it in a new
+	 * {@link YarnClusterDescriptor}. If constructing the descriptor fails, the client is stopped
+	 * before the exception is rethrown so that it does not leak.
+	 */
+	private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+		final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+
+		try {
+			return new YarnClusterDescriptor(
+				configuration,
+				yarnConfiguration,
+				yarnClient,
+				false);
+		} catch (RuntimeException e) {
+			yarnClient.stop();
+			throw e;
+		}
+	}
+}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..ea1c4de
--- /dev/null
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.YarnClusterClientFactory
\ No newline at end of file
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
new file mode 100644
index 0000000..931313a
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that the {@link YarnClusterClientFactory} is found by the
+ * {@link DefaultClusterClientServiceLoader} when the deployment target is {@link YarnClusterClientFactory#ID}.
+ */
+public class YarnClusterClientFactoryTest {
+
+	@Test
+	public void testYarnClusterClientFactoryDiscovery() {
+		final ClusterClientServiceLoader loader = new DefaultClusterClientServiceLoader();
+
+		final Configuration yarnTargetConfig = new Configuration();
+		yarnTargetConfig.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+
+		final ClusterClientFactory<ApplicationId> discovered = loader.getClusterClientFactory(yarnTargetConfig);
+		assertTrue(discovered instanceof YarnClusterClientFactory);
+	}
+}