Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/29 16:25:48 UTC

[GitHub] [flink] rkhachatryan opened a new pull request #11948: Uc aligned savepoints

rkhachatryan opened a new pull request #11948:
URL: https://github.com/apache/flink/pull/11948


   ## What is the purpose of the change
   
   Perform alignment for savepoints even if UnalignedCheckpoints are enabled.
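The idea of switching between aligned and unaligned handling depending on the barrier type can be illustrated with a hypothetical, heavily simplified model. `BarrierKind`, `Barrier`, `BarrierHandler`, `RecordingHandler` and `AlternatingHandlerSketch` are illustrative names, not Flink's API, and the sketch is not the PR's actual `AlternatingCheckpointBarrierHandler`: savepoint barriers go to an aligned handler, checkpoint barriers to an unaligned one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types: they only model "align on savepoints, stay unaligned on checkpoints".
enum BarrierKind { CHECKPOINT, SAVEPOINT }

final class Barrier {
    final long id;
    final BarrierKind kind;

    Barrier(long id, BarrierKind kind) {
        this.id = id;
        this.kind = kind;
    }
}

interface BarrierHandler {
    void processBarrier(Barrier barrier, int channelIndex);
}

// Records the ids of the barriers it saw; stands in for real alignment logic.
final class RecordingHandler implements BarrierHandler {
    final List<Long> seen = new ArrayList<>();

    @Override
    public void processBarrier(Barrier barrier, int channelIndex) {
        seen.add(barrier.id);
    }
}

// Routes savepoint barriers to the aligned handler and checkpoint barriers to the unaligned one.
final class AlternatingHandlerSketch implements BarrierHandler {
    private final BarrierHandler aligned;
    private final BarrierHandler unaligned;

    AlternatingHandlerSketch(BarrierHandler aligned, BarrierHandler unaligned) {
        this.aligned = aligned;
        this.unaligned = unaligned;
    }

    @Override
    public void processBarrier(Barrier barrier, int channelIndex) {
        BarrierHandler target = barrier.kind == BarrierKind.SAVEPOINT ? aligned : unaligned;
        target.processBarrier(barrier, channelIndex);
    }
}
```

Composing two existing handlers keeps each one unaware of the other; the alternating wrapper only decides which one owns a given barrier.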
   
   ## Verifying this change
   
   Added unit tests:
    - `AlternatingCheckpointBarrierHandlerTest`
    - `CheckpointOptionsTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621321811


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit e1d9c5cffe6d375dc1bdf42472692f2756e7e58a (Wed Apr 29 16:28:44 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **Invalid pull request title: No valid Jira ID provided**
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   <details>
   <summary>Bot commands</summary>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] AHeise commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r421430784



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
       I was more concerned that the alternation currently does not work when a barrier is received through notify. In particular, a UC is only triggered when the first barrier is processed rather than when it is received, which defeats the purpose of UC.
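The difference between triggering on receipt and triggering on processing can be made concrete with a hypothetical single-channel model (not Flink code; `TriggerTiming` and its methods are illustrative names, and other input channels are ignored). The channel is a FIFO in which the barrier may sit behind in-flight data buffers.

```java
import java.util.List;

// Hypothetical single-channel model: a FIFO of buffers with one barrier somewhere in it.
final class TriggerTiming {
    static final String BARRIER = "BARRIER";

    // Buffers that must be processed before the snapshot can start.
    static int buffersProcessedBeforeSnapshot(List<String> queue, boolean triggerOnReceive) {
        int barrierPos = indexOfBarrier(queue);
        // On receive: the snapshot starts as soon as the barrier arrives.
        // On process: everything queued ahead of the barrier is processed first.
        return triggerOnReceive ? 0 : barrierPos;
    }

    // Buffers ahead of the barrier that are persisted as channel state instead of waited on.
    static int buffersPersistedAsChannelState(List<String> queue, boolean triggerOnReceive) {
        int barrierPos = indexOfBarrier(queue);
        return triggerOnReceive ? barrierPos : 0;
    }

    private static int indexOfBarrier(List<String> queue) {
        int pos = queue.indexOf(BARRIER);
        if (pos < 0) {
            throw new IllegalArgumentException("no barrier in queue");
        }
        return pos;
    }
}
```

With `["d1", "d2", "d3", BARRIER, "d4"]`, triggering on processing waits for three buffers, while triggering on receipt starts immediately and persists those three buffers as channel state, which is the latency benefit UC is meant to provide.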

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -551,7 +551,7 @@ void onSenderBacklog(int backlog) throws IOException {
 	}
 
 	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-		boolean recycleBuffer = true;
+		boolean enqueued = false;

Review comment:
       While `enqueued` is more semantic, I'd assume that the more technical `recycleBuffer` is more explicit about the cleanup obligation and therefore less prone to bugs.
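The argument for `recycleBuffer` rests on the usual ownership pattern: the flag names the cleanup that `finally` must perform unless ownership was handed over. A minimal sketch, assuming hypothetical `PooledBuffer` and `ChannelSketch` classes that only mimic the shape of `onBuffer`, not Flink's `RemoteInputChannel`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a network buffer; it only tracks whether it was recycled.
final class PooledBuffer {
    boolean recycled;

    void recycle() {
        recycled = true;
    }
}

final class ChannelSketch {
    final Deque<PooledBuffer> received = new ArrayDeque<>();
    boolean released;

    // The flag states the obligation directly: recycle in `finally`
    // unless the buffer was successfully handed over to the queue.
    void onBuffer(PooledBuffer buffer) {
        boolean recycleBuffer = true;
        try {
            if (released) {
                return; // channel already released: the buffer must be recycled
            }
            received.add(buffer);
            recycleBuffer = false; // the queue now owns the buffer
        } finally {
            if (recycleBuffer) {
                buffer.recycle();
            }
        }
    }
}
```

Any early return or exception before the hand-over leaves the flag at `true`, so the buffer cannot leak; with a flag named `enqueued`, the reader has to invert the condition mentally to see the same guarantee.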

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -291,7 +291,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 	 * returns null in all other cases.
 	 */
 	@Nullable
-	protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {
+	CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {

Review comment:
       But you are decreasing visibility here...

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -300,7 +300,7 @@ protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws I
 		// reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer.
 		// we can further improve to avoid double deserialization in the future.
 		buffer.setReaderIndex(0);
-		return event.getClass() == CheckpointBarrier.class ? (CheckpointBarrier) event : null;
+		return event instanceof CheckpointBarrier ? (CheckpointBarrier) event : null;

Review comment:
       Do we even have any subclass here?
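The two checks only differ when a subclass exists. A small illustration with a hypothetical event hierarchy (`Event`, `BarrierEvent` and `SpecialBarrierEvent` are made-up names; whether `CheckpointBarrier` has or may get subclasses is exactly the open question here):

```java
// Hypothetical hierarchy, only to show where the two checks diverge.
class Event {}

class BarrierEvent extends Event {}

// A subclass that does not exist in the reviewed code.
class SpecialBarrierEvent extends BarrierEvent {}

final class BarrierChecks {
    // Matches only the exact class; subclasses are rejected.
    static boolean exactClassCheck(Event event) {
        return event.getClass() == BarrierEvent.class;
    }

    // Matches the class and all of its subclasses.
    static boolean instanceofCheck(Event event) {
        return event instanceof BarrierEvent;
    }
}
```

Without any subclass, both checks accept and reject exactly the same events, so the change is behavior-neutral in that case.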

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
##########
@@ -124,7 +124,7 @@ public SingleInputGate create(
 			bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
 		}
 
-		SingleInputGate inputGate = new SingleInputGate(

Review comment:
       Are the last three hotfix commits even used in the PR? (Introducing interfaces for gates/channels?)







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * e3a4c94c3da14004b31914fd8d0aee507d29c17c Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/162779167) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=434) 
   * d43b046ab4c15ed91ac706b210faaaad4de1a392 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=501) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418024504



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -291,7 +291,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 	 * returns null in all other cases.
 	 */
 	@Nullable
-	protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {
+	CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {

Review comment:
       This change is necessary to create `TestInputChannel` (in a different package).







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e1d9c5cffe6d375dc1bdf42472692f2756e7e58a Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162758548) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=431) 
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * e3a4c94c3da14004b31914fd8d0aee507d29c17c Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162779167) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=434) 
   





[GitHub] [flink] rkhachatryan commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-628694257


   @flinkbot run azure





[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425595057



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -469,7 +469,9 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 
 			if (notifyReceivedBarrier != null) {
 				receivedCheckpointId = notifyReceivedBarrier.getId();
-				listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
+				if (notifyReceivedBarrier.isCheckpoint()) {

Review comment:
       > above receivedCheckpointId < lastRequestedCheckpointId might be invalid 
   
   Yes, the intention was to invalidate this condition. 
   So the next data buffer after a *savepoint* barrier doesn't notify the listener and doesn't go to `ChannelStateWriter`.
   
   >  to make some unnecessary notifyReceivedBuffer to be spilled in listener.
   
   If the condition is invalidated then the listener is NOT notified.
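The effect of where `receivedCheckpointId` is assigned can be traced with a hypothetical model of the bookkeeping. Only the field names come from this thread; `ChannelStateModel`, its counters and the scenario are assumptions. In particular it assumes that `lastRequestedCheckpointId` has already been set to the savepoint's id (6) when the savepoint barrier arrives, and that the last checkpoint barrier received had id 5.

```java
// Hypothetical model of the RemoteInputChannel bookkeeping discussed here (not Flink code).
final class ChannelStateModel {
    long receivedCheckpointId = -1L;
    long lastRequestedCheckpointId = -1L;
    int buffersSpilled;   // data buffers handed to the listener / ChannelStateWriter
    int barriersNotified; // barriers forwarded to the listener

    // true: the id is updated for every barrier (as in the PR);
    // false: the assignment is moved inside the isCheckpoint() branch.
    final boolean assignForSavepoints;

    ChannelStateModel(boolean assignForSavepoints) {
        this.assignForSavepoints = assignForSavepoints;
    }

    void onBarrier(long id, boolean isCheckpoint) {
        if (assignForSavepoints || isCheckpoint) {
            receivedCheckpointId = id;
        }
        if (isCheckpoint) {
            barriersNotified++; // only checkpoint barriers notify the listener
        }
    }

    void onDataBuffer() {
        // Data arriving before this channel's barrier of a requested checkpoint is spilled.
        if (receivedCheckpointId < lastRequestedCheckpointId) {
            buffersSpilled++;
        }
    }
}
```

Under these assumptions, with the PR's placement the data buffer after savepoint barrier 6 is not spilled (`6 < 6` is false); with the assignment moved inside the condition, `receivedCheckpointId` stays at 5 and the buffer is spilled.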
   







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e1d9c5cffe6d375dc1bdf42472692f2756e7e58a Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162758548) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=431) 
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   





[GitHub] [flink] pnowojski commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-628664949


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * e3a4c94c3da14004b31914fd8d0aee507d29c17c Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/162779167) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=434) 
   * d43b046ab4c15ed91ac706b210faaaad4de1a392 UNKNOWN
   





[GitHub] [flink] zhijiangW commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425544862



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -469,7 +469,9 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 
 			if (notifyReceivedBarrier != null) {
 				receivedCheckpointId = notifyReceivedBarrier.getId();
-				listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
+				if (notifyReceivedBarrier.isCheckpoint()) {

Review comment:
       I guess the above `receivedCheckpointId = notifyReceivedBarrier.getId();` should also be placed inside this condition, because `receivedCheckpointId` is only meaningful for checkpoints now. Otherwise the above `receivedCheckpointId < lastRequestedCheckpointId` might be invalid, causing some unnecessary `notifyReceivedBuffer` calls to be spilled in the listener.







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * e58d7190f08069c4b3f782d1be0a46c8201ae783 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=513) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r424678536



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -109,10 +109,17 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
 		switch (config.getCheckpointMode()) {
 			case EXACTLY_ONCE:
 				if (config.isUnalignedCheckpointsEnabled()) {
-					return new CheckpointBarrierUnaligner(
-						numberOfInputChannelsPerGate.toArray(),
-						channelStateWriter,
-						taskName,
+					return new AlternatingCheckpointBarrierHandler(
+						new CheckpointBarrierAligner(
+							taskName,
+							channelIndexToInputGate,
+							inputGateToChannelIndexOffset,
+							toNotifyOnCheckpoint),
+						new CheckpointBarrierUnaligner(
+							numberOfInputChannelsPerGate.toArray(),
+							channelStateWriter,
+							taskName,
+							toNotifyOnCheckpoint),

Review comment:
       There are two methods in `CheckpointedInputGate` with a not very object-oriented design (I guess they should be re-implemented) that would stop working with this change, since the casts below fail once `barrierHandler` is no longer a `CheckpointBarrierUnaligner` (I hope some tests will fail because of that):
   ```
   	public void spillInflightBuffers(
   			long checkpointId,
   			int channelIndex,
   			ChannelStateWriter channelStateWriter) throws IOException {
   		if (((CheckpointBarrierUnaligner) barrierHandler).hasInflightData(checkpointId, channelIndex)) {
   			inputGate.getChannel(channelIndex).spillInflightBuffers(checkpointId, channelStateWriter);
   		}
   	}
   
   	public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
   		return ((CheckpointBarrierUnaligner) barrierHandler).getAllBarriersReceivedFuture(checkpointId);
   	}
   ```
   I guess `hasInflightData()` and `getAllBarriersReceivedFuture()` should be pulled up into `CheckpointBarrierHandler` (in a separate commit?).
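   A minimal sketch of the pull-up suggested above, assuming both methods get default implementations on the base handler so that the gate can delegate polymorphically instead of casting to the unaligned handler. All names here (`BarrierHandlerBase`, `AlignedHandler`, `UnalignedHandler`, `GateSketch`) are hypothetical stand-ins, not Flink's actual classes or signatures, and the unaligned bookkeeping is heavily simplified (it ignores the checkpoint id).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for CheckpointBarrierHandler with the two methods pulled up.
abstract class BarrierHandlerBase {
    // Default: an aligned handler never has in-flight data to spill.
    boolean hasInflightData(long checkpointId, int channelIndex) {
        return false;
    }

    // Default: nothing to wait for, so the future is already complete.
    CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return CompletableFuture.completedFuture(null);
    }
}

// Aligned mode simply inherits the defaults.
class AlignedHandler extends BarrierHandlerBase {
}

// Unaligned mode overrides both methods (state tracking heavily simplified).
class UnalignedHandler extends BarrierHandlerBase {
    private final Set<Integer> channelsWithInflightData = new HashSet<>();
    private final CompletableFuture<Void> allBarriersReceived = new CompletableFuture<>();

    void markInflight(int channelIndex) {
        channelsWithInflightData.add(channelIndex);
    }

    void allBarriersArrived() {
        allBarriersReceived.complete(null);
    }

    @Override
    boolean hasInflightData(long checkpointId, int channelIndex) {
        return channelsWithInflightData.contains(channelIndex);
    }

    @Override
    CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return allBarriersReceived;
    }
}

// The gate delegates through the base type; no cast to the unaligned handler.
class GateSketch {
    private final BarrierHandlerBase barrierHandler;

    GateSketch(BarrierHandlerBase barrierHandler) {
        this.barrierHandler = barrierHandler;
    }

    boolean shouldSpill(long checkpointId, int channelIndex) {
        return barrierHandler.hasInflightData(checkpointId, channelIndex);
    }

    CompletableFuture<Void> allBarriersReceived(long checkpointId) {
        return barrierHandler.getAllBarriersReceivedFuture(checkpointId);
    }
}
```

   With the defaults on the base type, an alternating handler could forward both calls to whichever delegate is active without either side knowing the concrete class.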

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
 		localChannel.resumeConsumption();
 	}
 
+	@Test
+	public void testNoNotifyOnSavepoint() throws IOException {
+		TestBufferReceivedListener listener = new TestBufferReceivedListener();
+		LocalInputChannel channel = new LocalInputChannel(
+			new SingleInputGateBuilder().build(),
+			0,
+			new ResultPartitionID(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new TestCounter(),
+			new TestCounter());
+		CheckpointBarrier barrier = new CheckpointBarrier(123L, 123L, new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault()));
+		channel.notifyPriorityEvent(new BufferConsumer(toBuffer(barrier).getMemorySegment(), FreeingBufferRecycler.INSTANCE, getDataType(barrier)));
+		channel.checkError();
+		assertTrue(listener.notifiedOnBarriers.isEmpty());

Review comment:
       Frankly, it's not that surprising that it's empty, since `listener` is never passed anywhere. 😈 
   
   (Did you forget to register it with the `SingleInputGate`?)
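   To make such an assertion meaningful, the listener has to be wired in before the barrier is delivered, and the test can add a positive control so that a missing registration fails loudly instead of passing vacuously. The sketch below models this with hypothetical `RecordingListener`, `SketchGate` and `SketchChannel` classes; it does not use Flink's real `SingleInputGate` or `BufferReceivedListener` APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener that records the ids of barriers it was notified about.
class RecordingListener {
    final List<Long> notifiedOnBarriers = new ArrayList<>();

    void notifyBarrierReceived(long barrierId) {
        notifiedOnBarriers.add(barrierId);
    }
}

// Hypothetical gate: the only place a listener can be registered.
class SketchGate {
    private RecordingListener listener;

    void registerListener(RecordingListener listener) {
        this.listener = listener;
    }

    void forwardBarrier(long barrierId) {
        if (listener != null) {
            listener.notifyBarrierReceived(barrierId);
        }
    }
}

// Hypothetical channel: forwards only checkpoint barriers (not savepoints) to its gate.
class SketchChannel {
    private final SketchGate gate;

    SketchChannel(SketchGate gate) {
        this.gate = gate;
    }

    void onBarrier(long barrierId, boolean isSavepoint) {
        if (!isSavepoint) {
            gate.forwardBarrier(barrierId);
        }
    }
}

class NoNotifyOnSavepointSketch {
    // Sends one savepoint and one checkpoint barrier; returns what the listener saw.
    static List<Long> run() {
        SketchGate gate = new SketchGate();
        RecordingListener listener = new RecordingListener();
        gate.registerListener(listener); // the wiring step missing from the test above
        SketchChannel channel = new SketchChannel(gate);

        channel.onBarrier(123L, true);
        if (!listener.notifiedOnBarriers.isEmpty()) {
            throw new AssertionError("savepoint barrier must not be forwarded");
        }

        // Positive control: proves the listener is actually registered.
        channel.onBarrier(124L, false);
        return listener.notifiedOnBarriers;
    }
}
```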

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
 		localChannel.resumeConsumption();
 	}
 
+	@Test
+	public void testNoNotifyOnSavepoint() throws IOException {
+		TestBufferReceivedListener listener = new TestBufferReceivedListener();
+		LocalInputChannel channel = new LocalInputChannel(
+			new SingleInputGateBuilder().build(),
+			0,
+			new ResultPartitionID(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new TestCounter(),
+			new TestCounter());

Review comment:
       nit:
   ```
   InputChannelBuilder
     .newBuilder()
     .buildLocalChannel(new SingleInputGateBuilder().build())
   ```
   ?







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425038624



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
       > There is an inconsistency issue from perspective of InputChannels threads. But they are accessing AlternatingCheckpointBarrierHandler only via the notifyBarrierReceived and notifyBufferReceived
   
   If I understand your point correctly: `InputChannel` threads notify `CheckpointBarrierUnaligner.ThreadSafeUnaligner`, not `AlternatingCheckpointBarrierHandler`.
   And `ThreadSafeUnaligner` doesn't know about `AlternatingCheckpointBarrierHandler`.
   
   There is an "inconsistency" in the sense that `AlternatingCheckpointBarrierHandler` can "lag behind" `ThreadSafeUnaligner`, but it shouldn't cause any problems.
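   One way to picture the arrangement described above: network threads update only a thread-safe component, while the task-thread handler keeps its own plain field that may trail behind. The sketch assumes the thread-safe side tracks the highest barrier id seen in an `AtomicLong`; `ThreadSafeTracker`, `TaskThreadView` and `LagSketch` are hypothetical names, not Flink's `ThreadSafeUnaligner` or `AlternatingCheckpointBarrierHandler`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: updated concurrently by network (InputChannel) threads.
class ThreadSafeTracker {
    private final AtomicLong latestSeenBarrierId = new AtomicLong(-1L);

    void notifyBarrierReceived(long barrierId) {
        // Monotonic max, so concurrent and out-of-order notifications are safe.
        latestSeenBarrierId.accumulateAndGet(barrierId, Math::max);
    }

    long latestSeen() {
        return latestSeenBarrierId.get();
    }
}

// Hypothetical: touched only by the task thread, so a plain field suffices.
class TaskThreadView {
    private long latestProcessedBarrierId = -1L;

    void processBarrier(long barrierId) {
        latestProcessedBarrierId = Math.max(latestProcessedBarrierId, barrierId);
    }

    // Typically trails the tracker, because the task thread handles a barrier
    // only after a network thread has delivered it.
    long latestProcessed() {
        return latestProcessedBarrierId;
    }
}

class LagSketch {
    static long[] run() {
        ThreadSafeTracker tracker = new ThreadSafeTracker();
        TaskThreadView view = new TaskThreadView();
        List<Thread> networkThreads = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            final int offset = t;
            Thread thread = new Thread(() -> {
                for (long id = offset; id < 1000; id += 4) {
                    tracker.notifyBarrierReceived(id);
                }
            });
            networkThreads.add(thread);
            thread.start();
        }
        for (Thread thread : networkThreads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // The task thread has only caught up to barrier 10 so far.
        view.processBarrier(10L);
        return new long[] {tracker.latestSeen(), view.latestProcessed()};
    }
}
```

   The point of the split is that the only shared state is the atomic; the task-thread view never needs synchronization, and its lag is harmless as long as it only moves forward.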







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425595280



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
+
+class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
+	private final CheckpointBarrierAligner alignedHandler;
+	private final CheckpointBarrierUnaligner unalignedHandler;
+	private CheckpointBarrierHandler activeHandler;
+
+	AlternatingCheckpointBarrierHandler(CheckpointBarrierAligner alignedHandler, CheckpointBarrierUnaligner unalignedHandler, AbstractInvokable invokable) {
+		super(invokable);
+		this.activeHandler = this.alignedHandler = alignedHandler;
+		this.unalignedHandler = unalignedHandler;
+	}
+
+	@Override
+	public void releaseBlocksAndResetBarriers() {
+		activeHandler.releaseBlocksAndResetBarriers();
+	}
+
+	@Override
+	public boolean isBlocked(int channelIndex) {
+		return activeHandler.isBlocked(channelIndex);
+	}
+
+	@Override
+	public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
+		CheckpointBarrierHandler previousHandler = activeHandler;
+		activeHandler = receivedBarrier.isCheckpoint() ? unalignedHandler : alignedHandler;
+		abortPreviousIfNeeded(receivedBarrier, previousHandler);
+		activeHandler.processBarrier(receivedBarrier, channelIndex);
+	}
+
+	private void abortPreviousIfNeeded(CheckpointBarrier barrier, CheckpointBarrierHandler prevHandler) throws IOException {
+		if (prevHandler != activeHandler && prevHandler.isCheckpointPending() && prevHandler.getLatestCheckpointId() < barrier.getId()) {
+			prevHandler.releaseBlocksAndResetBarriers();
+			notifyAbort(
+				prevHandler.getLatestCheckpointId(),
+				new CheckpointException(
+					format("checkpoint %d subsumed by %d", prevHandler.getLatestCheckpointId(), barrier.getId()),
+					CHECKPOINT_DECLINED_SUBSUMED));
+		}
+	}
+
+	@Override
+	public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+		activeHandler.processCancellationBarrier(cancelBarrier);
+	}
+
+	@Override
+	public void processEndOfPartition() throws Exception {
+		alignedHandler.processEndOfPartition();

Review comment:
       In this case, both handlers need to update their number of open input channels.
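   A minimal model of what the reply above implies: the active handler is chosen per barrier (savepoints aligned, checkpoints unaligned), but end-of-partition is fanned out to both delegates, so each keeps an accurate count of open channels for whenever it next becomes active. `ChannelCountingHandler` and `AlternatingSketch` are hypothetical simplifications, not the actual `CheckpointBarrierAligner`/`CheckpointBarrierUnaligner` code.

```java
// Hypothetical delegate that tracks open channels and the last barrier it processed.
class ChannelCountingHandler {
    private int numOpenChannels;
    private long lastBarrierId = -1L;

    ChannelCountingHandler(int numOpenChannels) {
        this.numOpenChannels = numOpenChannels;
    }

    void processBarrier(long barrierId) {
        lastBarrierId = barrierId;
    }

    void processEndOfPartition() {
        numOpenChannels--;
    }

    int getNumOpenChannels() {
        return numOpenChannels;
    }

    long getLastBarrierId() {
        return lastBarrierId;
    }
}

class AlternatingSketch {
    final ChannelCountingHandler alignedHandler;
    final ChannelCountingHandler unalignedHandler;
    private ChannelCountingHandler activeHandler;

    AlternatingSketch(int numChannels) {
        this.alignedHandler = new ChannelCountingHandler(numChannels);
        this.unalignedHandler = new ChannelCountingHandler(numChannels);
        this.activeHandler = alignedHandler;
    }

    // Savepoints go through the aligned path; regular checkpoints through the unaligned one.
    void processBarrier(long barrierId, boolean isCheckpoint) {
        activeHandler = isCheckpoint ? unalignedHandler : alignedHandler;
        activeHandler.processBarrier(barrierId);
    }

    // Both delegates must see the closed channel, not just the active one; otherwise
    // the inactive one could later wait for a barrier from an already finished channel.
    void processEndOfPartition() {
        alignedHandler.processEndOfPartition();
        unalignedHandler.processEndOfPartition();
    }
}
```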







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1d9c5cffe6d375dc1bdf42472692f2756e7e58a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=431",
       "triggerID" : "e1d9c5cffe6d375dc1bdf42472692f2756e7e58a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e1d9c5cffe6d375dc1bdf42472692f2756e7e58a",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/162758548",
       "triggerID" : "e1d9c5cffe6d375dc1bdf42472692f2756e7e58a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d54111c2ca84686374c6e1c787fa8999ad26971c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d54111c2ca84686374c6e1c787fa8999ad26971c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e3a4c94c3da14004b31914fd8d0aee507d29c17c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=434",
       "triggerID" : "e3a4c94c3da14004b31914fd8d0aee507d29c17c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e3a4c94c3da14004b31914fd8d0aee507d29c17c",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/162779167",
       "triggerID" : "e3a4c94c3da14004b31914fd8d0aee507d29c17c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d43b046ab4c15ed91ac706b210faaaad4de1a392",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=501",
       "triggerID" : "d43b046ab4c15ed91ac706b210faaaad4de1a392",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e58d7190f08069c4b3f782d1be0a46c8201ae783",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=513",
       "triggerID" : "e58d7190f08069c4b3f782d1be0a46c8201ae783",
       "triggerType" : "PUSH"
     }, {
       "hash" : "939c07544a571fead60da55aacdddae8f0a84966",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1188",
       "triggerID" : "939c07544a571fead60da55aacdddae8f0a84966",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de5f35ff8499cf1c4cc6455b887d1721a8ced837",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1278",
       "triggerID" : "de5f35ff8499cf1c4cc6455b887d1721a8ced837",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6e67644b8eaad167f6397a07320946beb3693748",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1319",
       "triggerID" : "6e67644b8eaad167f6397a07320946beb3693748",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * 6e67644b8eaad167f6397a07320946beb3693748 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1319) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhijiangW commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425544862



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -469,7 +469,9 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 
 			if (notifyReceivedBarrier != null) {
 				receivedCheckpointId = notifyReceivedBarrier.getId();
-				listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
+				if (notifyReceivedBarrier.isCheckpoint()) {

Review comment:
       I guess the `receivedCheckpointId = notifyReceivedBarrier.getId();` line above should also be moved inside this condition, because `receivedCheckpointId` is now only relevant for checkpoints.
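   Roughly what the suggestion amounts to, sketched with a hypothetical `BarrierLike` type in place of `CheckpointBarrier` and a plain list standing in for the listener (the real `RemoteInputChannel#onBuffer` does considerably more): both the bookkeeping and the notification move under the `isCheckpoint()` check, so a savepoint barrier leaves `receivedCheckpointId` untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CheckpointBarrier.
class BarrierLike {
    final long id;
    final boolean checkpoint;

    BarrierLike(long id, boolean checkpoint) {
        this.id = id;
        this.checkpoint = checkpoint;
    }

    boolean isCheckpoint() {
        return checkpoint;
    }
}

class ChannelSketch {
    long receivedCheckpointId = -1L;
    // Stands in for listener.notifyBarrierReceived(...) calls.
    final List<Long> notified = new ArrayList<>();

    void onBarrier(BarrierLike barrier) {
        // Only checkpoint (unaligned) barriers update the id and notify the listener;
        // savepoint barriers are left to the aligned path.
        if (barrier.isCheckpoint()) {
            receivedCheckpointId = barrier.id;
            notified.add(barrier.id);
        }
    }
}
```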







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418038310



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
       Thanks! I'll test that `InputChannel`s don't notify `BufferReceivedListener`.







[GitHub] [flink] zhijiangW commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425580357



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
+
+class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
+	private final CheckpointBarrierAligner alignedHandler;
+	private final CheckpointBarrierUnaligner unalignedHandler;
+	private CheckpointBarrierHandler activeHandler;
+
+	AlternatingCheckpointBarrierHandler(CheckpointBarrierAligner alignedHandler, CheckpointBarrierUnaligner unalignedHandler, AbstractInvokable invokable) {
+		super(invokable);
+		this.activeHandler = this.alignedHandler = alignedHandler;
+		this.unalignedHandler = unalignedHandler;
+	}
+
+	@Override
+	public void releaseBlocksAndResetBarriers() {
+		activeHandler.releaseBlocksAndResetBarriers();
+	}
+
+	@Override
+	public boolean isBlocked(int channelIndex) {
+		return activeHandler.isBlocked(channelIndex);
+	}
+
+	@Override
+	public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
+		CheckpointBarrierHandler previousHandler = activeHandler;
+		activeHandler = receivedBarrier.isCheckpoint() ? unalignedHandler : alignedHandler;
+		abortPreviousIfNeeded(receivedBarrier, previousHandler);
+		activeHandler.processBarrier(receivedBarrier, channelIndex);
+	}
+
+	private void abortPreviousIfNeeded(CheckpointBarrier barrier, CheckpointBarrierHandler prevHandler) throws IOException {
+		if (prevHandler != activeHandler && prevHandler.isCheckpointPending() && prevHandler.getLatestCheckpointId() < barrier.getId()) {
+			prevHandler.releaseBlocksAndResetBarriers();
+			notifyAbort(
+				prevHandler.getLatestCheckpointId(),
+				new CheckpointException(
+					format("checkpoint %d subsumed by %d", prevHandler.getLatestCheckpointId(), barrier.getId()),
+					CHECKPOINT_DECLINED_SUBSUMED));
+		}
+	}
+
+	@Override
+	public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+		activeHandler.processCancellationBarrier(cancelBarrier);
+	}
+
+	@Override
+	public void processEndOfPartition() throws Exception {
+		alignedHandler.processEndOfPartition();

Review comment:
       I'm a bit confused about why we need to call both handlers here. Shouldn't only the active handler be working at any given time?








[GitHub] [flink] flinkbot commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e1d9c5cffe6d375dc1bdf42472692f2756e7e58a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e1d9c5cffe6d375dc1bdf42472692f2756e7e58a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e1d9c5cffe6d375dc1bdf42472692f2756e7e58a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418027203



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -551,7 +551,7 @@ void onSenderBacklog(int backlog) throws IOException {
 	}
 
 	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-		boolean recycleBuffer = true;
+		boolean enqueued = false;

Review comment:
       > In case of some parse exception, the whole TM is restarted anyways.
   
   That's true for now, but it can change in the future.
   
   > renaming and inversion is unnecessary
   
   This is to give more context to prevent future bugs.
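   The pattern behind the rename, sketched under assumptions: ownership of the buffer passes to the queue only once it has actually been enqueued, and a `finally` block recycles it in every other case, including when parsing throws. `PooledBuffer` and `ReceiverSketch` are hypothetical; this is not the real `RemoteInputChannel#onBuffer`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical buffer that counts how often it has been recycled.
class PooledBuffer {
    int recycleCount;
    final boolean corrupt;

    PooledBuffer(boolean corrupt) {
        this.corrupt = corrupt;
    }

    void recycle() {
        recycleCount++;
    }
}

class ReceiverSketch {
    final Deque<PooledBuffer> receivedBuffers = new ArrayDeque<>();

    void onBuffer(PooledBuffer buffer) {
        boolean enqueued = false;
        try {
            if (buffer.corrupt) {
                // Stand-in for a parse/deserialization failure.
                throw new IllegalStateException("cannot parse buffer");
            }
            receivedBuffers.add(buffer);
            enqueued = true; // from here on the queue owns the buffer
        } finally {
            if (!enqueued) {
                buffer.recycle(); // never leak a buffer that did not make it into the queue
            }
        }
    }
}
```

   Naming the flag after the ownership transfer (`enqueued`) rather than the cleanup action keeps the default safe: any new early exit added later recycles the buffer unless it was explicitly handed over.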







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * de5f35ff8499cf1c4cc6455b887d1721a8ced837 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1278) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * e3a4c94c3da14004b31914fd8d0aee507d29c17c Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/162779167) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=434) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * e3a4c94c3da14004b31914fd8d0aee507d29c17c Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162779167) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=434) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * d43b046ab4c15ed91ac706b210faaaad4de1a392 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=501) 
   * e58d7190f08069c4b3f782d1be0a46c8201ae783 UNKNOWN
   





[GitHub] [flink] pnowojski commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425024825



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
       There is an inconsistency issue from the perspective of the input channel threads. However, they access `AlternatingCheckpointBarrierHandler` only via `notifyBarrierReceived` and `notifyBufferReceived`, and those methods are only called (active) AFTER the unaligned checkpoint has started in the task thread, so the barrier must already have been processed by the `AlternatingCheckpointBarrierHandler`. Hence `activeHandler` must already have switched to `unalignedHandler`.
   
   This is a very fragile contract, but:
   
   - a solution would require even more synchronization
   - I think there is no bug at the moment
   - once we fix the threading model and get rid of the listener altogether, the problem will be gone
   
   I only hope I'm not missing something.
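Below is a minimal sketch of the delegation contract described above. It assumes hypothetical `BarrierHandler`, `RecordingHandler` and `AlternatingHandler` types with a simplified `processBarrier(checkpointId, type)` method; these are not the real signatures of `AlternatingCheckpointBarrierHandler`, `CheckpointBarrierAligner` or `CheckpointBarrierUnaligner`. The task thread switches `activeHandler` on every barrier, choosing the aligned handler for savepoints and the unaligned one otherwise, before the chosen handler processes it. Other threads only read the field afterwards. Marking the field `volatile` is an assumption made here for visibility.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of per-barrier delegation between an aligned and an unaligned handler (hypothetical types). */
public class AlternatingHandlerSketch {

    enum CheckpointType { CHECKPOINT, SAVEPOINT }

    /** Hypothetical minimal barrier-handler contract. */
    interface BarrierHandler {
        void processBarrier(long checkpointId, CheckpointType type);
    }

    /** Test double that records which checkpoint ids it was asked to handle. */
    static final class RecordingHandler implements BarrierHandler {
        final List<Long> processed = new ArrayList<>();

        @Override
        public void processBarrier(long checkpointId, CheckpointType type) {
            processed.add(checkpointId);
        }
    }

    static final class AlternatingHandler {
        private final BarrierHandler alignedHandler;
        private final BarrierHandler unalignedHandler;
        // Written only by the task thread; volatile so that other threads reading it later see the switch.
        private volatile BarrierHandler activeHandler;

        AlternatingHandler(BarrierHandler alignedHandler, BarrierHandler unalignedHandler) {
            this.alignedHandler = alignedHandler;
            this.unalignedHandler = unalignedHandler;
            this.activeHandler = unalignedHandler;
        }

        /** Task thread: switch handlers first, then let the chosen one process the barrier. */
        void processBarrier(long checkpointId, CheckpointType type) {
            activeHandler = type == CheckpointType.SAVEPOINT ? alignedHandler : unalignedHandler;
            activeHandler.processBarrier(checkpointId, type);
        }

        /** Other threads: only valid after the task thread has processed the current barrier. */
        BarrierHandler currentHandler() {
            return activeHandler;
        }
    }
}
```

The fragility mentioned above is visible in `currentHandler()`. Its result is only meaningful if callers are guaranteed to run after the task thread's `processBarrier`, and the sketch does not enforce that ordering itself.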







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425020396



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -109,10 +109,17 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
 		switch (config.getCheckpointMode()) {
 			case EXACTLY_ONCE:
 				if (config.isUnalignedCheckpointsEnabled()) {
-					return new CheckpointBarrierUnaligner(
-						numberOfInputChannelsPerGate.toArray(),
-						channelStateWriter,
-						taskName,
+					return new AlternatingCheckpointBarrierHandler(
+						new CheckpointBarrierAligner(
+							taskName,
+							channelIndexToInputGate,
+							inputGateToChannelIndexOffset,
+							toNotifyOnCheckpoint),
+						new CheckpointBarrierUnaligner(
+							numberOfInputChannelsPerGate.toArray(),
+							channelStateWriter,
+							taskName,
+							toNotifyOnCheckpoint),

Review comment:
       Good point! 
   Moving up the methods.







[GitHub] [flink] rkhachatryan commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-628539917


   Thanks for the feedback @pnowojski, I've addressed the remaining issues.
   
   Regarding the IT case, I think it's out of the scope of this PR because it only fixes savepoint creation in UC mode without changing the snapshot format or recovery logic.
   
   [This thread](https://github.com/apache/flink/pull/11948#discussion_r424570447) remains open, could you please revisit it @AHeise and/or @pnowojski?





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * 939c07544a571fead60da55aacdddae8f0a84966 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1188) 
   * de5f35ff8499cf1c4cc6455b887d1721a8ced837 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1278) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e1d9c5cffe6d375dc1bdf42472692f2756e7e58a Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162758548) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=431) 
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * d43b046ab4c15ed91ac706b210faaaad4de1a392 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=501) 
   * e58d7190f08069c4b3f782d1be0a46c8201ae783 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=513) 
   





[GitHub] [flink] pnowojski commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-628401691


   One more thing. I think we are missing an ITCase here that would verify that a user can modify the job graph (for example, by changing the record schema) after taking a savepoint in unaligned checkpoint mode.





[GitHub] [flink] pnowojski closed pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski closed pull request #11948:
URL: https://github.com/apache/flink/pull/11948


   





[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418035479



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -108,18 +108,24 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
 			AbstractInvokable toNotifyOnCheckpoint) {
 		switch (config.getCheckpointMode()) {
 			case EXACTLY_ONCE:
-				if (config.isUnalignedCheckpointsEnabled()) {
-					return new CheckpointBarrierUnaligner(
-						numberOfInputChannelsPerGate.toArray(),
-						channelStateWriter,
+				return config.isUnalignedCheckpointsEnabled() ?

Review comment:
       Replied [above](https://github.com/apache/flink/pull/11948#discussion_r418034387).







[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * de5f35ff8499cf1c4cc6455b887d1721a8ced837 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1278) 
   * 6e67644b8eaad167f6397a07320946beb3693748 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * 939c07544a571fead60da55aacdddae8f0a84966 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1188) 
   * de5f35ff8499cf1c4cc6455b887d1721a8ced837 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418045147



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {

Review comment:
       Good point!







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418018426



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -560,8 +559,8 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 			}
 
 			final boolean wasEmpty;
-			final CheckpointBarrier notifyReceivedBarrier;
-			final Buffer notifyReceivedBuffer;
+			final Optional<CheckpointBarrier> notifyReceivedBarrier;
+			final Optional<Buffer> notifyReceivedBuffer;

Review comment:
       Yes, only for symmetry.







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r423001942



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -291,7 +291,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 	 * returns null in all other cases.
 	 */
 	@Nullable
-	protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {
+	CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {

Review comment:
       You're right, reverting this change.







[GitHub] [flink] zhijiangW commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
zhijiangW commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425593644



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -469,7 +469,9 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 
 			if (notifyReceivedBarrier != null) {
 				receivedCheckpointId = notifyReceivedBarrier.getId();
-				listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
+				if (notifyReceivedBarrier.isCheckpoint()) {

Review comment:
       Yes, there should be no problem here. I forgot that savepoint IDs come from the same consecutively incremented sequence as checkpoint IDs.







[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418034387



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -575,12 +574,10 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 				receivedBuffers.add(buffer);
 				enqueued = true;
 
-				if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) {
-					notifyReceivedBuffer = buffer.retainBuffer();

Review comment:
       IMHO, a ternary is meant for branches that have only one "effect", usually an assignment, and that is exactly the case here.
   `if-else`, OTOH, is meant for cases where the "effect" is something more complex.
   
   If the condition is complex, it should be extracted into a method, regardless of whether it's used in an `if-else` or a ternary operator.
   
   Here, I think the condition is fine (and I didn't change it).
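   For readers weighing the two styles, here is a minimal, self-contained sketch of both forms for the condition from the diff. `FakeBuffer` and the two `retainIfSpilling*` methods are hypothetical stand-ins, not Flink's `Buffer` or `RemoteInputChannel` API; only the condition (`listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId`) is taken from the diff, with the listener check reduced to a boolean.

```java
import java.util.Optional;

// Hypothetical stand-in for a reference-counted network buffer; not Flink's Buffer class.
final class FakeBuffer {
    final boolean isData;
    int refCount = 1;

    FakeBuffer(boolean isData) {
        this.isData = isData;
    }

    boolean isBuffer() {
        return isData;
    }

    // Increments the reference count and returns the same buffer.
    FakeBuffer retainBuffer() {
        refCount++;
        return this;
    }
}

final class TernaryVsIfElse {

    // Ternary style: the only "effect" is producing a value, so one expression suffices.
    static Optional<FakeBuffer> retainIfSpillingTernary(
            boolean hasListener, FakeBuffer buffer, long receivedId, long lastRequestedId) {
        return hasListener && buffer.isBuffer() && receivedId < lastRequestedId
                ? Optional.of(buffer.retainBuffer())
                : Optional.empty();
    }

    // if-else style: identical behavior, with more room if the branches grow.
    static Optional<FakeBuffer> retainIfSpillingIfElse(
            boolean hasListener, FakeBuffer buffer, long receivedId, long lastRequestedId) {
        if (hasListener && buffer.isBuffer() && receivedId < lastRequestedId) {
            return Optional.of(buffer.retainBuffer());
        } else {
            return Optional.empty();
        }
    }
}
```

   Both methods return the same result and retain the buffer under the same condition; the disagreement in this thread is purely about readability.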







[GitHub] [flink] pnowojski commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425059502



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
       > There is "inconsistency" in the sense that AlternatingCheckpointBarrierHandler can "lag behind" ThreadSafeUnaligner but it shouldn't cause any problems.
   
   Yes, Arvid and I think (and hope) that's the case. (I've synced with him offline.)







[GitHub] [flink] pnowojski commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r425591726



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -469,7 +469,9 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 
 			if (notifyReceivedBarrier != null) {
 				receivedCheckpointId = notifyReceivedBarrier.getId();
-				listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
+				if (notifyReceivedBarrier.isCheckpoint()) {

Review comment:
       But doesn't the condition work the other way around? If we bump `receivedCheckpointId` here, it shouldn't matter: spilling from input channels starts only once `lastRequestedCheckpointId` is bumped above `receivedCheckpointId`.
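   The argument can be checked with a tiny model. It assumes, as discussed in this thread, that savepoint and checkpoint IDs come from one increasing sequence, and that a channel spills in-flight data only while `receivedCheckpointId < lastRequestedCheckpointId`. `ChannelModel` and its methods are hypothetical; only the two field names come from the diff.

```java
// Hypothetical model of the two counters in RemoteInputChannel; not the real class.
final class ChannelModel {
    long receivedCheckpointId = -1L;
    long lastRequestedCheckpointId = -1L;

    // The barrier handler requests an (unaligned) checkpoint with this ID.
    void requestCheckpoint(long id) {
        lastRequestedCheckpointId = Math.max(lastRequestedCheckpointId, id);
    }

    // A barrier (checkpoint or savepoint) with this ID arrives on the channel.
    void onBarrier(long id) {
        receivedCheckpointId = id;
    }

    // In-flight data is spilled only while a requested checkpoint's barrier is still missing.
    boolean shouldSpill() {
        return receivedCheckpointId < lastRequestedCheckpointId;
    }
}
```

   In this model, bumping `receivedCheckpointId` for a savepoint barrier can only make the spill condition false, never true, which is the point made above.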







[GitHub] [flink] AHeise commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r417896328



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -551,7 +551,7 @@ void onSenderBacklog(int backlog) throws IOException {
 	}
 
 	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-		boolean recycleBuffer = true;
+		boolean enqueued = false;

Review comment:
       The renaming and inversion are unnecessary; you could have just pulled up setting `recycleBuffer = false`.
   
   Note that this whole hotfix is rather academic: in case of a parse exception, the whole TM is restarted anyway.
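   A minimal sketch of the flag-based pattern the reviewer suggests keeping: `recycleBuffer` starts as `true` and is cleared as soon as the queue takes ownership, so the `finally` block releases the buffer only if something throws first. `PooledBuffer`, `BufferQueue`, and `parse` are hypothetical stand-ins, not Flink classes.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical pooled buffer; not Flink's Buffer class.
final class PooledBuffer {
    boolean recycled;
    final boolean corrupt;

    PooledBuffer(boolean corrupt) {
        this.corrupt = corrupt;
    }

    void recycleBuffer() {
        recycled = true;
    }
}

final class BufferQueue {
    final Deque<PooledBuffer> receivedBuffers = new ArrayDeque<>();

    void onBuffer(PooledBuffer buffer) throws IOException {
        boolean recycleBuffer = true;
        try {
            parse(buffer); // may throw before ownership is transferred
            receivedBuffers.add(buffer);
            recycleBuffer = false; // the queue now owns the buffer
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer(); // release on any failure before enqueueing
            }
        }
    }

    // Hypothetical parsing step that fails for corrupt input.
    private static void parse(PooledBuffer buffer) throws IOException {
        if (buffer.corrupt) {
            throw new IOException("cannot parse buffer");
        }
    }
}
```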

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -291,7 +291,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 	 * returns null in all other cases.
 	 */
 	@Nullable
-	protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {
+	CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws IOException {

Review comment:
       Unnecessary change. `protected` was used to give all subclasses access to this method. That they all currently live in the same package is rather coincidental.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -108,18 +108,24 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
 			AbstractInvokable toNotifyOnCheckpoint) {
 		switch (config.getCheckpointMode()) {
 			case EXACTLY_ONCE:
-				if (config.isUnalignedCheckpointsEnabled()) {
-					return new CheckpointBarrierUnaligner(
-						numberOfInputChannelsPerGate.toArray(),
-						channelStateWriter,
+				return config.isUnalignedCheckpointsEnabled() ?

Review comment:
       Again, the ternary isn't making anything easier here.
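   For comparison, the `if`/`else` form the reviewer seems to prefer might look like the following simplified model. All types are hypothetical stand-ins, and the real factory method takes gates, a `ChannelStateWriter`, and more. Returning an alternating handler when unaligned checkpoints are enabled is an assumption based on this PR's goal of aligning savepoints in UC mode; the `TRACKER` branch for at-least-once mode is likewise a simplification.

```java
// Simplified, hypothetical model of handler selection; not Flink's InputProcessorUtil.
enum SimpleCheckpointMode { EXACTLY_ONCE, AT_LEAST_ONCE }

enum HandlerKind { ALIGNER, ALTERNATING, TRACKER }

final class HandlerSelection {

    static HandlerKind select(SimpleCheckpointMode mode, boolean unalignedEnabled) {
        switch (mode) {
            case EXACTLY_ONCE:
                if (unalignedEnabled) {
                    // Assumption: savepoints stay aligned even in UC mode, so a handler
                    // alternating between aligned and unaligned strategies is chosen.
                    return HandlerKind.ALTERNATING;
                } else {
                    return HandlerKind.ALIGNER;
                }
            case AT_LEAST_ONCE:
                return HandlerKind.TRACKER;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```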

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+public class AlternatingCheckpointBarrierHandlerTest {
+
+	@Test
+	public void testCheckpointHandling() throws Exception {
+		testBarrierHandling(CHECKPOINT);
+	}
+
+	@Test
+	public void testSavepointHandling() throws Exception {
+		testBarrierHandling(SAVEPOINT);
+	}
+
+	@Test
+	public void testAlternation() throws Exception {
+		int numBarriers = 123;
+		int numChannels = 123;
+		TestInvokable target = new TestInvokable();
+		CheckpointedInputGate gate = buildGate(target, numChannels);
+		List<Long> barriers = new ArrayList<>();
+		for (long barrier = 0; barrier < numBarriers; barrier++) {
+			barriers.add(barrier);
+			CheckpointType type = barrier % 2 == 0 ? CHECKPOINT : SAVEPOINT;
+			for (int channel = 0; channel < numChannels; channel++) {
+				sendBarrier(barrier, type, (TestInputChannel) gate.getChannel(channel), gate);
+			}
+		}
+		assertEquals(barriers, target.triggeredCheckpoints);
+	}
+
+	private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
+		final long barrierId = 123L;
+		TestInvokable target = new TestInvokable();
+		SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+		TestInputChannel fast = new TestInputChannel(gate, 0);
+		TestInputChannel slow = new TestInputChannel(gate, 1);
+		gate.setInputChannels(fast, slow);
+		AlternatingCheckpointBarrierHandler barrierHandler = barrierHandler(gate, target);
+		CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(gate, barrierHandler, 0 /* offset */);
+
+		sendBarrier(barrierId, checkpointType, fast, checkpointedGate);
+
+		assertEquals(checkpointType.isSavepoint(), target.triggeredCheckpoints.isEmpty());
+		assertEquals(checkpointType.isSavepoint(), barrierHandler.isBlocked(fast.getChannelIndex()));
+		assertFalse(barrierHandler.isBlocked(slow.getChannelIndex()));
+
+		sendBarrier(barrierId, checkpointType, slow, checkpointedGate);
+
+		assertEquals(singletonList(barrierId), target.triggeredCheckpoints);
+		for (InputChannel channel : gate.getInputChannels().values()) {
+			assertFalse(barrierHandler.isBlocked(channel.getChannelIndex()));
+			assertEquals(
+					String.format("channel %d should be resumed", channel.getChannelIndex()),

Review comment:
       nit: indent

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+public class AlternatingCheckpointBarrierHandlerTest {

Review comment:
       Good tests! But I think you also need to test `BufferReceivedListener`, as the current version of `AlternatingCheckpointBarrierHandler` probably doesn't allow any UC on the input side.
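   The behavior asserted by the tests quoted above can be summarized in a small model: a checkpoint barrier triggers on its first arrival without blocking the channel, a savepoint barrier blocks each channel until it has arrived on all of them, and every barrier ID triggers exactly once. `AlternatingModel` is hypothetical and deliberately ignores buffers, cancellation, and the `BufferReceivedListener` path mentioned in this comment.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of alternating barrier handling; not Flink's AlternatingCheckpointBarrierHandler.
final class AlternatingModel {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();
    private final Set<Integer> arrived = new HashSet<>();
    private long currentId = -1L;
    final List<Long> triggered = new ArrayList<>();

    AlternatingModel(int numChannels) {
        this.numChannels = numChannels;
    }

    void onBarrier(long id, boolean savepoint, int channel) {
        if (id != currentId) {
            // First barrier with a new ID: reset per-barrier bookkeeping.
            currentId = id;
            arrived.clear();
            blocked.clear();
            if (!savepoint) {
                triggered.add(id); // unaligned checkpoint: trigger on the first barrier
            }
        }
        arrived.add(channel);
        if (savepoint) {
            blocked.add(channel); // aligned savepoint: hold back this channel
            if (arrived.size() == numChannels) {
                blocked.clear(); // all barriers received: resume every channel
                triggered.add(id);
            }
        }
    }

    boolean isBlocked(int channel) {
        return blocked.contains(channel);
    }
}
```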

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -575,12 +574,10 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 				receivedBuffers.add(buffer);
 				enqueued = true;
 
-				if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) {
-					notifyReceivedBuffer = buffer.retainBuffer();

Review comment:
       Even if you switch to `Optional`, I wouldn't use a ternary here. A ternary is meant for simple conditions and one-liners.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
+import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link AlternatingCheckpointBarrierHandler} test.
+ */
+public class AlternatingCheckpointBarrierHandlerTest {
+
+	@Test
+	public void testCheckpointHandling() throws Exception {
+		testBarrierHandling(CHECKPOINT);
+	}
+
+	@Test
+	public void testSavepointHandling() throws Exception {
+		testBarrierHandling(SAVEPOINT);
+	}
+
+	@Test
+	public void testAlternation() throws Exception {
+		int numBarriers = 123;
+		int numChannels = 123;
+		TestInvokable target = new TestInvokable();
+		CheckpointedInputGate gate = buildGate(target, numChannels);
+		List<Long> barriers = new ArrayList<>();
+		for (long barrier = 0; barrier < numBarriers; barrier++) {
+			barriers.add(barrier);
+			CheckpointType type = barrier % 2 == 0 ? CHECKPOINT : SAVEPOINT;
+			for (int channel = 0; channel < numChannels; channel++) {
+				sendBarrier(barrier, type, (TestInputChannel) gate.getChannel(channel), gate);
+			}
+		}
+		assertEquals(barriers, target.triggeredCheckpoints);
+	}
+
+	private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
+		final long barrierId = 123L;
+		TestInvokable target = new TestInvokable();
+		SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+		TestInputChannel fast = new TestInputChannel(gate, 0);
+		TestInputChannel slow = new TestInputChannel(gate, 1);
+		gate.setInputChannels(fast, slow);
+		AlternatingCheckpointBarrierHandler barrierHandler = barrierHandler(gate, target);
+		CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(gate, barrierHandler, 0 /* offset */);
+
+		sendBarrier(barrierId, checkpointType, fast, checkpointedGate);
+
+		assertEquals(checkpointType.isSavepoint(), target.triggeredCheckpoints.isEmpty());
+		assertEquals(checkpointType.isSavepoint(), barrierHandler.isBlocked(fast.getChannelIndex()));
+		assertFalse(barrierHandler.isBlocked(slow.getChannelIndex()));
+
+		sendBarrier(barrierId, checkpointType, slow, checkpointedGate);
+
+		assertEquals(singletonList(barrierId), target.triggeredCheckpoints);
+		for (InputChannel channel : gate.getInputChannels().values()) {
+			assertFalse(barrierHandler.isBlocked(channel.getChannelIndex()));
+			assertEquals(
+					String.format("channel %d should be resumed", channel.getChannelIndex()),
+					checkpointType.isSavepoint(),
+					((TestInputChannel) channel).wasResumed);
+		}
+	}
+
+	private void sendBarrier(long id, CheckpointType type, TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
+		channel.receive(barrier(id, type));
+		while (gate.pollNext().isPresent()) {
+		}
+	}
+
+	private static AlternatingCheckpointBarrierHandler barrierHandler(SingleInputGate inputGate, AbstractInvokable target) {
+		String taskName = "test";
+		InputGate[] channelIndexToInputGate = new InputGate[inputGate.getNumberOfInputChannels()];
+		Arrays.fill(channelIndexToInputGate, inputGate);
+		return new AlternatingCheckpointBarrierHandler(
+				new CheckpointBarrierAligner(taskName, channelIndexToInputGate, singletonMap(inputGate, 0), target),
+				new CheckpointBarrierUnaligner(new int[]{inputGate.getNumberOfInputChannels()}, ChannelStateWriter.NO_OP, taskName, target),
+				target);
+	}
+
+	private Buffer barrier(long id, CheckpointType checkpointType) throws IOException {
+		return toBuffer(new CheckpointBarrier(
+				id,
+				System.currentTimeMillis(),
+				new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault(), true, true)));
+	}
+
+	private static class TestInvokable extends AbstractInvokable {
+		private List<Long> triggeredCheckpoints = new ArrayList<>();
+
+		TestInvokable() {
+			super(new DummyEnvironment());
+		}
+
+		@Override
+		public void invoke() {
+		}
+
+		@Override
+		public <E extends Exception> void executeInTaskThread(ThrowingRunnable<E> runnable, String descriptionFormat, Object... descriptionArgs) throws E {
+			runnable.run();
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
+			triggeredCheckpoints.add(checkpointMetaData.getCheckpointId());
+		}
+	}
+
+	private static class TestInputChannel extends InputChannel {
+		private final Deque<Buffer> buffers = new LinkedList<>();
+		private boolean wasResumed;
+
+		TestInputChannel(SingleInputGate inputGate, int channelIndex) {
+			super(inputGate, channelIndex, new ResultPartitionID(), 0, 10, null, null);
+		}
+
+		public void receive(Buffer buffer) {
+			buffers.add(buffer);
+			notifyChannelNonEmpty();
+		}
+
+		@Override
+		public void requestSubpartition(int subpartitionIndex) {
+		}
+
+		@Override
+		protected Optional<BufferAndAvailability> getNextBuffer() {
+			return buffers.isEmpty() ? Optional.empty() : Optional.of(new InputChannel.BufferAndAvailability(buffers.poll(), buffers.isEmpty(), buffers.size()));
+		}
+
+		@Override
+		protected void sendTaskEvent(TaskEvent event) {
+		}
+
+		@Override
+		protected boolean isReleased() {
+			return false;
+		}
+
+		@Override
+		protected void releaseAllResources() {
+		}
+
+		@Override
+		public void resumeConsumption() {
+			wasResumed = true;
+		}
+	}
+
+	private static CheckpointedInputGate buildGate(TestInvokable target, int numChannels) {
+		SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(numChannels).build();
+		TestInputChannel[] channels = new TestInputChannel[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			channels[i] = new TestInputChannel(gate, i);
+		}
+		gate.setInputChannels(channels);

Review comment:
       nit: this could be a one-liner with `IntStream` (or `Stream`).
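A minimal sketch of the `IntStream` one-liner, using a hypothetical `FakeChannel` in place of the test's `TestInputChannel` so it runs without Flink. Applied to `buildGate`, it would presumably become `gate.setInputChannels(IntStream.range(0, numChannels).mapToObj(i -> new TestInputChannel(gate, i)).toArray(TestInputChannel[]::new));`, which has not been compiled against Flink here.

```java
import java.util.stream.IntStream;

class IntStreamChannelsExample {
    // Hypothetical stand-in for TestInputChannel; it only records its index.
    static final class FakeChannel {
        final int channelIndex;

        FakeChannel(int channelIndex) {
            this.channelIndex = channelIndex;
        }
    }

    static FakeChannel[] buildChannels(int numChannels) {
        // One expression replaces the array allocation plus the indexed for-loop.
        return IntStream.range(0, numChannels)
                .mapToObj(FakeChannel::new)
                .toArray(FakeChannel[]::new);
    }
}
```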

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {

Review comment:
       You definitely need to delegate `getBufferReceivedListener`.
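A sketch of the delegation being asked for, under stated assumptions: the types below (`BufferListener`, `BarrierHandler`, `AligningHandler`, `UnaligningHandler`, `AlternatingHandler`) are hypothetical stand-ins whose signatures differ from Flink's `BufferReceivedListener` and `CheckpointBarrierHandler`. The point is only that a wrapper holding both handlers must forward the listener of the unaligned one; otherwise it inherits an empty default and, as the earlier comment suspects, in-flight buffers would presumably never reach the unaligned handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins; not Flink's real types or signatures.
interface BufferListener {
    void notifyBufferReceived(String buffer, int channelIndex);
}

abstract class BarrierHandler {
    // Default: no listener, which is what an aligning handler needs.
    Optional<BufferListener> getBufferReceivedListener() {
        return Optional.empty();
    }
}

class AligningHandler extends BarrierHandler {
}

class UnaligningHandler extends BarrierHandler {
    // Records the buffers it is told about, standing in for persisting in-flight data.
    final List<String> persisted = new ArrayList<>();

    @Override
    Optional<BufferListener> getBufferReceivedListener() {
        BufferListener listener = (buffer, channelIndex) -> persisted.add(buffer);
        return Optional.of(listener);
    }
}

class AlternatingHandler extends BarrierHandler {
    private final AligningHandler alignedHandler;     // used for savepoints (not modelled here)
    private final UnaligningHandler unalignedHandler; // used for checkpoints

    AlternatingHandler(AligningHandler alignedHandler, UnaligningHandler unalignedHandler) {
        this.alignedHandler = alignedHandler;
        this.unalignedHandler = unalignedHandler;
    }

    // Without this override the inherited default returns Optional.empty().
    @Override
    Optional<BufferListener> getBufferReceivedListener() {
        return unalignedHandler.getBufferReceivedListener();
    }
}
```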

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -560,8 +559,8 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 			}
 
 			final boolean wasEmpty;
-			final CheckpointBarrier notifyReceivedBarrier;
-			final Buffer notifyReceivedBuffer;
+			final Optional<CheckpointBarrier> notifyReceivedBarrier;
+			final Optional<Buffer> notifyReceivedBuffer;

Review comment:
       While there is a good reason to use `Optional` for the barrier, as it's the result of a function, I don't see the benefit of using it for the buffer.
   Did you do it for symmetry, or was there a deeper reason?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -221,16 +221,10 @@ private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
 		channelStateWriter.finishOutput(checkpointId);
 	}
 
-	private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics) {
-		final Future<?> channelWrittenFuture;
-		if (unalignedCheckpointEnabled) {
-			ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId());
-			channelWrittenFuture = CompletableFuture.allOf(
-				writeResult.getInputChannelStateHandles(),
-				writeResult.getResultSubpartitionStateHandles());
-		} else {
-			channelWrittenFuture = FutureUtils.completedVoidFuture();
-		}
+	private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) {
+		Future<?> channelWrittenFuture = unalignedCheckpointEnabled && !options.getCheckpointType().isSavepoint() ?

Review comment:
       Same comment on the ternary.
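A sketch of how the new condition in `finishAndReportAsync` could keep the if/else shape of the code it replaces. It is a stand-alone approximation: plain booleans replace `unalignedCheckpointEnabled` and `options.getCheckpointType().isSavepoint()`, two `CompletableFuture` parameters replace the handles from `ChannelStateWriteResult`, and `CompletableFuture.completedFuture(null)` is swapped in for Flink's `FutureUtils.completedVoidFuture()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-alone version; not the code from SubtaskCheckpointCoordinatorImpl.
class ChannelWrittenFutureExample {
    static Future<?> channelWrittenFuture(
            boolean unalignedCheckpointEnabled,
            boolean isSavepoint,
            CompletableFuture<?> inputChannelStateHandles,
            CompletableFuture<?> resultSubpartitionStateHandles) {
        final Future<?> channelWrittenFuture;
        if (unalignedCheckpointEnabled && !isSavepoint) {
            // Unaligned checkpoint: wait until the in-flight channel state is written.
            channelWrittenFuture = CompletableFuture.allOf(
                    inputChannelStateHandles, resultSubpartitionStateHandles);
        } else {
            // Aligned checkpoint or savepoint: there is no channel state to wait for.
            channelWrittenFuture = CompletableFuture.completedFuture(null);
        }
        return channelWrittenFuture;
    }
}
```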

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+	@Test
+	public void testAlternation() throws Exception {

Review comment:
       I guess this test is super fast (<1s). If it takes longer, please reduce `numChannels` a bit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r418040944



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
##########
@@ -0,0 +1,212 @@
+	@Test
+	public void testAlternation() throws Exception {

Review comment:
       Checked locally, it's just a matter of milliseconds.




----------------------------------------------------------------



[GitHub] [flink] rkhachatryan commented on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-622367409


   Thanks for the feedback @AHeise,
   I added channel tests and implemented `getBufferReceivedListener`.
   Please take a look.


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * d43b046ab4c15ed91ac706b210faaaad4de1a392 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=501) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #11948: [FLINK-17467] Align channels on savepoint in UC mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11948:
URL: https://github.com/apache/flink/pull/11948#issuecomment-621327009


   ## CI report:
   
   * e4ce0de30cb5d324dc4a2a030e416de1f7f1a79d UNKNOWN
   * d54111c2ca84686374c6e1c787fa8999ad26971c UNKNOWN
   * de5f35ff8499cf1c4cc6455b887d1721a8ced837 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1278) 
   * 6e67644b8eaad167f6397a07320946beb3693748 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1319) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------