Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/28 03:41:30 UTC

[GitHub] [flink] KarmaGYZ opened a new pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

KarmaGYZ opened a new pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248
 
 
   …hich TaskExecutor is not started
   
   ## What is the purpose of the change
   
   Release containers recovered from the previous attempt in which the TaskExecutor has not been started.
   
   ## Brief change log
   
   - Call `nodeManagerClient#getContainerStatusAsync` for each container recovered from the previous attempt. If the received container status is `NEW`, release the container.
   - Add `TestingNMClientAsync` to the test infrastructure to mock the behavior of `NMClientAsync`.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - YarnResourceManagerTest#testStartWithContainerFromPreviousAttempt
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386028701
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   Currently, I don't see the benefit of, or the need for, changing the behavior during a test. Besides, some of its functions are called asynchronously. I'd prefer to pass `TestingNMClientAsync` as a constructor parameter of `Context`. WDYT?

[GitHub] [flink] flinkbot commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386034735
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws Exception {
 			}
 		}
 
-		void verifyContainerHasBeenStarted(Container testingContainer) {
+		void verifyContainerHasBeenStarted(Container testingContainer, List<ContainerId> startedContainerIds) throws Exception {
 			verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+			// Wait the call of
+			for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+				if (startedContainerIds.contains(testingContainer.getId())) {
+					return;
+				}
+				Thread.sleep(1000);
+			}
+			throw new Exception("The container has not been started before the timeout.");
 
 Review comment:
   I think using `CompletableFuture` has several benefits:
   - You don't have to handle thread safety yourself.
   - You don't have to implement the loop and sleep yourself.
   - Better readability.
   - Probably better performance than looping and sleeping.

[GitHub] [flink] KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-593180527
 
 
   @xintongsong Thanks for the review. PR updated.

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386071286
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   Regarding your concern:
   - In most cases, people can simply set the functions before starting the RM, i.e., before `runTest`.
   - In cases where a function is changed during the test, one can use a `CompletableFuture` to make sure the change happens only after the previous function has been called, similar to what we do with `rpcService.(re)setRpcGatewayFutureFunction` in `ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor`.
   - Even if someone gets it wrong, it would only affect that particular test case, which is easy to locate. No production code or other test cases would be affected.
   
   I agree that adding a builder class for `Context` could also be an option. However, I think setting the `TestingNMClientAsync` functions might be the better solution, as it has the following advantages over adding a builder for `Context`:
   - It narrows down what needs to be customized in a test case. We only need to provide a function that defines the NM client behavior (exactly what the test case needs to customize), instead of the entire `TestingNMClientAsync`.
   - It provides the flexibility of changing the NM client behavior during the test, which might not be needed at the moment but might be in the future.
   - Currently we define the test cases in double brace initialization of the `Context` class, which provides very good readability. I believe double brace initialization has to directly follow the constructor call, which means adding a builder to `Context` would rule it out.
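
   A minimal sketch of the setter-based approach, assuming simplified stand-ins: `TestingClient`, `Context` and `queryStatus` are hypothetical names, and container IDs, node IDs and statuses are plain `String`s rather than YARN records. It sets the behavior in a double brace initializer before any test logic runs, then swaps the function mid-test only after a `CompletableFuture` signals that the previous function has been called.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.function.BiFunction;

   public class SettableFunctionSketch {

       /** Hypothetical stand-in for TestingNMClientAsync; IDs and statuses are plain Strings. */
       static class TestingClient {
           // This sketch fails loudly by default, so each test has to set the behavior it relies on.
           // volatile: the test thread may swap the function while another thread calls it.
           private volatile BiFunction<String, String, String> getContainerStatusFunction =
                   (containerId, nodeId) -> {
                       throw new UnsupportedOperationException("No getContainerStatus function has been set.");
                   };

           void setGetContainerStatusFunction(BiFunction<String, String, String> function) {
               this.getContainerStatusFunction = function;
           }

           String getContainerStatus(String containerId, String nodeId) {
               return getContainerStatusFunction.apply(containerId, nodeId);
           }
       }

       /** Hypothetical stand-in for the test Context that owns the testing client. */
       static class Context {
           final TestingClient testingClient = new TestingClient();

           String queryStatus(String containerId, String nodeId) {
               return testingClient.getContainerStatus(containerId, nodeId);
           }
       }

       public static void main(String[] args) throws Exception {
           // Double brace initialization: an anonymous Context subclass whose instance
           // initializer sets the behavior before any test logic runs.
           Context context = new Context() {{
               testingClient.setGetContainerStatusFunction((containerId, nodeId) -> "NEW");
           }};
           System.out.println(context.queryStatus("container_1", "node_1")); // prints NEW

           // Changing the behavior mid-test: only swap the function after a future
           // signals that the previous function has actually been called.
           CompletableFuture<Void> previousFunctionCalled = new CompletableFuture<>();
           context.testingClient.setGetContainerStatusFunction((containerId, nodeId) -> {
               previousFunctionCalled.complete(null);
               return "NEW";
           });
           Thread caller = new Thread(() -> context.queryStatus("container_2", "node_1"));
           caller.start();
           previousFunctionCalled.get(10, TimeUnit.SECONDS);
           context.testingClient.setGetContainerStatusFunction((containerId, nodeId) -> "RUNNING");
           System.out.println(context.queryStatus("container_3", "node_1")); // prints RUNNING
           caller.join();
       }
   }
   ```

   Failing loudly by default is only this sketch's choice; a no-op default would work with the same setter.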

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386187320
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##########
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+	private BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+		throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
 
 Review comment:
   > This function should only be invoked explicitly.
   
   I don't see why this is necessary. We should not make assumptions about how the RM uses this method.

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714) 
   

[GitHub] [flink] TaoYang526 commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
TaoYang526 commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386823957
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   > Are you suggesting that calling NMClientAsync.getContainerStatusAsync on a NEW container might result in onGetContainerStatusError on some Hadoop versions while onContainerStatusReceived on other versions?
   
   No, both callbacks coexist in Hadoop: `onContainerStatusReceived` is for containers that have already been started by the AM via `NMClient#startContainers`, while `onGetContainerStatusError` is for containers that have not been started by the AM.
   
   > If that is the case, I think we can have a common method handling releasing the container and removing it from the worker node map
   
   Yes, a common method is necessary.
   
   > One more question, how do we know whether a container is NEW or there's some other problems in onGetContainerStatusError?
   
   There may be several causes, such as the container not being found on the NM or the NM being unreachable, but they can all be treated as the same problem: the container may not be usable for now, since we cannot get its status. I think we can just handle it as above, no matter what the real cause is.
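
   A minimal sketch of such a common method, assuming simplified stand-ins for the Hadoop types: `ContainerState` is a reduced enum, IDs are `String`s, and `workerNodeMap`, `releasedContainers` and `releaseContainerAndRemoveFromWorkerNodeMap` are hypothetical names. Both callbacks route to the same helper, so a `NEW` status and a failed status query end up handled identically.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class RecoveredContainerHandlingSketch {

       /** Simplified stand-in for Hadoop's ContainerState (only what this sketch needs). */
       enum ContainerState { NEW, RUNNING, COMPLETE }

       /** Hypothetical worker node map: container ID -> node ID. */
       final Map<String, String> workerNodeMap = new HashMap<>();

       /** Records which containers this sketch asked to release, in order. */
       final List<String> releasedContainers = new ArrayList<>();

       /** Mirrors onContainerStatusReceived: a NEW container never had a TaskExecutor started. */
       void onContainerStatusReceived(String containerId, ContainerState state) {
           if (state == ContainerState.NEW) {
               releaseContainerAndRemoveFromWorkerNodeMap(containerId);
           }
       }

       /** Mirrors onGetContainerStatusError: the cause is deliberately ignored. */
       void onGetContainerStatusError(String containerId, Throwable cause) {
           releaseContainerAndRemoveFromWorkerNodeMap(containerId);
       }

       /** Hypothetical shared helper used by both callbacks. */
       private void releaseContainerAndRemoveFromWorkerNodeMap(String containerId) {
           workerNodeMap.remove(containerId);
           // A real resource manager would release the container through its YARN client here.
           releasedContainers.add(containerId);
       }

       public static void main(String[] args) {
           RecoveredContainerHandlingSketch rm = new RecoveredContainerHandlingSketch();
           rm.workerNodeMap.put("container_new", "node_1");
           rm.workerNodeMap.put("container_running", "node_1");
           rm.workerNodeMap.put("container_unreachable", "node_2");

           rm.onContainerStatusReceived("container_new", ContainerState.NEW);
           rm.onContainerStatusReceived("container_running", ContainerState.RUNNING);
           rm.onGetContainerStatusError("container_unreachable", new RuntimeException("NM unreachable"));

           System.out.println(rm.releasedContainers);     // [container_new, container_unreachable]
           System.out.println(rm.workerNodeMap.keySet()); // [container_running]
       }
   }
   ```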

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385525529
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -176,9 +179,37 @@ public void teardown() throws Exception {
 		}
 	}
 
+	static class TestingNMClientAsync extends NMClientAsync {
+
+		public List<ContainerStatus> containerStatuses = new ArrayList<>();
+
+		protected TestingNMClientAsync(CallbackHandler callbackHandler) {
+			super(callbackHandler);
+		}
+
+		@Override
+		public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+			// Do nothing.
+		}
+
+		@Override
+		public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+			// Do nothing.
+		}
+
+		@Override
+		public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+			for (ContainerStatus containerStatus: containerStatuses) {
+				if (containerStatus.getContainerId().equals(containerId)) {
+					callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+				}
+			}
+		}
+	}
 
 Review comment:
   Let's make this class more general by using `Function` and `Consumer` to define its behaviors. Something like the following, taking `getContainerStatusAsync` as an example.
   ```
   static class TestingNMClientAsync extends NMClientAsync {
       // ...
   
       private final BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncFunction;
   
       // ...
   
       public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
           callbackHandler.onContainerStatusReceived(containerId, getContainerStatusAsyncFunction.apply(containerId, nodeId));
       }
   
       // ...
   }
   ```
   
   We can use a builder class to create this class, allowing custom `Function`s and `Consumer`s to be set. If the code for this class grows too large, we can also move it to a separate file.
   
   There are several benefits to doing this:
   - It allows defining per-test-case behavior, which makes it easier to reuse this class in the future.
   - It avoids using `mock`, `spy` and `verify`.
   - It avoids having a publicly accessible `containerStatuses`.
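
   A minimal sketch of such a builder, assuming simplified stand-ins: `CallbackHandler`, `TestingClient` and `Builder` are hypothetical, IDs and statuses are `String`s instead of YARN records, and the callback is invoked synchronously rather than asynchronously as in the real `NMClientAsync`. Each test case supplies only the functions it cares about; anything unset falls back to a default.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BiConsumer;
   import java.util.function.BiFunction;

   public class TestingClientBuilderSketch {

       /** Hypothetical stand-in for NMClientAsync.CallbackHandler (a single callback). */
       interface CallbackHandler {
           void onContainerStatusReceived(String containerId, String status);
       }

       /** Hypothetical testing client whose behavior is defined entirely by injected functions. */
       static final class TestingClient {
           private final CallbackHandler callbackHandler;
           private final BiConsumer<String, String> startContainerConsumer;
           private final BiFunction<String, String, String> getContainerStatusFunction;

           private TestingClient(
                   CallbackHandler callbackHandler,
                   BiConsumer<String, String> startContainerConsumer,
                   BiFunction<String, String, String> getContainerStatusFunction) {
               this.callbackHandler = callbackHandler;
               this.startContainerConsumer = startContainerConsumer;
               this.getContainerStatusFunction = getContainerStatusFunction;
           }

           void startContainer(String containerId, String launchContext) {
               startContainerConsumer.accept(containerId, launchContext);
           }

           // Computes the status with the injected function and reports it through the callback.
           void getContainerStatus(String containerId, String nodeId) {
               callbackHandler.onContainerStatusReceived(
                       containerId, getContainerStatusFunction.apply(containerId, nodeId));
           }
       }

       /** Hypothetical builder; every setter is optional. */
       static final class Builder {
           private final CallbackHandler callbackHandler;
           // Default: starting a container is a no-op.
           private BiConsumer<String, String> startContainerConsumer = (containerId, launchContext) -> { };
           // Default chosen by this sketch: status queries fail unless a test configures them.
           private BiFunction<String, String, String> getContainerStatusFunction = (containerId, nodeId) -> {
               throw new UnsupportedOperationException("No getContainerStatus function has been set.");
           };

           Builder(CallbackHandler callbackHandler) {
               this.callbackHandler = callbackHandler;
           }

           Builder setStartContainerConsumer(BiConsumer<String, String> consumer) {
               this.startContainerConsumer = consumer;
               return this;
           }

           Builder setGetContainerStatusFunction(BiFunction<String, String, String> function) {
               this.getContainerStatusFunction = function;
               return this;
           }

           TestingClient build() {
               return new TestingClient(callbackHandler, startContainerConsumer, getContainerStatusFunction);
           }
       }

       public static void main(String[] args) {
           List<String> received = new ArrayList<>();
           TestingClient client = new Builder((containerId, status) -> received.add(containerId + "=" + status))
                   .setGetContainerStatusFunction((containerId, nodeId) -> "NEW")
                   .build();
           client.getContainerStatus("container_1", "node_1");
           System.out.println(received); // [container_1=NEW]
       }
   }
   ```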

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385998143
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws Exception {
 			}
 		}
 
-		void verifyContainerHasBeenStarted(Container testingContainer) {
+		void verifyContainerHasBeenStarted(Container testingContainer, List<ContainerId> startedContainerIds) throws Exception {
 			verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+			// Wait the call of
+			for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+				if (startedContainerIds.contains(testingContainer.getId())) {
+					return;
+				}
+				Thread.sleep(1000);
+			}
+			throw new Exception("The container has not been started before the timeout.");
 
 Review comment:
   I see a thread safety issue here. The collection `startedContainerIds` is accessed concurrently by this verification method and `TestingNMClientAsync#startContainerAsyncBiConsumer`. It's probably fine at the moment, since the verification method does not modify it, but I think it would still be good to avoid this.
   
   In the test case, we could create a `CompletableFuture` for each container and complete it in `TestingNMClientAsync#startContainerAsyncBiConsumer`. The verification method can then use `CompletableFuture#get(timeout, unit)` to check whether the container has been started.
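
   A minimal sketch of the per-container future approach, assuming simplified stand-ins: container IDs are `String`s, and `startedFutures`, `startedFuture`, `startContainerConsumer` and `verifyContainerHasBeenStarted` are hypothetical names modeled on the test code under review. Futures are created lazily with `ConcurrentHashMap#computeIfAbsent`, so it does not matter whether the test thread or the client thread asks for a container's future first.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.function.BiConsumer;

   public class ContainerStartedFutureSketch {

       static final long TIMEOUT_MILLIS = 10_000L;

       // One future per container ID; ConcurrentHashMap makes lazy creation thread-safe.
       static final Map<String, CompletableFuture<Void>> startedFutures = new ConcurrentHashMap<>();

       static CompletableFuture<Void> startedFuture(String containerId) {
           return startedFutures.computeIfAbsent(containerId, id -> new CompletableFuture<>());
       }

       // Hypothetical stand-in for the startContainerAsync consumer of the testing NM client:
       // instead of appending to a shared list, it completes the container's future.
       static final BiConsumer<String, String> startContainerConsumer =
               (containerId, launchContext) -> startedFuture(containerId).complete(null);

       // Replaces the loop-and-sleep: blocks until the container is started,
       // or throws TimeoutException once TIMEOUT_MILLIS has elapsed.
       static void verifyContainerHasBeenStarted(String containerId)
               throws InterruptedException, ExecutionException, TimeoutException {
           startedFuture(containerId).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
       }

       public static void main(String[] args) throws Exception {
           // Simulate the resource manager starting the container from another thread.
           Thread rmThread = new Thread(() -> startContainerConsumer.accept("container_1", "launch-context"));
           rmThread.start();
           verifyContainerHasBeenStarted("container_1");
           System.out.println("container_1 started");
           rmThread.join();
       }
   }
   ```

   A container that is never started makes `get` throw `TimeoutException`, which fails the test without any manual loop or sleep.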

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 9742a2c085266460d7b2bcde20ae46de8d8c72d5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151295147) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5793) 
   * b61c045eddf32b77b81238ed06cbd961351f2e3b UNKNOWN
   

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * d326cf62377ba8258f9c2c9a324e9ea99c3e4f2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151285397) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5790) 
   * 9742a2c085266460d7b2bcde20ae46de8d8c72d5 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/151295147) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5793) 
   

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386187320
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##########
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+	private BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+		throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
 
 Review comment:
   > This function should only be invoked explicitly.

   I don't see why this is necessary. We should not assume how the RM uses this method.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * b61c045eddf32b77b81238ed06cbd961351f2e3b Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151300896) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5797) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386172732
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -446,8 +459,8 @@ public void testStopWorker() throws Exception {
 
 				unregisterAndReleaseFuture.get();
 
-				verify(mockNMClient).stopContainerAsync(any(ContainerId.class), any(NodeId.class));
 				verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+				testingContainerStopFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
 
 Review comment:
   This should come before the `releaseAssignedContainer` verification on the previous line, because containers are first stopped and then released.
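To make the ordering concrete, here is a minimal, self-contained sketch. It does not use the real `YarnResourceManager` or Mockito: `StopThenRelease`, `stopWorker`, and `assertStoppedThenReleased` are hypothetical names, and plain strings stand in for YARN container IDs. The test side waits for the stop signal first and only then checks the release, mirroring the production order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical model: the RM stops a worker's container through the NM client
// and only afterwards releases it to the YARN ResourceManager.
class StopThenRelease {
    final List<String> events = new CopyOnWriteArrayList<>();
    // Completed by the (testing) NM client once the stop request is issued.
    final CompletableFuture<String> containerStopFuture = new CompletableFuture<>();

    void stopWorker(String containerId) {
        events.add("stop:" + containerId);
        containerStopFuture.complete(containerId);
        events.add("release:" + containerId);
    }

    // Test-side checks in the same order as the production events:
    // wait for the stop first, then verify the release.
    static void assertStoppedThenReleased(StopThenRelease rm, String containerId) throws Exception {
        String stopped = rm.containerStopFuture.get(10, TimeUnit.SECONDS);
        if (!containerId.equals(stopped)) {
            throw new AssertionError("unexpected stopped container: " + stopped);
        }
        List<String> expected = Arrays.asList("stop:" + containerId, "release:" + containerId);
        if (!expected.equals(rm.events)) {
            throw new AssertionError("unexpected event order: " + rm.events);
        }
    }
}
```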

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386859531
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   I just had an offline discussion with Xintong and Taoyang.
   
   The key point of this issue is that we need to release containers that were not started in the previous attempt or are no longer usable. When we try to get the status of those containers:
   - If it goes into `onContainerStatusReceived`, the container was started in the previous attempt. Whatever its state, such a container will eventually be released by the existing `onStartContainerError` or `onContainersCompleted` handling, so there is no need to handle it here.
   - If it goes into `onGetContainerStatusError`, the container was not started in the previous attempt, or is unavailable for another reason such as a lost NM. In such cases, the container is not usable, so we need to release it and remove it from the `workerNodeMap`.
   
   So I will move the release logic here to `onGetContainerStatusError`.
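The two cases above can be sketched as a small, self-contained model. This is not the actual `YarnResourceManager` code: `RecoveredContainerHandler` and `recoverContainer` are hypothetical, plain strings stand in for YARN's `ContainerId`/`ResourceID`, and "releasing" is recorded in a list instead of calling `AMRMClientAsync#releaseAssignedContainer`. Only the error callback releases and untracks the container.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the agreed handling (not the real Flink class).
class RecoveredContainerHandler {
    // resource id -> container id of every worker recovered from the previous attempt
    final Map<String, String> workerNodeMap = new HashMap<>();
    final List<String> releasedContainers = new ArrayList<>();

    // Recovered containers are tracked as workers before their status is known.
    void recoverContainer(String resourceId, String containerId) {
        workerNodeMap.put(resourceId, containerId);
    }

    // Status received: the container was started in the previous attempt.
    // Nothing to do; later failures go through the existing error/completion callbacks.
    void onContainerStatusReceived(String containerId, String state) {
        // intentionally empty
    }

    // Status query failed: the container was never started (or, e.g., its NM is lost),
    // so it is unusable. Release it and stop tracking it as a worker.
    void onGetContainerStatusError(String containerId, Throwable error) {
        releasedContainers.add(containerId);
        workerNodeMap.values().remove(containerId);
    }
}
```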

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386173738
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -528,6 +553,40 @@ public void testOnStartContainerError() throws Exception {
 		}};
 	}
 
+	@Test
+	public void testHandleContainersFromPreviousAttempt() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				Container runningCountainer = mockContainer("container", 1234, 1, resourceManager.getContainerResource());
+				Container newContainer = mockContainer("container", 1234, 2, resourceManager.getContainerResource());
+
+				resourceManager.getTestingNMClientAsync().setGetContainerStatusAsyncBiFunction((containerId, nodeId) -> {
+						if (containerId.equals(runningCountainer.getId())) {
+							return ContainerStatus.newInstance(containerId, ContainerState.RUNNING, "", 0);
+						} else {
+							return ContainerStatus.newInstance(containerId, ContainerState.NEW, "", 0);
+						}
+					});
+
+				List<Container> containersFromPreviousAttempt = new ArrayList<>();
+				containersFromPreviousAttempt.add(newContainer);
+				containersFromPreviousAttempt.add(runningCountainer);
+				RegisterApplicationMasterResponse testResponse = RegisterApplicationMasterResponse.newInstance(
+					resourceManager.getContainerResource(),
+					resourceManager.getContainerResource(),
+					Collections.emptyMap(),
+					null,
+					containersFromPreviousAttempt,
+					"",
+					Collections.emptyList());
+				resourceManager.getContainersFromPreviousAttempts(testResponse);
+
+				verify(mockResourceManagerClient).releaseAssignedContainer(newContainer.getId());
+				assertEquals(1, resourceManager.getWorkerNodeMap().size());
 
 Review comment:
   In addition to the size, we can also verify that the worker in the worker node map is the running container.
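A size check alone would also pass if the wrong container had survived. The sketch below uses plain JDK maps and a hypothetical `assertOnlyWorker` helper, not the test's actual `ResourceID`/`YarnWorkerNode` types, and checks both the size and the identity of the remaining worker.

```java
import java.util.Map;

// Hypothetical helper; in the real test the values would be YarnWorkerNode instances
// and the comparison would be against the running container's id.
final class WorkerMapAssertions {
    private WorkerMapAssertions() {}

    // Fails unless the map holds exactly one worker and that worker is the expected container.
    static void assertOnlyWorker(Map<String, String> workerNodeMap, String expectedContainerId) {
        if (workerNodeMap.size() != 1) {
            throw new AssertionError("expected exactly one worker, got " + workerNodeMap.size());
        }
        String actual = workerNodeMap.values().iterator().next();
        if (!expectedContainerId.equals(actual)) {
            throw new AssertionError("expected " + expectedContainerId + " but found " + actual);
        }
    }
}
```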

[GitHub] [flink] KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-593856726
 
 
   Thanks @xintongsong @TaoYang526. I've updated the PR.
   The major changes are:
   - Move the release logic from `onContainerStatusReceived` to `onGetContainerStatusError`.
   - Change `getContainerStatusAsync` in `TestingNMClientAsync` from a function to a consumer.
   - Update the `testHandleContainersFromPreviousAttempt` test case.
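The function-to-consumer change can be sketched roughly as follows. With a consumer, the stub no longer returns a status for itself to forward; the test decides which callback fires, which makes it easy to exercise the `onGetContainerStatusError` path. `ConsumerBasedNMClientStub` and `StatusCallbacks` are hypothetical simplifications (no `NodeId`, strings for IDs), not the actual `TestingNMClientAsync` or Hadoop's `NMClientAsync.CallbackHandler`. The default consumer is a no-op, so the stub makes no assumption about how the RM uses the method.

```java
import java.util.function.BiConsumer;

// Hypothetical callback interface mirroring the two callbacks discussed in this PR.
interface StatusCallbacks {
    void onContainerStatusReceived(String containerId, String state);

    void onGetContainerStatusError(String containerId, Throwable error);
}

// Hypothetical stub: the configured consumer decides which callback is invoked.
class ConsumerBasedNMClientStub {
    private final StatusCallbacks callbacks;
    // No-op by default, so tests that never configure it are unaffected.
    private volatile BiConsumer<String, StatusCallbacks> getContainerStatusAsyncConsumer = (id, cb) -> {};

    ConsumerBasedNMClientStub(StatusCallbacks callbacks) {
        this.callbacks = callbacks;
    }

    void setGetContainerStatusAsyncConsumer(BiConsumer<String, StatusCallbacks> consumer) {
        this.getContainerStatusAsyncConsumer = consumer;
    }

    void getContainerStatusAsync(String containerId) {
        getContainerStatusAsyncConsumer.accept(containerId, callbacks);
    }
}
```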

[GitHub] [flink] flinkbot commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

flinkbot commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592298238
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 401e60a289f63177b3c59e0dd9b5b600a929adb7 (Fri Feb 28 03:43:54 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385527439
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -454,6 +456,11 @@ public void onError(Throwable error) {
 		onFatalError(error);
 	}
 
+	@VisibleForTesting
+	Map<ResourceID, YarnWorkerNode> getWorkerNodeMap() {
+		return workerNodeMap;
 
 Review comment:
   ```suggestion
   		return Collections.unmodifiableMap(workerNodeMap);
   ```
   We should always be careful when exposing non-primitive fields such as `Map` and `List`. Even when the field is declared `final`, the contents of the collection can still be modified by callers.
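The difference between a `final` field and a read-only view shows up in a few lines of plain JDK code. `WorkerRegistry` is a hypothetical stand-in for the resource manager; `Collections.unmodifiableMap` is the standard JDK method from the suggestion.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: `final` only pins the reference, not the map's contents.
class WorkerRegistry {
    private final Map<String, String> workerNodeMap = new HashMap<>();

    void register(String resourceId, String containerId) {
        workerNodeMap.put(resourceId, containerId);
    }

    // Read-only view for tests: callers can inspect the map but any mutation
    // attempt throws UnsupportedOperationException.
    Map<String, String> getWorkerNodeMap() {
        return Collections.unmodifiableMap(workerNodeMap);
    }
}
```

The returned map is a view, not a copy: it rejects writes, but later changes to the underlying `workerNodeMap` remain visible through it, which is usually what a test wants.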

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * b61c045eddf32b77b81238ed06cbd961351f2e3b Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151300896) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5797) 
   * 72f4432f92749f7b8825a426a8ec8e1379aa76a2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592439609
 
 
   Thanks for the review @xintongsong . I've addressed your comment.

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385996883
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
 	static class TestingYarnResourceManager extends YarnResourceManager {
 		AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
-		NMClientAsync mockNMClient;
+		TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   Let's make these fields `final`, to avoid changing them after the `TestingYarnResourceManager` is created.

[GitHub] [flink] KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-593230211
 
 
   FYI: I added an empty `increaseContainerResourceAsync` method to `TestingNMClientAsync` for builds using Hadoop 2.8.0+.

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 72f4432f92749f7b8825a426a8ec8e1379aa76a2 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/151326263) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5810) 
   * 97dc6f69b4832ae94ac4c98f656efa02b3274955 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/151516034) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5860) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385995255
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
 	static class TestingYarnResourceManager extends YarnResourceManager {
 		AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
-		NMClientAsync mockNMClient;
+		TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   I think we can keep it declared as `NMClientAsync`. That ensures we use the exact same set of public interfaces in the tests as in the production code.

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 9742a2c085266460d7b2bcde20ae46de8d8c72d5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151295147) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5793) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * b61c045eddf32b77b81238ed06cbd961351f2e3b Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151300896) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5797) 
   * 72f4432f92749f7b8825a426a8ec8e1379aa76a2 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/151326263) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5810) 
   


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385994939
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+	private final BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction;
+	private final BiConsumer<Container, ContainerLaunchContext> startContainerAsyncBiConsumer;
+	private final BiConsumer<ContainerId, NodeId> stopContainerAsyncBiConsumer;
+
+	private TestingNMClientAsync(
+			CallbackHandler callbackHandler,
+			BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction,
+			BiConsumer<Container, ContainerLaunchContext> startContainerAsyncBiConsumer,
+			BiConsumer<ContainerId, NodeId> stopContainerAsyncBiConsumer) {
+		super(callbackHandler);
+		this.getContainerStatusAsyncBiFunction = getContainerStatusAsyncBiFunction;
+		this.startContainerAsyncBiConsumer = startContainerAsyncBiConsumer;
+		this.stopContainerAsyncBiConsumer = stopContainerAsyncBiConsumer;
+	}
+
+	@Override
+	public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+		startContainerAsyncBiConsumer.accept(container, containerLaunchContext);
+	}
+
+	@Override
+	public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+		stopContainerAsyncBiConsumer.accept(containerId, nodeId);
+	}
+
+	@Override
+	public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+		callbackHandler.onContainerStatusReceived(containerId, getContainerStatusAsyncBiFunction.apply(containerId, nodeId));
+	}
+
+	public static Builder newBuilder(NMClientAsync.CallbackHandler callbackHandler) {
+		return new Builder(callbackHandler);
+	}
+
+	/**
+	 * Builder for {@link TestingNMClientAsync}.
+	 */
+	public static class Builder {
+		private BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+			throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
+		};
+		private BiConsumer<Container, ContainerLaunchContext> startContainerAsyncBiConsumer = (container, containerLaunchContext) -> {};
+		private BiConsumer<ContainerId, NodeId> stopContainerAsyncBiConsumer = (containerId, nodeId) -> {};
+		private NMClientAsync.CallbackHandler callbackHandler;
 
 Review comment:
   nit: this could be `final`.
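This is a dependency-free sketch of what the nit amounts to. It assumes plain `String` values in place of the YARN `ContainerId`, `NodeId` and `ContainerStatus` records, and a reduced, hypothetical `CallbackHandler`. The required callback handler is passed to the builder's constructor and never reassigned, so it can be `final`. The optional behaviour keeps a mutable field with a fail-fast default. `TestingClientSketch` and `setGetContainerStatusFunction` are illustrative names, not the classes in the PR.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical, dependency-free sketch of the TestingNMClientAsync.Builder pattern:
// plain String values stand in for the YARN ContainerId, NodeId and ContainerStatus records.
final class TestingClientSketch {

    /** Stand-in for NMClientAsync.CallbackHandler, reduced to the one method used here. */
    interface CallbackHandler {
        void onContainerStatusReceived(String containerId, String status);
    }

    private final CallbackHandler callbackHandler;
    private final BiFunction<String, String, String> getContainerStatusFunction;

    private TestingClientSketch(
            CallbackHandler callbackHandler,
            BiFunction<String, String, String> getContainerStatusFunction) {
        this.callbackHandler = callbackHandler;
        this.getContainerStatusFunction = getContainerStatusFunction;
    }

    /** Like the class under review, answers the "async" request synchronously via the callback. */
    void getContainerStatusAsync(String containerId, String nodeId) {
        callbackHandler.onContainerStatusReceived(
                containerId, getContainerStatusFunction.apply(containerId, nodeId));
    }

    static Builder newBuilder(CallbackHandler callbackHandler) {
        return new Builder(callbackHandler);
    }

    static final class Builder {
        // Set exactly once in the constructor and never reassigned, so it can be final.
        private final CallbackHandler callbackHandler;

        // Optional behaviour: a fail-fast default that a setter may replace, so it stays non-final.
        private BiFunction<String, String, String> getContainerStatusFunction = (containerId, nodeId) -> {
            throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
        };

        private Builder(CallbackHandler callbackHandler) {
            this.callbackHandler = Objects.requireNonNull(callbackHandler);
        }

        Builder setGetContainerStatusFunction(BiFunction<String, String, String> function) {
            this.getContainerStatusFunction = Objects.requireNonNull(function);
            return this;
        }

        TestingClientSketch build() {
            return new TestingClientSketch(callbackHandler, getContainerStatusFunction);
        }
    }
}
```

Keeping required collaborators in `final` fields set by the constructor lets the compiler enforce that the builder is never left without a callback handler.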


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519373
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
+			// If the status is "NEW", it means that the container has been allocated but has not been started yet.
+			// We need to release it.
+			log.warn("The container {} from the previous attempt did not start. Released.", containerId);
 
 Review comment:
   I think an INFO-level log message should be enough, since this is not causing any problem.


[GitHub] [flink] TaoYang526 commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
TaoYang526 commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386791133
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   In most Hadoop versions, `ContainerStatus#getState()` may only return RUNNING (meaning the container is starting or has started) or COMPLETE (meaning it has finished); only a few versions may also return NEW or SCHEDULED. So I think this condition can be expressed as "not RUNNING" here, and we should guard the `resourceManagerClient#releaseAssignedContainer` call with a condition like `if (containerStatus.getState() != ContainerState.COMPLETE)`, since there is no need to release a container that has already completed.
   
   We should also handle this in the `onGetContainerStatusError` method, for containers that the last AM had not yet started by calling `NMClient#startContainer`.
   
   My last suggestion is to consider consistency, since internal state may be updated inside this callback; that can be handled by calling `runAsync(...)`.
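A minimal sketch of the first suggestion, assuming a local `ContainerState` enum as a stand-in for the Hadoop one and a hypothetical `decide` helper. The release path is taken for any state other than RUNNING, and `releaseAssignedContainer` is skipped when the container has already completed. It does not cover the `onGetContainerStatusError` or `runAsync(...)` points.

```java
// Hypothetical sketch of the decision suggested in the review comment.
// ContainerState is a local stand-in for YARN's ContainerState enum, not the Hadoop class.
final class RecoveredContainerPolicy {

    enum ContainerState { NEW, SCHEDULED, RUNNING, COMPLETE }

    /** What to do with a container recovered from a previous application attempt. */
    enum Action {
        /** RUNNING: starting or already started, so keep tracking it. */
        KEEP,
        /** COMPLETE: already finished, so stop tracking it without calling releaseAssignedContainer. */
        FORGET,
        /** Anything else (e.g. NEW, SCHEDULED): stop tracking it and release it. */
        FORGET_AND_RELEASE
    }

    static Action decide(ContainerState state) {
        if (state == ContainerState.RUNNING) {
            return Action.KEEP;
        }
        // Branch on "not RUNNING" rather than "== NEW", since some Hadoop versions never report NEW.
        return state == ContainerState.COMPLETE ? Action.FORGET : Action.FORGET_AND_RELEASE;
    }
}
```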


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386798964
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   @TaoYang526
   Are you suggesting that calling `NMClientAsync.getContainerStatusAsync` on a `NEW` container might result in `onGetContainerStatusError` on some Hadoop versions but in `onContainerStatusReceived` on others?
   
   If that is the case, I think we can have a common method that releases the container and removes it from the worker node map, called both from `onContainerStatusReceived` (under `if (containerStatus.getState() == ContainerState.NEW)`) and from `onGetContainerStatusError`.
   
   One more question: how do we know, in `onGetContainerStatusError`, whether a container is `NEW` or whether there is some other problem?
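A rough sketch of the common-method idea, assuming simplified stand-ins: `String` container ids, a local `ContainerState` enum, a plain map for the worker node map, and an `Executor` playing the role of Flink's `runAsync(...)` main-thread execution. Treating every `onGetContainerStatusError` as "never started" is an assumption, which is exactly the open question raised in this comment. `RecoveredContainerCallbacks` and `releaseRecoveredContainer` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical sketch of the "common method" idea. String ids, the local ContainerState enum
// and the Executor (standing in for Flink's runAsync(...)) replace the real YARN/Flink types.
final class RecoveredContainerCallbacks {

    enum ContainerState { NEW, RUNNING, COMPLETE }

    /** containerId -> worker description; stand-in for the resource manager's worker node map. */
    final Map<String, String> workerNodeMap = new HashMap<>();
    /** Records which containers would have been handed to releaseAssignedContainer. */
    final List<String> releasedContainers = new ArrayList<>();

    private final Executor mainThreadExecutor;

    RecoveredContainerCallbacks(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    void onContainerStatusReceived(String containerId, ContainerState state) {
        if (state == ContainerState.NEW) {
            releaseRecoveredContainer(containerId);
        }
    }

    void onGetContainerStatusError(String containerId, Throwable cause) {
        // Assumption: treat any error as "never started". The review leaves open how to
        // distinguish a NEW container from other failures here.
        releaseRecoveredContainer(containerId);
    }

    /** Shared by both callbacks: on the main thread, stop tracking the container and record its release. */
    private void releaseRecoveredContainer(String containerId) {
        mainThreadExecutor.execute(() -> {
            if (workerNodeMap.remove(containerId) != null) {
                releasedContainers.add(containerId);
            }
        });
    }
}
```

In a test, `Runnable::run` can be passed as the executor so that the callbacks take effect synchronously.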


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * a8f61e1e4e226765baeba41d6673b2d8943f6b62 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150999106) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5729) 
   


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714) 
   


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * d326cf62377ba8258f9c2c9a324e9ea99c3e4f2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151285397) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5790) 
   * 9742a2c085266460d7b2bcde20ae46de8d8c72d5 UNKNOWN
   


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 72f4432f92749f7b8825a426a8ec8e1379aa76a2 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/151326263) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5810) 
   * 97dc6f69b4832ae94ac4c98f656efa02b3274955 UNKNOWN
   


[GitHub] [flink] KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-594262138
 
 
   Closed since it is not a problem. For details, see [FLINK-16299](https://issues.apache.org/jira/browse/FLINK-16299).


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 72f4432f92749f7b8825a426a8ec8e1379aa76a2 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/151326263) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5810) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386067195
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   My concern with defining the behavior in the test case is:
   - If someone sets the `startContainerAsync` behavior multiple times during a test, it may cause issues that are hard to debug, because `startContainerAsync` is called asynchronously.
   
   Regarding the constructor of Context, I think we could fix it by adding a builder class.
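   The builder idea could look roughly like the sketch below. All names are hypothetical: `FakeNMClient` stands in for `TestingNMClientAsync`, `numberOfTaskSlots` stands in for any other `Context` parameter, and statuses are plain `String`s instead of Hadoop records. Because the NM client's behavior is chosen in `build()`, it is fixed before the RM starts and cannot change while asynchronous calls are in flight.

   ```java
   import java.util.function.BiFunction;

   // Hypothetical stand-in for TestingNMClientAsync: its behavior is fixed at construction.
   final class FakeNMClient {
       private final BiFunction<String, String, String> containerStatusFunction;

       FakeNMClient(BiFunction<String, String, String> containerStatusFunction) {
           this.containerStatusFunction = containerStatusFunction;
       }

       String getContainerStatus(String containerId, String nodeId) {
           return containerStatusFunction.apply(containerId, nodeId);
       }
   }

   // Hypothetical test Context, created through a builder instead of a long constructor.
   final class TestContext {
       final FakeNMClient nmClient;
       final int numberOfTaskSlots; // hypothetical extra parameter, only to show a second setting

       private TestContext(Builder builder) {
           this.nmClient = new FakeNMClient(builder.containerStatusFunction);
           this.numberOfTaskSlots = builder.numberOfTaskSlots;
       }

       static Builder newBuilder() {
           return new Builder();
       }

       static final class Builder {
           // Defaults let existing tests build a Context without any extra setup.
           private BiFunction<String, String, String> containerStatusFunction = (containerId, nodeId) -> "RUNNING";
           private int numberOfTaskSlots = 1;

           Builder setContainerStatusFunction(BiFunction<String, String, String> function) {
               this.containerStatusFunction = function;
               return this;
           }

           Builder setNumberOfTaskSlots(int numberOfTaskSlots) {
               this.numberOfTaskSlots = numberOfTaskSlots;
               return this;
           }

           TestContext build() {
               return new TestContext(this);
           }
       }
   }
   ```

   A test that cares about container status would call `TestContext.newBuilder().setContainerStatusFunction((c, n) -> "NEW").build()`; all other tests keep using the defaults.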


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386028701
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   Currently, I don't see the benefit of, or need for, changing the behavior during tests. Besides, some of its functions are called asynchronously. I would prefer to add `TestingNMClientAsync` to the parameter list of the `Context` constructor. WDYT?


[GitHub] [flink] KarmaGYZ closed pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ closed pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248
 
 
   


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519116
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
+			// If the status is "NEW", it means that the container has been allocated but not started yet.
+			// We need to release it.
+			log.warn("The container {} from the previous attempt did not start. Released.", containerId);
 
 Review comment:
   ```suggestion
   			log.info("Releasing container {} from the previous attempt. No TaskExecutor started inside.", containerId);
   ```


[GitHub] [flink] TaoYang526 commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
TaoYang526 commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386823957
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   > Are you suggesting that calling NMClientAsync.getContainerStatusAsync on a NEW container might result in onGetContainerStatusError on some Hadoop versions while onContainerStatusReceived on other versions?
   
   No, both callbacks coexist in Hadoop: `onContainerStatusReceived` is invoked for containers that have already been started by the AM via `NMClient#startContainers`, while `onGetContainerStatusError` is invoked for containers that have not been started by the AM, or for other causes such as a lost NM.
   
   > If that is the case, I think we can have a common method handling releasing the container and removing it from the worker node map
   
   Yes, a common method is necessary.
   
   > One more question: how do we know whether a container is NEW or there are some other problems in onGetContainerStatusError?
   
   There may be several causes for this, such as the container not being found on the NM or the NM being unreachable, but they can all be treated as the same problem: since we cannot get the container's status, the container is probably not usable for now. I think we can handle all of them as above, regardless of the actual cause.
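   A minimal sketch of the common method discussed here, assuming the design at this point in the review, in which both a `NEW` state reported through `onContainerStatusReceived` and any `onGetContainerStatusError` lead to a release. Container ids are plain `String`s and `ContainerState` is a local enum standing in for Hadoop's record types; `releaseUnusableContainer` and `releasedContainers` are hypothetical names, and the list only records what the real code would hand to the YARN ResourceManager client.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Local stand-in for org.apache.hadoop.yarn.api.records.ContainerState.
   enum ContainerState { NEW, RUNNING, COMPLETE }

   // Hypothetical, simplified stand-in for the RM's NM callback handling.
   class RecoveredContainerHandler {
       // Stand-in for the workerNodeMap: container id -> node id.
       final Map<String, String> workerNodeMap = new HashMap<>();
       // Records release requests instead of calling a real ResourceManager client.
       final List<String> releasedContainers = new ArrayList<>();

       void onContainerStatusReceived(String containerId, ContainerState state) {
           if (state == ContainerState.NEW) {
               releaseUnusableContainer(containerId);
           }
       }

       void onGetContainerStatusError(String containerId, Throwable cause) {
           // The cause (container unknown to the NM, NM unreachable, ...) is deliberately
           // ignored: in every case the container cannot be used right now.
           releaseUnusableContainer(containerId);
       }

       // The common method: release the container and forget the worker.
       private void releaseUnusableContainer(String containerId) {
           releasedContainers.add(containerId);
           workerNodeMap.remove(containerId);
       }
   }
   ```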


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386170128
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##########
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+	private BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+		throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
 
 Review comment:
   I think we should try to have a default function that does not throw any exception if possible.
   
   We can change the return type of this function to `Optional<ContainerStatus>`, return `Optional.empty()` by default, and in `getContainerStatusAsync` we check it and only invoke the `callbackHandler` if the function return value is not empty.
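   A minimal sketch of the suggested `Optional`-based default, with `String` ids and statuses standing in for Hadoop's `ContainerId`, `NodeId` and `ContainerStatus`. `TestingNMClientSketch` and `StatusCallbackHandler` are hypothetical names; the real class would extend `NMClientAsync` and use its `callbackHandler` field.

   ```java
   import java.util.Optional;
   import java.util.function.BiFunction;

   // Hypothetical stand-in for the relevant part of NMClientAsync.CallbackHandler.
   interface StatusCallbackHandler {
       void onContainerStatusReceived(String containerId, String status);
   }

   class TestingNMClientSketch {
       private final StatusCallbackHandler callbackHandler;

       // Default: no status and no exception, so the callback is simply not invoked.
       private volatile BiFunction<String, String, Optional<String>> getContainerStatusAsyncFunction =
           (containerId, nodeId) -> Optional.empty();

       TestingNMClientSketch(StatusCallbackHandler callbackHandler) {
           this.callbackHandler = callbackHandler;
       }

       void setGetContainerStatusAsyncFunction(BiFunction<String, String, Optional<String>> function) {
           this.getContainerStatusAsyncFunction = function;
       }

       void getContainerStatusAsync(String containerId, String nodeId) {
           // Only report a status when the configured function produced one.
           getContainerStatusAsyncFunction.apply(containerId, nodeId)
               .ifPresent(status -> callbackHandler.onContainerStatusReceived(containerId, status));
       }
   }
   ```

   Tests that never touch container status then work without any setup, and only the tests that need a status configure one.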


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714) 
   * a8f61e1e4e226765baeba41d6673b2d8943f6b62 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150999106) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5729) 
   


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 97dc6f69b4832ae94ac4c98f656efa02b3274955 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/151516034) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5860) 
   


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714) 
   * a8f61e1e4e226765baeba41d6673b2d8943f6b62 UNKNOWN
   


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385533680
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -176,9 +179,37 @@ public void teardown() throws Exception {
 		}
 	}
 
+	static class TestingNMClientAsync extends NMClientAsync {
+
+		public List<ContainerStatus> containerStatuses = new ArrayList<>();
+
+		protected TestingNMClientAsync(CallbackHandler callbackHandler) {
+			super(callbackHandler);
+		}
+
+		@Override
+		public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+			// Do nothing.
+		}
+
+		@Override
+		public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+			// Do nothing.
+		}
+
+		@Override
+		public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+			for (ContainerStatus containerStatus: containerStatuses) {
+				if (containerStatus.getContainerId().equals(containerId)) {
+					callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+				}
+			}
+		}
+	}
 
 Review comment:
   Good point!


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385996792
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   I'm not sure about setting the `TestingNMClientAsync` in this way.
   It requires `setTestingNMClientAsync` to be called before `createAndStartNodeManagerClient`, in other words before the RM is started, which is implicit and hard to maintain.
   
   I would suggest overriding the behavior of `TestingNMClientAsync` rather than replacing the object itself. Specifically, we could make the following changes:
   1. Add a `TestingNMClientAsync` field to `Context`, so we can access it in the test cases.
   2. Create the `TestingNMClientAsync` in `Context` and pass it into `TestingYarnResourceManager`.
   3. Make the `Consumer`s and `Function`s in `TestingNMClientAsync` settable, so we can define or change its behavior during tests.
   
   This way, we only need to define the NM client behaviors before using them, which I believe is more straightforward.
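   The three steps above could be sketched like this, assuming `String` container ids and hypothetical class names (`SettableNMClient`, `ResourceManagerSketch`, `ContextSketch`) in place of `TestingNMClientAsync`, `TestingYarnResourceManager` and `Context`. The RM only holds a reference to the client, so a test can set a behavior at any point before the RM uses it.

   ```java
   import java.util.function.Consumer;

   // Hypothetical stand-in for TestingNMClientAsync: the object stays the same,
   // only its behavior is settable (step 3).
   class SettableNMClient {
       // Default behavior: do nothing, so tests that do not care need no setup.
       private volatile Consumer<String> startContainerConsumer = containerId -> { };

       void setStartContainerConsumer(Consumer<String> startContainerConsumer) {
           this.startContainerConsumer = startContainerConsumer;
       }

       void startContainerAsync(String containerId) {
           startContainerConsumer.accept(containerId);
       }
   }

   // Stand-in for TestingYarnResourceManager: it receives the client instead of creating it (step 2).
   class ResourceManagerSketch {
       private final SettableNMClient nmClient;

       ResourceManagerSketch(SettableNMClient nmClient) {
           this.nmClient = nmClient;
       }

       void onContainerAllocated(String containerId) {
           nmClient.startContainerAsync(containerId);
       }
   }

   // Stand-in for the test Context: creates the client and keeps a reference for test cases (steps 1 and 2).
   class ContextSketch {
       final SettableNMClient testingNMClient = new SettableNMClient();
       final ResourceManagerSketch resourceManager = new ResourceManagerSketch(testingNMClient);
   }
   ```

   The trade-off raised by KarmaGYZ still applies: since the real client's methods are invoked asynchronously, swapping a behavior while calls may be in flight can be hard to debug. Declaring the field `volatile` only makes the new behavior visible to other threads; it does not order the swap relative to those calls.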


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386859531
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##########
 @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer>
 
 	@Override
 	public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-		// We are not interested in getting container status
+		// We fetch the status of the container from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
 
 Review comment:
   I just had an offline discussion with @xintongsong and @TaoYang526.
   
   The key point of this issue is: we need to release containers that were not started in the previous attempt or are no longer usable. When we try to get the status of those containers:
   - If it goes into `onContainerStatusReceived`: the container has been started in the previous attempt. No matter what its state is, such a container will eventually be released by the existing `onStartContainerError` or `onContainersCompleted`, so there is no need to handle it here.
   - If it goes into `onGetContainerStatusError`: the container was not started in the previous attempt, or something else went wrong, such as the NM being lost. In such cases, the container is not usable, and we need to release it and remove it from the `workerNodeMap`.
   
   So I will move the release logic from here to `onGetContainerStatusError`.
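   The conclusion above can be sketched as follows. This is a simplified illustration, not the PR's actual implementation: ids are plain `String`s, `StatusQuery` is a hypothetical stand-in for `NMClientAsync#getContainerStatusAsync`, and `releasedContainers` only records the ids that the real code would presumably pass to `AMRMClientAsync#releaseAssignedContainer`.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch of the agreed handling of containers recovered from a previous attempt.
   class PreviousAttemptContainerHandler {
       // Stand-in for the RM's workerNodeMap: container id -> node id.
       final Map<String, String> workerNodeMap = new HashMap<>();
       // Records release requests; the real RM would ask the YARN ResourceManager instead.
       final List<String> releasedContainers = new ArrayList<>();

       // Hypothetical stand-in for NMClientAsync#getContainerStatusAsync.
       interface StatusQuery {
           void getContainerStatusAsync(String containerId, String nodeId);
       }

       // Register every recovered container, then ask its NM for the container status.
       void onContainersFromPreviousAttempt(Map<String, String> containerToNode, StatusQuery nmClient) {
           workerNodeMap.putAll(containerToNode);
           containerToNode.forEach(nmClient::getContainerStatusAsync);
       }

       // The container was started in the previous attempt. Whatever its state, the existing
       // onStartContainerError / onContainersCompleted paths will clean it up if needed.
       void onContainerStatusReceived(String containerId, String state) {
           // Intentionally a no-op.
       }

       // Not started in the previous attempt, NM lost, etc.: the container is unusable either way.
       void onGetContainerStatusError(String containerId, Throwable cause) {
           workerNodeMap.remove(containerId);
           releasedContainers.add(containerId);
       }
   }
   ```

   Compared with checking for `ContainerState.NEW` in `onContainerStatusReceived`, this keeps the decision in one callback and does not depend on which states a given NM reports.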


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714) 
   


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386164345
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
 	static class TestingYarnResourceManager extends YarnResourceManager {
 		AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
-		NMClientAsync mockNMClient;
+		TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   Since we construct this field in `TestingYarnResourceManager`, I think there is no need to keep declaring it as `NMClientAsync`. Besides, we now need access to its setter functions.


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386027835
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -178,7 +179,7 @@ public void teardown() throws Exception {
 
 	static class TestingYarnResourceManager extends YarnResourceManager {
 		AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
-		NMClientAsync mockNMClient;
+		TestingNMClientAsync testingNMClientAsync;
 
 Review comment:
   Good catch. It could be declared as `NMClientAsync` in the current implementation.


[GitHub] [flink] KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592297871
 
 
   cc @tillrohrmann @xintongsong 


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386029438
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws Exception {
 			}
 		}
 
-		void verifyContainerHasBeenStarted(Container testingContainer) {
+		void verifyContainerHasBeenStarted(Container testingContainer, List<ContainerId> startedContainerIds) throws Exception {
 			verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+			// Wait until the container has been started
+			for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+				if (startedContainerIds.contains(testingContainer.getId())) {
+					return;
+				}
+				Thread.sleep(1000);
+			}
+			throw new Exception("The container has not been started before the timeout.");
 
 Review comment:
   Yes, you are right. I think we could fix it by using a `CopyOnWriteArrayList`. Is there any extra benefit to leveraging the `CompletableFuture` mechanism?
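A minimal sketch of the `CompletableFuture` alternative, assuming a simplified testing client with a replaceable start callback (`FakeNMClient`, `awaitStart`, and the string container IDs are hypothetical, not Flink or Hadoop APIs). The callback completes a future and the test waits on it with a timeout, so the test returns as soon as the container is started instead of polling a shared list in one-second steps, and completing the future safely hands the result across threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical stand-in for a testing NM client whose start behavior is a pluggable callback.
class FakeNMClient {
    private volatile Consumer<String> startContainerConsumer = containerId -> { };

    void setStartContainerConsumer(Consumer<String> consumer) {
        this.startContainerConsumer = consumer;
    }

    void startContainerAsync(String containerId) {
        startContainerConsumer.accept(containerId);
    }
}

class FutureWaitDemo {
    // Waits for startContainerAsync to be invoked on another thread, or fails with a TimeoutException.
    static String awaitStart(FakeNMClient client, ExecutorService rmThread, String containerId, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> startContainerFuture = new CompletableFuture<>();
        // Completing the future is thread-safe, so no shared (and possibly non-thread-safe) list is needed.
        client.setStartContainerConsumer(startContainerFuture::complete);

        // Simulates the resource manager's main thread starting the container.
        rmThread.execute(() -> client.startContainerAsync(containerId));

        // Returns as soon as the callback fires; no fixed-interval polling with Thread.sleep.
        return startContainerFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService rmThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println(awaitStart(new FakeNMClient(), rmThread, "container_1", 10_000L));
        } finally {
            rmThread.shutdown();
        }
    }
}
```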


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 9742a2c085266460d7b2bcde20ae46de8d8c72d5 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151295147) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5793) 
   * b61c045eddf32b77b81238ed06cbd961351f2e3b Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/151300896) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5797) 
   

[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386034363
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   I'm not sure about that. There are quite a few fields in `Context`. If we add every field that needs to be customized to the `Context` constructor parameters, the constructor will soon become complex, or we will have to maintain many constructors with different parameter combinations.
   
   I think the key point of expressing the behavior of `TestingNMClientAsync` as `Consumer`s and `Function`s is that we can use the same default `TestingNMClientAsync` created in `Context` while still defining its behavior in each test case. Being able to change its behavior is only an extra benefit.
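A simplified sketch of that pattern, with hypothetical names (`ConfigurableNMClient`, `SharedFixture`) and `String` standing in for the YARN container and launch-context types: the fixture always builds the same default client with a harmless no-op behavior, and each test installs only the behavior it needs through a setter, so the fixture's constructor never grows a parameter per customization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for TestingNMClientAsync: behavior is held in a settable field.
class ConfigurableNMClient {
    // Harmless default, so the shared instance works even in tests that never customize it.
    private volatile BiConsumer<String, String> startContainerConsumer = (containerId, launchContext) -> { };

    void setStartContainerConsumer(BiConsumer<String, String> consumer) {
        this.startContainerConsumer = consumer;
    }

    void startContainerAsync(String containerId, String launchContext) {
        startContainerConsumer.accept(containerId, launchContext);
    }
}

// Hypothetical fixture playing the role of the test's Context: it always builds the same default client,
// so its constructor needs no parameter per customizable behavior.
class SharedFixture {
    final ConfigurableNMClient nmClient = new ConfigurableNMClient();
}

class ConfigurableClientDemo {
    // A test case that only cares about which containers get started.
    static List<String> recordStartedContainers(String... containerIds) {
        SharedFixture fixture = new SharedFixture();
        List<String> started = new ArrayList<>();
        fixture.nmClient.setStartContainerConsumer((containerId, launchContext) -> started.add(containerId));
        for (String containerId : containerIds) {
            fixture.nmClient.startContainerAsync(containerId, "launch-context");
        }
        return started;
    }

    public static void main(String[] args) {
        System.out.println(recordStartedContainers("container_1", "container_2"));
    }
}
```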


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * 72f4432f92749f7b8825a426a8ec8e1379aa76a2 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/151326263) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5810) 
   

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386175617
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingNMClientAsync.java
 ##########
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * Testing implementation of {@link NMClientAsync} for tests.
+ */
+public class TestingNMClientAsync extends NMClientAsync {
+	private BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncBiFunction = (containerId, nodeId) -> {
+		throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
 
 Review comment:
   I'm not sure about that. This function should only be invoked explicitly. If someone needs to invoke it in a test case, they need to define its behavior as well.
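The fail-fast default described here can be sketched as follows, assuming a simplified client whose status lookup returns a `String` rather than Hadoop's `ContainerStatus` (`StatusClient` and its methods are hypothetical). A test that reaches the lookup without configuring it fails immediately with an `UnsupportedOperationException`, while a test that needs the lookup must install its behavior first.

```java
import java.util.function.BiFunction;

// Hypothetical simplified client; String stands in for ContainerId, NodeId and ContainerStatus.
class StatusClient {
    // Fail-fast default, mirroring the quoted diff: an unconfigured call signals a test bug.
    private BiFunction<String, String, String> getContainerStatusFunction = (containerId, nodeId) -> {
        throw new UnsupportedOperationException("No getContainerStatusAsync function has been set.");
    };

    void setGetContainerStatusFunction(BiFunction<String, String, String> function) {
        this.getContainerStatusFunction = function;
    }

    String getContainerStatus(String containerId, String nodeId) {
        return getContainerStatusFunction.apply(containerId, nodeId);
    }

    public static void main(String[] args) {
        StatusClient client = new StatusClient();
        try {
            client.getContainerStatus("container_1", "node_1");
        } catch (UnsupportedOperationException e) {
            System.out.println("unconfigured: " + e.getMessage());
        }
        // A test that needs the lookup defines its behavior explicitly.
        client.setGetContainerStatusFunction((containerId, nodeId) -> "RUNNING");
        System.out.println(client.getContainerStatus("container_1", "node_1"));
    }
}
```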


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386066649
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -321,9 +322,16 @@ void runTest(RunnableWithException testMethod) throws Exception {
 			}
 		}
 
-		void verifyContainerHasBeenStarted(Container testingContainer) {
+		void verifyContainerHasBeenStarted(Container testingContainer, List<ContainerId> startedContainerIds) throws Exception {
 			verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).removeContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient, VERIFICATION_TIMEOUT).startContainerAsync(eq(testingContainer), any(ContainerLaunchContext.class));
+			// Wait until the container has been started
+			for (int i = 0; i < TIMEOUT.toMilliseconds(); i += 1000) {
+				if (startedContainerIds.contains(testingContainer.getId())) {
+					return;
+				}
+				Thread.sleep(1000);
+			}
+			throw new Exception("The container has not been started before the timeout.");
 
 Review comment:
   Makes sense to me. Thanks for the explanation.


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385530896
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -528,6 +558,31 @@ public void testOnStartContainerError() throws Exception {
 		}};
 	}
 
+	@Test
+	public void testStartWithContainerFromPreviousAttempt() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				Container runningCountainer = mockContainer("container", 1234, 1, Resource.newInstance(1024, 1));
+				Container newContainer = mockContainer("container", 1234, 2, Resource.newInstance(1024, 1));
+				resourceManager.getWorkerNodeMap().put(new ResourceID(runningCountainer.getId().toString()), new YarnWorkerNode(runningCountainer));
+				resourceManager.getWorkerNodeMap().put(new ResourceID(newContainer.getId().toString()), new YarnWorkerNode(newContainer));
+				testingNMClientAsync.containerStatuses.add(
+					ContainerStatus.newInstance(runningCountainer.getId(), ContainerState.RUNNING, "", 0));
+				testingNMClientAsync.containerStatuses.add(
+					ContainerStatus.newInstance(newContainer.getId(), ContainerState.NEW, "", 0));
+
+				CompletableFuture<?> requestContainerStatusFuture = resourceManager.runInMainThread(() -> {
+					testingNMClientAsync.getContainerStatusAsync(runningCountainer.getId(), runningCountainer.getNodeId());
+					testingNMClientAsync.getContainerStatusAsync(newContainer.getId(), newContainer.getNodeId());
+					return null;
+				});
 
 Review comment:
   I think we can make `getContainersFromPreviousAttempts` visible for testing, and call it on the main thread with a custom `RegisterApplicationMasterResponse`.
   
   The purpose of this test case is to verify that `YarnResourceManager` properly handles recovered containers, which includes querying their status from the Yarn NM and handling them according to the received status. Currently, the test only covers the second part of this workflow.
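A YARN-free sketch of that two-part workflow, assuming the NodeManager's answers can be modeled as a map from container ID to state and that, as in this PR, only containers in state `NEW` are released. `RecoveredContainerHandler`, `RecoveredState`, and their members are hypothetical, not Flink's actual `YarnResourceManager` code, and the synchronous map lookup stands in for `getContainerStatusAsync` plus its callback. Driving `recover` exercises both parts, whereas calling `onContainerStatusReceived` directly would only cover the second.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical subset of container states; only NEW triggers a release in this sketch.
enum RecoveredState { NEW, RUNNING }

class RecoveredContainerHandler {
    final Map<String, RecoveredState> workerNodes = new HashMap<>();
    final List<String> releasedContainers = new ArrayList<>();

    // Part 1: register each container from the previous attempt and query its status.
    // The map lookup stands in for an asynchronous NodeManager status request plus its callback.
    void recover(List<String> previousAttemptContainers, Map<String, RecoveredState> nodeManagerStatuses) {
        for (String containerId : previousAttemptContainers) {
            workerNodes.put(containerId, null); // status not known yet
            onContainerStatusReceived(containerId, nodeManagerStatuses.get(containerId));
        }
    }

    // Part 2: act on the reported status.
    void onContainerStatusReceived(String containerId, RecoveredState state) {
        if (state == RecoveredState.NEW) {
            // No TaskExecutor was ever started in this container, so it is released.
            workerNodes.remove(containerId);
            releasedContainers.add(containerId);
        } else {
            // Any other reported state keeps the worker in this sketch.
            workerNodes.put(containerId, state);
        }
    }

    public static void main(String[] args) {
        RecoveredContainerHandler handler = new RecoveredContainerHandler();
        Map<String, RecoveredState> statuses = new HashMap<>();
        statuses.put("container_1", RecoveredState.RUNNING);
        statuses.put("container_2", RecoveredState.NEW);
        handler.recover(Arrays.asList("container_1", "container_2"), statuses);
        System.out.println("kept " + handler.workerNodes.keySet() + ", released " + handler.releasedContainers);
    }
}
```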


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * d326cf62377ba8258f9c2c9a324e9ea99c3e4f2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/151285397) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5790) 
   

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r386073340
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -222,6 +222,10 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
+		void setTestingNMClientAsync(TestingNMClientAsync testingNMClientAsync) {
+			this.testingNMClientAsync = testingNMClientAsync;
+		}
 
 Review comment:
   Agreed. We can leverage `CompletableFuture`.
   
   Regarding creating `TestingNMClientAsync` in `Context`: we need to create the `TestingYarnResourceManager` first and then pass it as the `callBackHandler` to the constructor of `TestingNMClientAsync`.
   So I would prefer to create `TestingNMClientAsync` in the constructor of `TestingYarnResourceManager` and add a getter for it to `TestingYarnResourceManager`. WDYT?
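A sketch of the proposed construction order, using hypothetical stand-ins (`ContainerCallbackHandler`, `CallbackTestingClient`, `TestingResourceManagerSketch`) rather than the real `NMClientAsync.CallbackHandler` and Flink classes: because the resource manager is itself the callback handler and the client needs the handler at construction time, the resource manager builds the client in its own constructor and exposes it through a getter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface, standing in for something like NMClientAsync.CallbackHandler.
interface ContainerCallbackHandler {
    void onContainerStarted(String containerId);
}

// Hypothetical testing client that, like NMClientAsync, needs its callback handler at construction time.
class CallbackTestingClient {
    private final ContainerCallbackHandler callbackHandler;

    CallbackTestingClient(ContainerCallbackHandler callbackHandler) {
        this.callbackHandler = callbackHandler;
    }

    void startContainerAsync(String containerId) {
        callbackHandler.onContainerStarted(containerId);
    }
}

// Hypothetical resource manager that is its own callback handler, so it creates the client itself.
class TestingResourceManagerSketch implements ContainerCallbackHandler {
    private final List<String> startedContainers = new ArrayList<>();
    private final CallbackTestingClient nmClient;

    TestingResourceManagerSketch() {
        // The client only stores `this`; nothing calls back before construction finishes.
        this.nmClient = new CallbackTestingClient(this);
    }

    // Getter through which a test case reaches the client to customize or drive it.
    CallbackTestingClient getNMClient() {
        return nmClient;
    }

    @Override
    public void onContainerStarted(String containerId) {
        startedContainers.add(containerId);
    }

    List<String> getStartedContainers() {
        return startedContainers;
    }

    public static void main(String[] args) {
        TestingResourceManagerSketch resourceManager = new TestingResourceManagerSketch();
        resourceManager.getNMClient().startContainerAsync("container_1");
        System.out.println(resourceManager.getStartedContainers());
    }
}
```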


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385998488
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -528,6 +563,43 @@ public void testOnStartContainerError() throws Exception {
 		}};
 	}
 
+	@Test
+	public void testStartWithContainerFromPreviousAttempt() throws Exception {
 
 Review comment:
   minor:
   It's a bit unclear what "start with" means in the test case name.
   I would call it something like "testHandleContainersFromPreviousAttempt".


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * a8f61e1e4e226765baeba41d6673b2d8943f6b62 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150999106) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5729) 
   * d326cf62377ba8258f9c2c9a324e9ea99c3e4f2e UNKNOWN
   

[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   ## CI report:
   
   * a8f61e1e4e226765baeba41d6673b2d8943f6b62 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150999106) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5729) 
   * d326cf62377ba8258f9c2c9a324e9ea99c3e4f2e Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/151285397) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5790) 
   