You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/10/25 14:47:55 UTC

[flink] 05/08: [FLINK-14501] Add Standalone and Yarn ClusterClientFactories

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7777994df82c6fbae7fe101551a742f13822de00
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:38:28 2019 +0200

    [FLINK-14501] Add Standalone and Yarn ClusterClientFactories
---
 .../client/deployment/StandaloneClientFactory.java |  59 +++++++
 ...he.flink.client.deployment.ClusterClientFactory |  16 ++
 .../deployment/ClusterClientServiceLoaderTest.java | 167 ++++++++++++++++++++
 ...he.flink.client.deployment.ClusterClientFactory |  18 +++
 .../flink/yarn/YarnClusterClientFactory.java       | 173 +++++++++++++++++++++
 ...he.flink.client.deployment.ClusterClientFactory |  16 ++
 .../flink/yarn/YarnClusterClientFactoryTest.java   |  47 ++++++
 7 files changed, 496 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
new file mode 100644
index 0000000..b441a63
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ClusterClientFactory} for standalone clusters, i.e. Flink running directly on bare-metal
+ * machines. It is selected when {@link DeploymentOptions#TARGET} is set to {@link #ID}.
+ */
+public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
+
+	/** The value of {@link DeploymentOptions#TARGET} that selects this factory. */
+	public static final String ID = "default";
+
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		final String requestedTarget = checkNotNull(configuration).getString(DeploymentOptions.TARGET);
+		return ID.equals(requestedTarget);
+	}
+
+	@Override
+	public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		return new StandaloneClusterDescriptor(checkNotNull(configuration));
+	}
+
+	/**
+	 * Always returns the shared {@link StandaloneClusterId} instance; the configuration contents are
+	 * not consulted (it is only checked for {@code null}).
+	 */
+	@Override
+	@Nullable
+	public StandaloneClusterId getClusterId(Configuration configuration) {
+		checkNotNull(configuration);
+		return StandaloneClusterId.getInstance();
+	}
+
+	/**
+	 * Returns a specification built purely from the {@link ClusterSpecification.ClusterSpecificationBuilder}
+	 * defaults; the configuration is only checked for {@code null}.
+	 */
+	@Override
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		checkNotNull(configuration);
+		final ClusterSpecification.ClusterSpecificationBuilder defaults = new ClusterSpecification.ClusterSpecificationBuilder();
+		return defaults.createClusterSpecification();
+	}
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..fd9e4fa
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.StandaloneClientFactory
\ No newline at end of file
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
new file mode 100644
index 0000000..45157a2
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultClusterClientServiceLoader}.
+ *
+ * <p>The nested test factories are registered for discovery in the test resource
+ * {@code META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory}.
+ */
+public class ClusterClientServiceLoaderTest {
+
+	private static final String VALID_TARGET = "existing";
+	private static final String AMBIGUOUS_TARGET = "duplicate";
+	private static final String NON_EXISTING_TARGET = "non-existing";
+
+	private static final int VALID_ID = 42;
+
+	private ClusterClientServiceLoader serviceLoaderUnderTest;
+
+	@Before
+	public void init() {
+		serviceLoaderUnderTest = new DefaultClusterClientServiceLoader();
+	}
+
+	@Test
+	public void testStandaloneClusterClientFactoryDiscovery() {
+		final Configuration config = createConfigWithTarget(StandaloneClientFactory.ID);
+
+		final ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+		assertTrue(factory instanceof StandaloneClientFactory);
+	}
+
+	@Test
+	public void testFactoryDiscovery() {
+		final Configuration config = createConfigWithTarget(VALID_TARGET);
+
+		final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+		assertNotNull(factory);
+
+		final Integer id = factory.getClusterId(config);
+		assertThat(id, allOf(is(notNullValue()), equalTo(VALID_ID)));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testMoreThanOneCompatibleFactoriesException() {
+		final Configuration config = createConfigWithTarget(AMBIGUOUS_TARGET);
+
+		serviceLoaderUnderTest.getClusterClientFactory(config);
+		fail("Expected an IllegalStateException because more than one factory is compatible with target '"
+				+ AMBIGUOUS_TARGET + "'.");
+	}
+
+	@Test
+	public void testNoFactoriesFound() {
+		final Configuration config = createConfigWithTarget(NON_EXISTING_TARGET);
+
+		final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+		assertNull(factory);
+	}
+
+	/** Creates a configuration whose {@link DeploymentOptions#TARGET} is set to {@code target}. */
+	private static Configuration createConfigWithTarget(final String target) {
+		final Configuration config = new Configuration();
+		config.setString(DeploymentOptions.TARGET, target);
+		return config;
+	}
+
+	/**
+	 * Checks whether the configured {@link DeploymentOptions#TARGET} equals {@code expectedTarget}.
+	 * The comparison starts from the (non-null) expected value, so a configuration without a target
+	 * yields {@code false} instead of a {@link NullPointerException}.
+	 */
+	private static boolean hasTarget(final Configuration configuration, final String expectedTarget) {
+		return expectedTarget.equals(configuration.getString(DeploymentOptions.TARGET));
+	}
+
+	/**
+	 * Test {@link ClusterClientFactory} that is successfully discovered.
+	 */
+	public static class ValidClusterClientFactory extends DummyClusterClientFactory {
+
+		public static final String ID = VALID_TARGET;
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			return hasTarget(configuration, ID);
+		}
+
+		@Nullable
+		@Override
+		public Integer getClusterId(Configuration configuration) {
+			return VALID_ID;
+		}
+	}
+
+	/**
+	 * Test {@link ClusterClientFactory} that collides with {@link SecondCollidingClusterClientFactory}.
+	 */
+	public static class FirstCollidingClusterClientFactory extends DummyClusterClientFactory {
+
+		public static final String ID = AMBIGUOUS_TARGET;
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			return hasTarget(configuration, ID);
+		}
+	}
+
+	/**
+	 * Test {@link ClusterClientFactory} that collides with {@link FirstCollidingClusterClientFactory}.
+	 */
+	public static class SecondCollidingClusterClientFactory extends DummyClusterClientFactory {
+
+		public static final String ID = AMBIGUOUS_TARGET;
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			return hasTarget(configuration, ID);
+		}
+	}
+
+	/**
+	 * A base test {@link ClusterClientFactory} that supports no operation and is meant to be extended.
+	 */
+	public static class DummyClusterClientFactory implements ClusterClientFactory<Integer> {
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public ClusterDescriptor<Integer> createClusterDescriptor(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Nullable
+		@Override
+		public Integer getClusterId(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public ClusterSpecification getClusterSpecification(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+	}
+}
diff --git a/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..930cea9
--- /dev/null
+++ b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$ValidClusterClientFactory
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$FirstCollidingClusterClientFactory
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$SecondCollidingClusterClientFactory
\ No newline at end of file
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
new file mode 100644
index 0000000..a0d9ab2
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.yarn.cli.YarnConfigUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ClusterClientFactory} for a YARN cluster.
+ */
+public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientFactory.class);
+
+	public static final String ID = "yarn-cluster";
+
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		checkNotNull(configuration);
+		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+	}
+
+	@Override
+	public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		checkNotNull(configuration);
+
+		final YarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(configuration);
+		yarnClusterDescriptor.setDetachedMode(!configuration.getBoolean(DeploymentOptions.ATTACHED));
+
+		getLocalFlinkDistPath(configuration, yarnClusterDescriptor)
+				.ifPresent(yarnClusterDescriptor::setLocalJarPath);
+
+		decodeDirsToShipToCluster(configuration)
+				.ifPresent(yarnClusterDescriptor::addShipFiles);
+
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_QUEUE, yarnClusterDescriptor::setQueue);
+		handleConfigOption(configuration, YarnConfigOptions.DYNAMIC_PROPERTIES, yarnClusterDescriptor::setDynamicPropertiesEncoded);
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_NAME, yarnClusterDescriptor::setName);
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_TYPE, yarnClusterDescriptor::setApplicationType);
+		handleConfigOption(configuration, YarnConfigOptions.NODE_LABEL, yarnClusterDescriptor::setNodeLabel);
+		handleConfigOption(configuration, HighAvailabilityOptions.HA_CLUSTER_ID, yarnClusterDescriptor::setZookeeperNamespace);
+		return yarnClusterDescriptor;
+	}
+
+	@Nullable
+	@Override
+	public ApplicationId getClusterId(Configuration configuration) {
+		checkNotNull(configuration);
+		final String clusterId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
+		return clusterId != null ? ConverterUtils.toApplicationId(clusterId) : null;
+	}
+
+	@Override
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		checkNotNull(configuration);
+
+		// JobManager Memory
+		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
+
+		// Task Managers memory
+		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+
+		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+
+		return new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(jobManagerMemoryMB)
+				.setTaskManagerMemoryMB(taskManagerMemoryMB)
+				.setSlotsPerTaskManager(slotsPerTaskManager)
+				.createClusterSpecification();
+	}
+
+	private Optional<List<File>> decodeDirsToShipToCluster(final Configuration configuration) {
+		checkNotNull(configuration);
+
+		final List<File> files = YarnConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
+		return files.isEmpty() ? Optional.empty() : Optional.of(files);
+	}
+
+	private Optional<Path> getLocalFlinkDistPath(final Configuration configuration, final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
+		if (localJarPath != null) {
+			return Optional.of(new Path(localJarPath));
+		}
+
+		LOG.info("No path for the flink jar passed. Using the location of " + yarnClusterDescriptor.getClass() + " to locate the jar");
+
+		// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
+		final String decodedPath = getDecodedJarPath(yarnClusterDescriptor);
+		return decodedPath.endsWith(".jar")
+				? Optional.of(new Path(new File(decodedPath).toURI()))
+				: Optional.empty();
+	}
+
+	private String getDecodedJarPath(final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String encodedJarPath = yarnClusterDescriptor
+				.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+		try {
+			return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+					" You can supply a path manually via the command line.");
+		}
+	}
+
+	private void handleConfigOption(final Configuration configuration, final ConfigOption<String> option, final Consumer<String> consumer) {
+		checkNotNull(configuration);
+		checkNotNull(option);
+		checkNotNull(consumer);
+
+		final String value = configuration.getString(option);
+		if (value != null) {
+			consumer.accept(value);
+		}
+	}
+
+	private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+		final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+
+		return new YarnClusterDescriptor(
+				configuration,
+				yarnConfiguration,
+				yarnClient,
+				false);
+	}
+}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..ea1c4de
--- /dev/null
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.YarnClusterClientFactory
\ No newline at end of file
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
new file mode 100644
index 0000000..931313a
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that the {@link YarnClusterClientFactory} is picked up by the
+ * {@link DefaultClusterClientServiceLoader} when the YARN target is configured.
+ */
+public class YarnClusterClientFactoryTest {
+
+	@Test
+	public void testYarnClusterClientFactoryDiscovery() {
+		final ClusterClientServiceLoader loader = new DefaultClusterClientServiceLoader();
+
+		final Configuration yarnTargetConfig = new Configuration();
+		yarnTargetConfig.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+
+		final ClusterClientFactory<ApplicationId> discoveredFactory = loader.getClusterClientFactory(yarnTargetConfig);
+		assertTrue(discoveredFactory instanceof YarnClusterClientFactory);
+	}
+}