You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/07 13:09:02 UTC

[GitHub] [flink] kl0u commented on a change in pull request #12003: [FLINK-10934] Support application mode for kubernetes

kl0u commented on a change in pull request #12003:
URL: https://github.com/apache/flink/pull/12003#discussion_r421475927



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -75,16 +75,21 @@
 	@Nullable
 	private final File userLibDirectory;
 
+	@Nullable
+	private final File jarFile;
+

Review comment:
       Although unrelated to this PR, I think we should annotate this class as `@Internal`. 

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -179,6 +187,22 @@ public static String getCommonStartCommand(
 		return BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
 	}
 
+	public static File checkJarFileForApplicationMode(Configuration configuration) {

Review comment:
       I think here we can return the whole list, and check for the size later, in the `KubernetesApplicationClusterEntrypoint.getPackagedProgramRetriever()` as there it is the relevant place for the check. Here this method can remain general enough. 
   
   So this can become:
   ```
   public static List<File> checkJarFileForApplicationMode(Configuration configuration) {
   		return configuration.get(PipelineOptions.JARS).stream().map(
   			FunctionUtils.uncheckedFunction(
   				uri -> {
   					final URI jarURI = PackagedProgramUtils.resolveURI(uri);
   					if (jarURI.getScheme().equals("local") && jarURI.isAbsolute()) {
   						return new File(jarURI.getPath());
   					}
   					throw new IllegalArgumentException("Only support local schema for application mode. \"local\" means the " +
   						"jar is located in the image, not Flink client. e.g. local:///opt/flink/examples/streaming/WindowJoin.jar");
   				})
   		).collect(Collectors.toList());
   	}
   ```
   
   and in the `entrypoint` we can have:
   
   ```
   final List<File> pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(configuration);
   Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -146,8 +148,38 @@ public String getClusterDescription() {
 	@Override
 	public ClusterClientProvider<String> deployApplicationCluster(
 			final ClusterSpecification clusterSpecification,
-			final ApplicationConfiguration applicationConfiguration) {
-		throw new UnsupportedOperationException("Application Mode not supported by Active Kubernetes deployments.");
+			final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
+		if (client.getRestService(clusterId).isPresent()) {
+			throw new ClusterDeploymentException("The Flink cluster " + clusterId + " already exists.");
+		}
+
+		checkNotNull(clusterSpecification);
+		checkNotNull(applicationConfiguration);
+
+		final String deploymentTarget = flinkConfig.get(DeploymentOptions.TARGET);
+
+		if (!KubernetesConfigOptions.DeploymentTargetNames.APPLICATION.getName().equalsIgnoreCase(deploymentTarget)) {
+			throw new ClusterDeploymentException(
+				"Couldn't deploy Kubernetes Application Cluster." +
+					" Expected deployment.target=" + KubernetesConfigOptions.DeploymentTargetNames.APPLICATION.getName() +
+					" but actual one was \"" + deploymentTarget + "\"");
+		}
+
+		applicationConfiguration.applyToConfiguration(flinkConfig);
+		KubernetesUtils.checkJarFileForApplicationMode(flinkConfig);

Review comment:
       Why do we do this here if we ignore the result? Is it for a sanity check to fail early? Either way we call it also later.

##########
File path: flink-dist/src/main/flink-bin/conf/log4j-cli.properties
##########
@@ -38,6 +38,9 @@ logger.yarncli.appenderRef.console.ref = ConsoleAppender
 logger.hadoop.name = org.apache.hadoop
 logger.hadoop.level = INFO
 logger.hadoop.appenderRef.console.ref = ConsoleAppender
+logger.kubernetes.name = org.apache.flink.kubernetes

Review comment:
       Probably add a comment:
   ```
   # Log output from org.apache.flink.kubernetes to the console.
   ```
   as it is done above for Yarn.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -222,6 +223,25 @@
 		Never
 	}
 
+	/**
+	 * The deployment target names for Kubernetes.

Review comment:
       I would move this to a separate class like the `KubernetesDeploymentTarget` with some additional functionality. I think having all the options somewhere can help later if we want to generate help messages. The class could look like the `YarnDeploymentTarget` and also make the related chagnes in the:
   
   * `KubernetesSessionClusterExecutor`
   * `KubernetesClusterClientFactory`
   * `KubernetesClusterDescriptor` 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org