Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/18 09:44:07 UTC

[GitHub] [flink] zentol opened a new pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

zentol opened a new pull request #13190:
URL: https://github.com/apache/flink/pull/13190


   Only create a ClusterClient for attached deployments.
   
   The client is not necessary for detached deployments, and creating it results in error messages if the REST API is not accessible from the outside.
   
   I would like to add a test for this, but that requires mocking several classes, so I only want to do it once the change is approved in general.
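
   The idea in the description can be sketched as follows. This is a minimal illustration, not the code of this PR: `LazyClientSketch`, `deploy`, `run` and the counter are hypothetical names, and a plain `Supplier` stands in for a provider that only creates the client (and with it resolves the REST endpoint) when asked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for the session CLI flow; not Flink's actual API. */
final class LazyClientSketch {

    /** Counts how often the (potentially failing) client creation was attempted. */
    static final AtomicInteger CLIENT_CREATIONS = new AtomicInteger();

    /** Deploys a cluster and returns a provider; nothing is contacted until get() is called. */
    static Supplier<String> deploy(String clusterId) {
        return () -> {
            CLIENT_CREATIONS.incrementAndGet();
            // Assumption: this is the step that would need a reachable REST endpoint.
            return "client-for-" + clusterId;
        };
    }

    /** Returns a client description for attached runs, or null for detached runs. */
    static String run(String clusterId, boolean attached) {
        Supplier<String> provider = deploy(clusterId);
        if (!attached) {
            // Detached: the cluster is deployed, but no client is ever created.
            return null;
        }
        return provider.get();
    }

    public static void main(String[] args) {
        System.out.println(run("session-1", false));
        System.out.println(run("session-1", true));
        System.out.println("client creations: " + CLIENT_CREATIONS.get());
    }
}
```

   In this sketch a detached run never touches the provider, so an unreachable REST endpoint cannot produce an error in that path.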


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473709875



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
##########
@@ -51,10 +51,7 @@ public boolean isCompatibleWith(Configuration configuration) {
 	@Override
 	public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
 		checkNotNull(configuration);
-		if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
-			final String clusterId = generateClusterId();
-			configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-		}
+		ensureClusterIdIsSet(configuration);
 		return new KubernetesClusterDescriptor(configuration, KubeClientFactory.fromConfiguration(configuration));

Review comment:
       I think the idea was that the `ClusterDescriptor` will generate a cluster id when you call any of the `deploy*` methods. I believe what is set in the constructor of the `YarnClusterDescriptor` is fine and does not contradict what I was saying.
   
   If one really needs to configure the cluster id manually, then one could extend the `ClusterDescriptor` interface accordingly.
   
   I think that we are adding technical debt by working around broken contracts. That might be fine for now, but we should definitely straighten this out eventually.
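
   A rough sketch of the contract described in this comment, under stated assumptions: `PerDeployIdSketch` and its nested `Descriptor` are hypothetical and only mimic the shape of a cluster descriptor, the `flink-cluster-` prefix is an assumed naming scheme, and the overload taking an explicit id illustrates one possible way of extending the interface.

```java
import java.util.UUID;

/** Illustrative only; these types are hypothetical and not Flink's ClusterDescriptor. */
final class PerDeployIdSketch {

    static final class Descriptor {

        /** Connecting works for any existing cluster id. */
        String retrieve(String clusterId) {
            return clusterId;
        }

        /** Each deployment gets a freshly generated id (assumed naming scheme). */
        String deploySessionCluster() {
            return deploySessionCluster("flink-cluster-" + UUID.randomUUID());
        }

        /** Hypothetical overload for callers that must choose the id themselves. */
        String deploySessionCluster(String clusterId) {
            return clusterId;
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor = new Descriptor();
        System.out.println(descriptor.deploySessionCluster());
        System.out.println(descriptor.deploySessionCluster());
        System.out.println(descriptor.deploySessionCluster("my-session"));
    }
}
```

   With the id generated per `deploy*` call, one descriptor instance could deploy several clusters, which a fixed id in the configuration does not allow.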







[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472783027



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       Why is calling `getClusterClient` a problem? Which call does the `RestClusterClient` make that cannot reach the cluster?







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473214901



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
##########
@@ -51,10 +51,7 @@ public boolean isCompatibleWith(Configuration configuration) {
 	@Override
 	public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
 		checkNotNull(configuration);
-		if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
-			final String clusterId = generateClusterId();
-			configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-		}
+		ensureClusterIdIsSet(configuration);
 		return new KubernetesClusterDescriptor(configuration, KubeClientFactory.fromConfiguration(configuration));

Review comment:
       Yes, I noticed that too; it is at odds with the interface. You can connect to _any_ cluster, but only deploy a single one.
   But this seems to be an issue of the `ClusterDescriptor` interface; there is no way for implementations to accept additional arguments for deploying clusters.







[GitHub] [flink] flinkbot edited a comment on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3d0ec8202d20e69a2e0f76598566378a28152dd6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5672) 
   * 3c4c7fd79fc8e84bcf9afd078e3ec5e6c8629025 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472933647



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       Turns out the clusterId is never null. `KubernetesClusterClientFactory#createClusterDescriptor` ensures that the cluster-id is set in the configuration, so we actually never need the client here.







[GitHub] [flink] flinkbot edited a comment on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3d0ec8202d20e69a2e0f76598566378a28152dd6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5672) 
   



[GitHub] [flink] flinkbot commented on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675378974


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 3d0ec8202d20e69a2e0f76598566378a28152dd6 (Tue Aug 18 09:46:25 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472915726



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
+			if (connectToExistingCluster) {
+				clusterClientProvider = kubernetesClusterDescriptor.retrieve(clusterId);
 			} else {
-				clusterClient = kubernetesClusterDescriptor
+				clusterClientProvider = kubernetesClusterDescriptor
 						.deploySessionCluster(
-								kubernetesClusterClientFactory.getClusterSpecification(configuration))
-						.getClusterClient();
-				clusterId = clusterClient.getClusterId();
+								kubernetesClusterClientFactory.getClusterSpecification(configuration));
 			}
 
-			try {
-				if (!detached) {
-					Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
-					try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
-						while (continueRepl.f0) {
-							continueRepl = repStep(in);
-						}
+			if (!detached) {
+				if (!connectToExistingCluster) {

Review comment:
       ```suggestion
   				if (clusterId == null) {
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       Hmm... is it a hard requirement that a cluster-id must be specified when starting a cluster, or can it also be auto-generated by Kubernetes?
   
   If it is not required, then we could adjust the PR such that the cluster-id is only queried if `clusterId` is null: https://github.com/apache/flink/pull/13190/files#r472915726
   
   







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472941009



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       @tillrohrmann I've adjusted the code to explicitly set the cluster ID directly in the KubernetesSessionCli, and simplified the rest accordingly.







[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473021574



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -88,6 +88,7 @@ public Configuration getEffectiveConfiguration(String[] args) throws CliArgsExce
 
 	private int run(String[] args) throws FlinkException, CliArgsException {
 		final Configuration configuration = getEffectiveConfiguration(args);
+		KubernetesClusterClientFactory.ensureClusterIdIsSet(configuration);

Review comment:
       That way we wouldn't have this clumsy call here.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -88,6 +88,7 @@ public Configuration getEffectiveConfiguration(String[] args) throws CliArgsExce
 
 	private int run(String[] args) throws FlinkException, CliArgsException {
 		final Configuration configuration = getEffectiveConfiguration(args);
+		KubernetesClusterClientFactory.ensureClusterIdIsSet(configuration);

Review comment:
       Shouldn't we obtain the `clusterId` from the `KubernetesClusterDescriptor`?

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
##########
@@ -51,10 +51,7 @@ public boolean isCompatibleWith(Configuration configuration) {
 	@Override
 	public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
 		checkNotNull(configuration);
-		if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
-			final String clusterId = generateClusterId();
-			configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-		}
+		ensureClusterIdIsSet(configuration);
 		return new KubernetesClusterDescriptor(configuration, KubeClientFactory.fromConfiguration(configuration));

Review comment:
       It is a bit unrelated, but setting a fixed cluster id for the `ClusterDescriptor` seems wrong. That way we can only ever deploy a single cluster using a `ClusterDescriptor`. I believe that the `YarnClusterDescriptor` works differently.







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473220233



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -88,6 +88,7 @@ public Configuration getEffectiveConfiguration(String[] args) throws CliArgsExce
 
 	private int run(String[] args) throws FlinkException, CliArgsException {
 		final Configuration configuration = getEffectiveConfiguration(args);
+		KubernetesClusterClientFactory.ensureClusterIdIsSet(configuration);

Review comment:
       The `(Kubernetes)ClusterDescriptor` provides no way of accessing the cluster-id.
   There is `ClusterClientFactory#getClusterId`, but this one only works the way we'd like it to after `ClusterClientFactory#createClusterDescriptor` was called (since that call modifies the passed configuration).
   
   I thought about changing `ClusterClientFactory#getClusterId` to also set the id if it is missing, to ensure that the behavior of the methods is identical regardless of method call order, but I cannot assess what repercussions this might have on users of the `ClusterClientFactory` interface.
   
   Basically, I hate this behavior, and didn't want to rely on it.
   I would argue that the user of the `ClusterClientFactory` should ensure that the cluster-id was set, but I didn't want to remove the auto-generation in `ClusterClientFactory#createClusterDescriptor` because god knows what it might break.
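
   The call-order dependence described above can be illustrated with a small sketch. It is not Flink code: a plain `Map` stands in for the `Configuration`, the class and the key constant are hypothetical stand-ins, and the generated id format is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical model of the factory methods discussed in this thread. */
final class ClusterIdOrderSketch {

    /** Stand-in for the cluster-id option key (assumed value). */
    static final String CLUSTER_ID = "kubernetes.cluster-id";

    /** Pure getter: returns null if nobody has set the id yet. */
    static String getClusterId(Map<String, String> conf) {
        return conf.get(CLUSTER_ID);
    }

    /** Idempotent: generates an id only if none is configured. */
    static void ensureClusterIdIsSet(Map<String, String> conf) {
        conf.computeIfAbsent(CLUSTER_ID, key -> "flink-cluster-" + UUID.randomUUID());
    }

    /** Mutates the passed configuration as a side effect, as described in the comment. */
    static void createClusterDescriptor(Map<String, String> conf) {
        ensureClusterIdIsSet(conf);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println("before: " + getClusterId(conf));
        createClusterDescriptor(conf);
        System.out.println("after:  " + getClusterId(conf));
    }
}
```

   Calling `ensureClusterIdIsSet` explicitly before anything else makes the result of `getClusterId` independent of whether `createClusterDescriptor` has already run, which is the motivation given for the call in `KubernetesSessionCli`.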







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472817962



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       It's not the RestClusterClient that is the issue, but the retrieval of the REST API address and port (https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L96).







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472936802



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       It's a bit annoying to rely on it as is, though, since it requires methods to be called in a specific order.







[GitHub] [flink] flinkbot edited a comment on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3c4c7fd79fc8e84bcf9afd078e3ec5e6c8629025 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5711) 
   



[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473220233



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -88,6 +88,7 @@ public Configuration getEffectiveConfiguration(String[] args) throws CliArgsExce
 
 	private int run(String[] args) throws FlinkException, CliArgsException {
 		final Configuration configuration = getEffectiveConfiguration(args);
+		KubernetesClusterClientFactory.ensureClusterIdIsSet(configuration);

Review comment:
       The `(Kubernetes)ClusterDescriptor` provides no way of accessing the cluster-id.
   There is `ClusterClientFactory#getClusterId`, but it only behaves the way we'd like after `ClusterClientFactory#createClusterDescriptor` has been called (since that call modifies the passed configuration).
   
   I thought about changing `ClusterClientFactory#getClusterId` to also set the id if it is missing, so that the methods behave identically regardless of call order, but I cannot assess what repercussions this might have on other users of the `ClusterClientFactory` interface.
   
   Basically, I hate this behavior, and didn't want to rely on it.
   I would argue that the user of the `ClusterClientFactory` should ensure that the cluster-id is set, but I didn't want to remove the auto-generation in `ClusterClientFactory#createClusterDescriptor` because god knows what it might break.
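
   To make the call-order problem concrete, here is a minimal, self-contained sketch. It does not use Flink's classes: `SketchConfig`, `ClusterIdSketch`, the key string and the `flink-cluster-` prefix are all hypothetical stand-ins chosen for illustration. It only shows the idea of an idempotent `ensureClusterIdIsSet` that can be called before anything reads the id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for a configuration object; not Flink's Configuration. */
final class SketchConfig {
    // Assumed key name, used only for this illustration.
    static final String CLUSTER_ID_KEY = "kubernetes.cluster-id";

    private final Map<String, String> values = new HashMap<>();

    boolean contains(String key) {
        return values.containsKey(key);
    }

    String get(String key) {
        return values.get(key);
    }

    void set(String key, String value) {
        values.put(key, value);
    }
}

/** Hypothetical helper showing an idempotent "ensure id" step and a pure read. */
final class ClusterIdSketch {
    private ClusterIdSketch() {}

    /** Generates an id only if none is configured, so repeated calls change nothing. */
    static void ensureClusterIdIsSet(SketchConfig config) {
        if (!config.contains(SketchConfig.CLUSTER_ID_KEY)) {
            config.set(SketchConfig.CLUSTER_ID_KEY, "flink-cluster-" + UUID.randomUUID());
        }
    }

    /** Pure read: returns null when no id has been set yet. */
    static String getClusterId(SketchConfig config) {
        return config.get(SketchConfig.CLUSTER_ID_KEY);
    }
}
```

   In this sketch, reading the id before `ensureClusterIdIsSet` yields `null`, while reading it afterwards yields a stable value no matter how often the ensure step is repeated. That is the property the explicit call in `KubernetesSessionCli#run` is meant to guarantee.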







[GitHub] [flink] flinkbot edited a comment on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3c4c7fd79fc8e84bcf9afd078e3ec5e6c8629025 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5711) 
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472914149



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       I guess my point is that, given the current implementations, one could already retrieve the `clusterId` from the `ClusterClientProvider`, because the `RestClusterClient` is initialized with the `clusterId` and simply returns it.







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473221865



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
##########
@@ -51,10 +51,7 @@ public boolean isCompatibleWith(Configuration configuration) {
 	@Override
 	public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
 		checkNotNull(configuration);
-		if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
-			final String clusterId = generateClusterId();
-			configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-		}
+		ensureClusterIdIsSet(configuration);
 		return new KubernetesClusterDescriptor(configuration, KubeClientFactory.fromConfiguration(configuration));

Review comment:
       The `YarnClusterDescriptor` actually works quite similarly; things like custom names and application types are set in the constructor.
   The `ClusterDescriptor` just doesn't handle this case very well, but admittedly we do not really have a use-case for deploying multiple clusters.







[GitHub] [flink] flinkbot commented on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3d0ec8202d20e69a2e0f76598566378a28152dd6 UNKNOWN
   





[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472916183



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       Actually, we can make this change in any case.







[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473711834



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -88,6 +88,7 @@ public Configuration getEffectiveConfiguration(String[] args) throws CliArgsExce
 
 	private int run(String[] args) throws FlinkException, CliArgsException {
 		final Configuration configuration = getEffectiveConfiguration(args);
+		KubernetesClusterClientFactory.ensureClusterIdIsSet(configuration);

Review comment:
       Hmm, yeah, one should get the `clusterId` either from the user or, after a cluster is deployed, via the `ClusterClient` or `ClusterClientProvider`.







[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472942385



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,42 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
-			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
+			final String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
+			if (connectToExistingCluster) {
+				kubernetesClusterDescriptor.retrieve(clusterId);
 			} else {
-				clusterClient = kubernetesClusterDescriptor
+				kubernetesClusterDescriptor
 						.deploySessionCluster(
-								kubernetesClusterClientFactory.getClusterSpecification(configuration))
-						.getClusterClient();
-				clusterId = clusterClient.getClusterId();
+								kubernetesClusterClientFactory.getClusterSpecification(configuration));
 			}
 
-			try {
-				if (!detached) {
-					Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
-					try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
-						while (continueRepl.f0) {
-							continueRepl = repStep(in);
-						}
-					} catch (Exception e) {
-						LOG.warn("Exception while running the interactive command line interface.", e);
+			if (!detached) {
+				Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
+				try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
+					while (continueRepl.f0) {
+						continueRepl = repStep(in);
 					}
-					if (continueRepl.f1) {
+				} catch (Exception e) {
+					LOG.warn("Exception while running the interactive command line interface.", e);
+				}
+
+				if (continueRepl.f1) {
+					try {

Review comment:
       These changes are now purely about better exception messages, and I would exclude them from the final PR, or move them into a hotfix commit.







[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472866577



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       Hmm, ok, I see the problem. Maybe the issue is that we need to create a `ClusterClient` in order to retrieve the `clusterId`, even though the id is known before the `ClusterClient` is created.
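
   One way to picture the alternative is a provider that already knows the id and only builds a client on request. The following is a rough sketch under stated assumptions, not Flink's API: `SketchClient`, `SketchClientProvider` and the `getClusterId` method on the provider are hypothetical names introduced purely for illustration.

```java
/** Hypothetical client; stands in for a REST-backed cluster client. */
final class SketchClient implements AutoCloseable {
    // Counts constructions so the sketch can show when a client is really built.
    static int created = 0;

    private final String clusterId;

    SketchClient(String clusterId) {
        this.clusterId = clusterId;
        created++;
    }

    String getClusterId() {
        return clusterId;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}

/** Hypothetical provider that exposes the id without constructing a client. */
final class SketchClientProvider {
    private final String clusterId;

    SketchClientProvider(String clusterId) {
        this.clusterId = clusterId;
    }

    /** Cheap: the id is known at deployment time, no client is needed. */
    String getClusterId() {
        return clusterId;
    }

    /** Expensive: only attached callers would need to invoke this. */
    SketchClient getClusterClient() {
        return new SketchClient(clusterId);
    }
}
```

   With such a shape, a detached deployment could read the id from the provider and never call `getClusterClient()`, so no connection attempt to the REST endpoint would be made in that mode.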







[GitHub] [flink] tillrohrmann commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r472863712



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -96,40 +97,51 @@ private int run(String[] args) throws FlinkException, CliArgsException {
 			kubernetesClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterClient<String> clusterClient;
+			final ClusterClientProvider<String> clusterClientProvider;
 			String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
 			final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
-			final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
+
+			boolean connectToExistingCluster = false;
+			try (final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration)) {
+				connectToExistingCluster = clusterId != null && kubeClient.getRestService(clusterId).isPresent();
+			} catch (Exception e) {
+				LOG.info("Could not properly shutdown cluster client.", e);
+			}
 
 			// Retrieve or create a session cluster.
-			if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
-				clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();

Review comment:
       Alright, this makes sense. Thanks for the clarification.







[GitHub] [flink] flinkbot edited a comment on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3d0ec8202d20e69a2e0f76598566378a28152dd6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5672) 
   * 3c4c7fd79fc8e84bcf9afd078e3ec5e6c8629025 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5711) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13190:
URL: https://github.com/apache/flink/pull/13190#issuecomment-675384858


   ## CI report:
   
   * 3d0ec8202d20e69a2e0f76598566378a28152dd6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5672) 
   





[GitHub] [flink] zentol commented on a change in pull request #13190: [FLINK-18986][kubernetes] Create ClusterClient only for attached deployments

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13190:
URL: https://github.com/apache/flink/pull/13190#discussion_r473221865



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterClientFactory.java
##########
@@ -51,10 +51,7 @@ public boolean isCompatibleWith(Configuration configuration) {
 	@Override
 	public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
 		checkNotNull(configuration);
-		if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
-			final String clusterId = generateClusterId();
-			configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-		}
+		ensureClusterIdIsSet(configuration);
 		return new KubernetesClusterDescriptor(configuration, KubeClientFactory.fromConfiguration(configuration));

Review comment:
       The `YarnClusterDescriptor` actually works quite similarly; there, things like custom names and application types are defined in the constructor.
   The `ClusterDescriptor` just doesn't handle this case very well, but admittedly we do not really have a use-case for deploying multiple clusters.



