Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/15 10:19:13 UTC

[GitHub] [flink] curcur opened a new pull request #13648: Single task result partition type pr

curcur opened a new pull request #13648:
URL: https://github.com/apache/flink/pull/13648


   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677",
       "triggerID" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747",
       "triggerID" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016",
       "triggerID" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163",
       "triggerID" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8354",
       "triggerID" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccacf6664b0f6ec3630a939f8dd155f53226e4b7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8365",
       "triggerID" : "ccacf6664b0f6ec3630a939f8dd155f53226e4b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c306ae3258d3e27f22abafe3db27eeb413467dac",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c306ae3258d3e27f22abafe3db27eeb413467dac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ccacf6664b0f6ec3630a939f8dd155f53226e4b7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8365) 
   * c306ae3258d3e27f22abafe3db27eeb413467dac UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] AHeise merged pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
AHeise merged pull request #13648:
URL: https://github.com/apache/flink/pull/13648


   





[GitHub] [flink] flinkbot commented on pull request #13648: Single task result partition type pr

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709101715


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 4ed05009530e34f77deff767a04cd4c14c15378d (Thu Oct 15 10:22:29 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **Invalid pull request title: No valid Jira ID provided**
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508372546



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Is it the same netty thread after reconnection? (same question as above)
   And from the call hierarchy, I see it's called not only from the netty thread.
   
   In either case, I think here it's easier to avoid concurrency issues than to investigate whether they can happen.
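   One way to follow this suggestion, sketched under assumptions: the class below is a hypothetical, heavily simplified stand-in for `PipelinedApproximateSubpartition` (not Flink code), and it assumes `releaseView()` also sets `isPartialBuffer`, since the diff is cut off at that point. Both `createReadView()` and `releaseView()` touch `readView` and `isPartialBuffer` only while holding the `buffers` monitor, so it no longer matters which thread calls `releaseView()`.

   ```java
   import java.util.ArrayDeque;

   /** Hypothetical, simplified model; not Flink's PipelinedApproximateSubpartition. */
   class ApproxSubpartitionSketch {
       // Mirrors Flink's pattern of using the buffer queue itself as the monitor.
       private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();

       private Object readView;                 // guarded by buffers
       private boolean isPartialBuffer = false; // guarded by buffers

       Object createReadView() {
           synchronized (buffers) {
               if (readView != null) {
                   releaseViewLocked(); // release the stale view before replacing it
               }
               readView = new Object(); // stand-in for a real subpartition view
               return readView;
           }
       }

       /** Safe to call from any thread: both fields change together under the lock. */
       void releaseView() {
           synchronized (buffers) {
               releaseViewLocked();
           }
       }

       // Caller must hold the "buffers" monitor.
       private void releaseViewLocked() {
           readView = null;
           // Assumption: the next reader may start mid-record, so cleanup is required.
           isPartialBuffer = true;
       }

       boolean isPartialBuffer() {
           synchronized (buffers) {
               return isPartialBuffer;
           }
       }

       boolean hasReadView() {
           synchronized (buffers) {
               return readView != null;
           }
       }
   }
   ```

   Because every access goes through the same monitor, a thread calling `createReadView()` after another thread's `releaseView()` observes both fields updated together, which also answers the visibility concern without any `volatile` fields.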







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in the normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`), `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, how about not calling `releaseView()` during downstream task cancellation, and calling `releaseView()` only before creating a new view?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` already made by `t2`? For example, resetting `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
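   A minimal sketch of the `AtomicReference`/CAS idea quoted above, assuming a hypothetical subpartition whose only shared state is the read view. The real `PipelinedSubpartition` has more state (such as `sequenceNumber`) that a CAS on the reference alone would not protect. A release names the view it wants to release, so a slow `t1` cannot clear a view that `t2` has already installed.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   /** Hypothetical sketch; not Flink's PipelinedSubpartition. */
   class CasGuardedSubpartition {

       /** Stand-in for a subpartition read view; only its identity matters here. */
       static final class ReadView {}

       private final AtomicReference<ReadView> readView = new AtomicReference<>();

       /** Installs a fresh view, unconditionally replacing any previous one. */
       ReadView createReadView() {
           ReadView fresh = new ReadView();
           readView.set(fresh);
           return fresh;
       }

       /**
        * Clears the view only if it is still the expected one. Returns false when another
        * thread has already replaced it; in that case nothing is changed.
        */
       boolean releaseView(ReadView expected) {
           return readView.compareAndSet(expected, null);
       }

       ReadView currentView() {
           return readView.get();
       }
   }
   ```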
   
   I think this is the same question I answered in the write-up.
   In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag; details are quoted below.
   
   - What if netty thread1 releases the view after netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
   
   And if we only release before creation, this whole threading interaction model would be greatly simplified.
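   The release-once argument and the release-before-creation approach can be sketched roughly as follows. This is a hypothetical model, not Flink's `PipelinedSubpartitionView`: each view carries an `AtomicBoolean` release flag, and the subpartition releases the previous view itself before installing a new one. The identity check in `onViewReleased` is an extra assumption added here, so that a notification from an old view cannot clear a newer one even if both threads race on the lock.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   /** Hypothetical subpartition that releases the old view only when a new one is created. */
   class ReleaseOnceSubpartition {
       private final Object lock = new Object();
       private ReleaseOnceView currentView; // guarded by lock
       private int releaseNotifications;    // guarded by lock

       ReleaseOnceView createReadView() {
           synchronized (lock) {
               if (currentView != null) {
                   currentView.release(); // release before creation (monitors are reentrant)
               }
               currentView = new ReleaseOnceView(this);
               return currentView;
           }
       }

       void onViewReleased(ReleaseOnceView view) {
           synchronized (lock) {
               releaseNotifications++;
               if (currentView == view) {
                   currentView = null; // a stale view never clears a newer one
               }
           }
       }

       ReleaseOnceView currentView() {
           synchronized (lock) {
               return currentView;
           }
       }

       int releaseNotifications() {
           synchronized (lock) {
               return releaseNotifications;
           }
       }
   }

   /** Hypothetical view whose release takes effect at most once, whichever thread calls it. */
   class ReleaseOnceView {
       private final ReleaseOnceSubpartition parent;
       private final AtomicBoolean released = new AtomicBoolean(false);

       ReleaseOnceView(ReleaseOnceSubpartition parent) {
           this.parent = parent;
       }

       void release() {
           // Only the first caller wins the CAS; later calls (e.g. from thread1) are no-ops.
           if (released.compareAndSet(false, true)) {
               parent.onViewReleased(this);
           }
       }

       boolean isReleased() {
           return released.get();
       }
   }
   ```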
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski , thanks for the response.
   
   > We realized that if we check the view reference in the subpartition (or null out view.parent), then a downstream whose view was overwritten by an older thread will sooner or later be fenced by the upstream (or the checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`?
   I evaluated both methods before, and I feel setting view.parent to null may be the cleaner way (that's why I proposed nulling out the parent). Conceptually, it makes a clean cut between the old view and the new view. Implementation-wise, it confines the (parent == null) handling mostly to `PipelinedApproximateSubpartition`, and perhaps a little to `PipelinedSubpartition`, whereas with the reference check we have to change the interface and touch all the subpartitions that extend `PipelinedSubpartition`. Note that not only `pollNext()` needs the check; everything that touches parent does.
   
   2. How can this solve the problem of a new view being replaced by an old one? Let's say the downstream reconnects and asks for a new view; the new view is created, then replaced by an old view triggered by a stale handler event, and the new view's parent is nulled out. What happens then? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just ends in more failures caused by it.
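   A minimal sketch of the "null out view.parent" option from item 1, with hypothetical class names (this is not Flink's `PipelinedSubpartitionView`). The view reads its `volatile` parent field once per call, so after the subpartition detaches an old view, that view only ever returns `null`. As item 2 points out, the last `createReadView()` call still wins here, so a stale request arriving late would detach the newer view.

   ```java
   import java.util.ArrayDeque;

   /** Hypothetical subpartition that cuts off a replaced view by nulling out its parent. */
   class DetachingSubpartition {
       private final ArrayDeque<String> buffers = new ArrayDeque<>();
       private DetachableView readView; // guarded by buffers

       DetachableView createReadView() {
           synchronized (buffers) {
               if (readView != null) {
                   readView.detach(); // the old view loses access to this subpartition
               }
               readView = new DetachableView(this);
               return readView;
           }
       }

       void add(String buffer) {
           synchronized (buffers) {
               buffers.add(buffer);
           }
       }

       String poll() {
           synchronized (buffers) {
               return buffers.poll();
           }
       }
   }

   /** Hypothetical view; every access to the parent goes through a null check. */
   class DetachableView {
       private volatile DetachingSubpartition parent;

       DetachableView(DetachingSubpartition parent) {
           this.parent = parent;
       }

       void detach() {
           parent = null;
       }

       /** Returns the next buffer, or null if this view was detached or nothing is queued. */
       String pollNext() {
           DetachingSubpartition p = parent; // read the volatile field once
           return p == null ? null : p.poll();
       }
   }
   ```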
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   I am overall fine with putting the check in this PR. However, may I ask how that differs from keeping this PR as it is and doing a follow-up one to null out the parent?
   In this PR, my main purpose is to introduce a different result partition type, and the scheduler changes are based on this new type. That's the main reason I prefer to do it in a follow-up PR; otherwise, the scheduler part is blocked.
   
   It is also easier to review that way, and easier for me to focus on tests after nulling out the parent.
   
   > Another concern is a potential resource leak if the downstream continuously fails without notifying the upstream (instances of CreditBasedSequenceNumberingViewReader will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing a user event (see PartitionRequestQueue.userEventTriggered).
   
   Yes, that's a good point.





[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r510009258



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBufferCleanupRequired = false;

Review comment:
       `@GuardedBy("buffers")` ?
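   For reference, a sketch of what the suggestion amounts to. Flink's runtime code generally uses `javax.annotation.concurrent.GuardedBy` from JSR-305 for this; the block below declares a local stand-in annotation (an assumption, so that it compiles without that dependency). The annotation documents the locking rule for readers and static analyzers but enforces nothing at runtime, so every access must still happen inside `synchronized (buffers)`.

   ```java
   import java.lang.annotation.Documented;
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;
   import java.util.ArrayDeque;

   /** Local stand-in for javax.annotation.concurrent.GuardedBy (avoids the JSR-305 dependency). */
   @Documented
   @Retention(RetentionPolicy.CLASS)
   @Target({ElementType.FIELD, ElementType.METHOD})
   @interface GuardedBy {
       String value();
   }

   /** Hypothetical fragment; not the actual PipelinedApproximateSubpartition. */
   class GuardedFieldSketch {
       private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();

       // Documents the locking rule; nothing enforces it at runtime.
       @GuardedBy("buffers")
       private boolean isPartialBufferCleanupRequired = false;

       void markCleanupRequired() {
           synchronized (buffers) {
               isPartialBufferCleanupRequired = true;
           }
       }

       /** Reads and clears the flag in one step under the same lock. */
       boolean takeCleanupRequired() {
           synchronized (buffers) {
               boolean required = isPartialBufferCleanupRequired;
               isPartialBufferCleanupRequired = false;
               return required;
           }
       }
   }
   ```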







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714599452


   Thanks, @rkhachatryan for clarifying the problem! 
   I agree there is a problem if a downstream task fails repeatedly, or if an orphan task execution lingers for a short period after the new execution is running (as described in the FLIP).
   
   Here is an idea of how to cleanly and thoroughly solve this kind of problem:
   1. We go with the simplified release-view version: only release the view before creating a new one (in thread2). That means we won't clean up the view when the downstream task disconnects (`releaseView` would not be called through the copied view reference, in either thread1 or thread2).
      - This would greatly simplify the threading model
      - This won't cause any resource leak, since releasing the view only serves to notify the upstream result partition to release on consumption once all subpartitions are consumed (in `PipelinedSubpartitionView`). In our case, we do not release the result partition on consumption anyway (the result partition is tracked in the JobMaster, similar to the `ResultPartitionType.BLOCKING` type).
   
   2. Each view is associated with a downstream task execution version
      - This makes sense because we actually have different versions of views now, corresponding to the vertex.version of the downstream task.
      - createView is performed only if the new version to create is greater than the existing one
      - If we decide to create a new view, the old view's parent (subpartition) is set to invalid
   
   This way, I think we can completely disconnect the old view from the subpartition. Besides that, the handler in use would always hold the latest view reference.
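   The versioning idea in step 2 could look roughly like this. It is a hypothetical sketch: `consumerVersion` stands in for the downstream execution version, and "invalid" is modelled as a flag on the old view rather than a nulled-out parent.

   ```java
   /** Hypothetical: views are versioned by the downstream execution that requested them. */
   class VersionedViewSubpartition {

       static final class View {
           final int consumerVersion;
           private volatile boolean valid = true;

           View(int consumerVersion) {
               this.consumerVersion = consumerVersion;
           }

           void invalidate() {
               valid = false;
           }

           boolean isValid() {
               return valid;
           }
       }

       private final Object lock = new Object();
       private View current; // guarded by lock

       /** Creates a view for a newer consumer version; returns null for stale or duplicate requests. */
       View createReadView(int consumerVersion) {
           synchronized (lock) {
               if (current != null) {
                   if (consumerVersion <= current.consumerVersion) {
                       return null; // request from an older execution: ignore it
                   }
                   current.invalidate(); // cut the old view off from this subpartition
               }
               current = new View(consumerVersion);
               return current;
           }
       }

       View currentView() {
           synchronized (lock) {
               return current;
           }
       }
   }
   ```

   Because a late request from an older execution is rejected instead of applied, a stale handler event cannot replace a newer view in this model.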
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the reason. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. Two main considerations led me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I took the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I will not be aware of until the actual implementation.
   
   Even though unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable, since it keeps the logic for each behavior simple. **So personally, I lean toward not unifying them**.
   
   Certainly, if it turns out to be cleaner and simpler to unify the two types, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems the better choice.
   
   **2) Differences between these two types**
   For upstream reconnection, there are two main differences, in how the two types handle **read** and **release**:
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view causes the subpartition to be released, and eventually the partition.
   - In approximate mode, a subpartition's view can be created and released multiple times, as long as only one view exists for the subpartition at any instant.
     - For reading: upon reconnection, the reader should clean up the partial record left by the downstream failure (this could easily be unified).
     - For release: a partition is released only when it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created (this is a bit difficult with the current setup; let's discuss it in the lifecycle part below).
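   To illustrate the read-side difference, here is a hypothetical sketch of partial-record cleanup after reconnection. It assumes, loosely following `BufferConsumerWithPartialRecordLength` in the diff, that each queued buffer records how many of its leading bytes continue a record begun in an earlier buffer; the names and representation are illustrative, not Flink's API. After a reconnect, the reader drops those leading bytes, and drops whole buffers that consist only of the old record's tail.

   ```java
   import java.util.ArrayDeque;
   import java.util.Arrays;

   /** Hypothetical model of dropping a half-read record after a downstream reconnect. */
   class PartialRecordCleanupSketch {

       /** A buffer plus how many leading bytes continue a record started in an earlier buffer. */
       static final class BufferWithPartialLength {
           final byte[] data;
           final int partialRecordLength;

           BufferWithPartialLength(byte[] data, int partialRecordLength) {
               this.data = data;
               this.partialRecordLength = partialRecordLength;
           }
       }

       private final ArrayDeque<BufferWithPartialLength> buffers = new ArrayDeque<>();
       private boolean cleanupRequired = false;

       void add(byte[] data, int partialRecordLength) {
           buffers.add(new BufferWithPartialLength(data, partialRecordLength));
       }

       /** Called when a new reader replaces a failed one mid-stream. */
       void onReconnect() {
           cleanupRequired = true;
       }

       /** Returns the next bytes to send, or null if nothing is queued. */
       byte[] poll() {
           while (!buffers.isEmpty()) {
               BufferWithPartialLength buffer = buffers.poll();
               if (!cleanupRequired) {
                   return buffer.data;
               }
               if (buffer.partialRecordLength >= buffer.data.length) {
                   continue; // the whole buffer is the tail of the old record: drop it
               }
               cleanupRequired = false; // a record boundary lies inside this buffer
               return Arrays.copyOfRange(buffer.data, buffer.partialRecordLength, buffer.data.length);
           }
           return null;
       }
   }
   ```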
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate recovery through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is bound to the consumer task, which is not very intuitive but reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it matches intuition best, as long as we make the task terminate only after its produced result partition is consumed. I think this can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
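   A hypothetical sketch of the producer-bound lifecycle discussed above (not Flink's `Task` API): the producer's termination future completes only after all its pipelined partitions report consumption, while failure or cancellation releases the partitions immediately.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical pipelined partition that reports when its consumer has read everything. */
   class ProducerBoundPartition {
       final CompletableFuture<Void> consumed = new CompletableFuture<>();
       volatile boolean released;

       void markConsumed() {
           consumed.complete(null);
       }

       void release() {
           released = true;
       }
   }

   /** Hypothetical producer task whose termination is tied to its partitions being consumed. */
   class ProducerTaskSketch {
       private final List<ProducerBoundPartition> partitions;
       final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

       ProducerTaskSketch(List<ProducerBoundPartition> partitions) {
           this.partitions = partitions;
       }

       /** The task's own work is done; it terminates only once every partition is consumed. */
       void finishProcessing() {
           CompletableFuture.allOf(partitions.stream()
                           .map(p -> p.consumed)
                           .toArray(CompletableFuture[]::new))
                   .thenRun(() -> {
                       partitions.forEach(ProducerBoundPartition::release);
                       terminationFuture.complete("FINISHED");
                   });
       }

       /** Failure or cancellation: the task cleans up its partitions right away. */
       void failOrCancel(String terminalState) {
           partitions.forEach(ProducerBoundPartition::release);
           terminationFuture.complete(terminalState);
       }
   }
   ```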
   
   Thinking about it again last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether it is the blocking family or the pipelined family? Is there a problem with doing so? cc @tillrohrmann
   
   In short:
   **The current behavior of PIPELINED is** ->
   Release partition as soon as consumer exits
   Release partition as soon as producer fails/is canceled
   
   **Current behavior of PIPELINED_APPROXIMATE** ->
   Release partition as soon as producer fails/is canceled
   Release partition when the job exits
   
   **I think what Till prefers is to unify the Pipelined Family to**
   Release partition when producer exits.
   
   **And my question is whether we can unify the Blocking + Pipelined Family to**
   Producer releases partition when producer fails/is canceled
   Release partition when the job exits
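   The four rule sets above can be written down as data to make the comparison explicit. This is a purely illustrative, hypothetical encoding (not Flink's `ResultPartitionType`); "producer fails/is canceled" and "producer exits" are kept as separate triggers, as in the summary.

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   /** Events after which a result partition may be released (hypothetical). */
   enum ReleaseTrigger {
       CONSUMER_EXITS,
       PRODUCER_FAILS_OR_IS_CANCELED,
       PRODUCER_EXITS,
       JOB_EXITS
   }

   /** Release rules from the summary; constant names are illustrative only. */
   enum ReleasePolicy {
       CURRENT_PIPELINED(EnumSet.of(ReleaseTrigger.CONSUMER_EXITS, ReleaseTrigger.PRODUCER_FAILS_OR_IS_CANCELED)),
       CURRENT_PIPELINED_APPROXIMATE(EnumSet.of(ReleaseTrigger.PRODUCER_FAILS_OR_IS_CANCELED, ReleaseTrigger.JOB_EXITS)),
       PROPOSED_PIPELINED_FAMILY(EnumSet.of(ReleaseTrigger.PRODUCER_EXITS)),
       PROPOSED_BLOCKING_AND_PIPELINED(EnumSet.of(ReleaseTrigger.PRODUCER_FAILS_OR_IS_CANCELED, ReleaseTrigger.JOB_EXITS));

       private final Set<ReleaseTrigger> triggers;

       ReleasePolicy(Set<ReleaseTrigger> triggers) {
           this.triggers = triggers;
       }

       boolean releasesOn(ReleaseTrigger trigger) {
           return triggers.contains(trigger);
       }
   }
   ```

   Written this way, the last proposal has the same trigger set as the current PIPELINED_APPROXIMATE behavior.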
   
   





[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508353713



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -259,9 +260,10 @@ BufferAndBacklog pollBuffer() {
 			}
 
 			while (!buffers.isEmpty()) {
-				BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
+				BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength = buffers.peek();
+				BufferConsumer bufferConsumer = requireNonNull(bufferConsumerWithPartialRecordLength).getBufferConsumer();

Review comment:
       I don't think this IDE warning makes sense here.
   I'd prefer to alter the IDE behavior rather than the code logic (e.g. by adding `@SuppressWarnings("ConstantConditions")`).
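For illustration, a self-contained sketch of the reviewer's suggestion: keep the plain `peek()` call and silence the inspection with an annotation instead of wrapping it in `requireNonNull`. `PollLoopSketch` and its `String` buffers are hypothetical stand-ins for `PipelinedSubpartition` and its buffer deque. `"ConstantConditions"` is the name of an IntelliJ IDEA inspection, so the annotation only affects the IDE and does not change compiled behavior.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the polling loop in the diff above; not Flink code.
class PollLoopSketch {

    // ArrayDeque rejects null elements, so peek() on a non-empty deque never returns null.
    private final Deque<String> buffers = new ArrayDeque<>();

    void add(String buffer) {
        buffers.add(buffer);
    }

    // The IDE may flag peek() as possibly null; the isEmpty() check makes that a false positive,
    // so the inspection is suppressed instead of adding a runtime requireNonNull check.
    @SuppressWarnings("ConstantConditions")
    int drainTotalLength() {
        int total = 0;
        while (!buffers.isEmpty()) {
            String head = buffers.peek();
            total += head.length();
            buffers.poll();
        }
        return total;
    }
}
```

If the suppression should cover only one statement rather than the whole method, IntelliJ also accepts a `//noinspection ConstantConditions` comment placed directly above that statement.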
   







[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * 1dcce3b9301d178ae2e3ab05888260b05f24b15a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016) 
   * d3709faf9515053ca9873382f5b0ce83c127ff2d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up. 
   In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag; details quoted below. 
   
   - What if Netty thread1 releases the view after Netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its reference) again afterwards, since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we only release the view right before creating a new one, the whole threading interaction model would be greatly simplified. That means only the one new Netty thread can release the view.**
   
   I don't see any risk that would prevent us from doing this.
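A minimal sketch of that idea, under simplifying assumptions: `ReconnectableSubpartition`, `View`, and the `sequenceNumber` field are hypothetical stand-ins, not Flink's `PipelinedApproximateSubpartition`. Each view can be released at most once (an `AtomicBoolean` acts as the release flag), and shared subpartition state is reset only inside `createReadView()`, under the same lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of "release only before creation"; not Flink's classes.
class ReconnectableSubpartition {

    // A read view that can be released at most once.
    static final class View {
        private final AtomicBoolean released = new AtomicBoolean(false);

        // Returns true only for the first caller; later calls change nothing.
        boolean release() {
            return released.compareAndSet(false, true);
        }

        boolean isReleased() {
            return released.get();
        }
    }

    private final Object lock = new Object();
    private View readView;      // guarded by lock
    private int sequenceNumber; // guarded by lock; reset only when a new view is created

    // The only place where an old view is released and shared state is reset.
    View createReadView() {
        synchronized (lock) {
            if (readView != null) {
                readView.release(); // no-op if the old view was already released
            }
            sequenceNumber = 0;
            readView = new View();
            return readView;
        }
    }

    int nextSequenceNumber() {
        synchronized (lock) {
            return sequenceNumber++;
        }
    }

    View currentView() {
        synchronized (lock) {
            return readView;
        }
    }
}
```

A stale thread that still holds the old view can call `release()` on it, but that call returns `false` and never touches `sequenceNumber` or `readView`.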





[GitHub] [flink] flinkbot commented on pull request #13648: Single task result partition type pr

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * 4ed05009530e34f77deff767a04cd4c14c15378d UNKNOWN
   





[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the reasons. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   1. Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. There are two main considerations that led me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I went with the safe option first. This also helps to identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I won't be aware of until the actual implementation. 
   
   Even though unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable, since it simplifies the logic for each behavior. **So personally, I am leaning towards not unifying them**. 
   
   That said, if unifying the two types turns out to be cleaner and simpler, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems to be the better choice.
   
   **2) Differences between these two types**
   For upstream reconnection, there are two main differences between these types: **read** and **release**.
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view causes the subpartition to be released, and eventually the partition.
   - In approximate mode, a view for each subpartition can be created and released multiple times, as long as only one view exists for a subpartition at any given time.
     - For reading: upon reconnection, the reader should clean up the partial record left over by the downstream failure (this could easily be unified).
     - For release: a partition is released only if it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created. (This is a bit difficult with the current setup; let's discuss it in the lifecycle part below.)
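The read-side difference can be sketched as follows. This is a simplified, hypothetical model rather than Flink's `BufferConsumerWithPartialRecordLength`: each buffer carries the number of leading bytes that still belong to a record started in an earlier buffer, and after a reconnection those bytes are skipped until the first complete record boundary is reached. The `isPartialBuffer` flag mirrors the field of the same name in `PipelinedApproximateSubpartition`; `SimpleBuffer`, `onReconnect`, and `buildSlice` are illustrative names.

```java
import java.util.Arrays;

// Hypothetical model of dropping a partial record after a downstream reconnect; not Flink's API.
class PartialRecordCleanup {

    // A buffer plus the number of leading bytes that belong to a record started in an earlier buffer.
    static final class SimpleBuffer {
        final byte[] data;
        final int partialRecordLength; // larger than data.length if the record continues further

        SimpleBuffer(byte[] data, int partialRecordLength) {
            this.data = data;
            this.partialRecordLength = partialRecordLength;
        }
    }

    private boolean isPartialBuffer; // true while the leftover record has not been fully skipped

    // Called when a new view is created for a reconnecting downstream task.
    void onReconnect() {
        isPartialBuffer = true;
    }

    // Returns the bytes to send to the reader for this buffer.
    byte[] buildSlice(SimpleBuffer buffer) {
        if (!isPartialBuffer) {
            return buffer.data;
        }
        if (buffer.partialRecordLength <= buffer.data.length) {
            // The leftover record ends inside this buffer: skip it and stop cleaning up.
            isPartialBuffer = false;
            return Arrays.copyOfRange(buffer.data, buffer.partialRecordLength, buffer.data.length);
        }
        // The whole buffer belongs to the leftover record: drop it and keep cleaning up.
        return new byte[0];
    }
}
```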
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate guarantees through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is bound to the consumer task, which is not very intuitive but reasonable: `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it matches intuition best, as long as we make the task terminate only after its produced result partition has been consumed. I think this can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
   
   On second thought, why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether a partition belongs to the blocking family or the pipelined family? Is there a problem with doing so?
   
   
   
   





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508405933



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       In `Task#releaseResources`:
   ```java
   if (isCanceledOrFailed()) {
   	partitionWriter.fail(getFailureCause());
   }
   ```







[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, should we not call `releaseView()` during downstream task cancellation, and call `releaseView()` only before creating a new view?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up. In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag; details quoted below. 
   
   - What if Netty thread1 releases the view after Netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its reference) again afterwards, since a view can only be released once.
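For comparison, the reviewer's alternative (an `AtomicReference` for the reader view, with release guarded by a compare-and-set) could look roughly like this. `CasGuardedSubpartition` and `View` are hypothetical names; the point is only that `releaseView(expected)` succeeds only if `expected` is still the current view, so a slow thread holding a stale view cannot clear a view that another thread has already replaced.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a CAS-guarded view release; not Flink code.
class CasGuardedSubpartition {

    static final class View { }

    private final AtomicReference<View> readView = new AtomicReference<>();

    // Installs a new view, replacing any previous one.
    View createReadView() {
        View view = new View();
        readView.set(view);
        return view;
    }

    // Clears the current view only if it is still the expected one. A caller whose
    // compareAndSet fails holds a stale view and must not reset any shared state.
    boolean releaseView(View expected) {
        return readView.compareAndSet(expected, null);
    }

    View currentView() {
        return readView.get();
    }
}
```

Compared with releasing only before creation, this would keep release possible from the cancellation path as well, at the cost of having to reason about which thread owns the state reset after a successful CAS.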
   





[GitHub] [flink] tillrohrmann edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
tillrohrmann edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-717803324


   Sorry for joining the discussion so late, but a couple of questions came up when discussing scheduler changes with Yuan offline. I wanted to ask why we need a special `ResultPartitionType` for the approximate local recovery. Shouldn't it be conceptually possible to support the normal and approximate recovery behaviour with the same pipelined partitions? If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly-once/at-least-once processing guarantees. If not, then we would simply consume from where we left off.
   
   As far as I understand the existing `ResultPartitionType.PIPELINED(_BOUNDED)` cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a `Task`? We could say that a `Task` can only terminate once the pipelined result partition has been consumed. Moreover, a `Task` will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the `Task` reaches a terminal state).
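One way to picture the proposed producer-bound lifecycle, as a hypothetical sketch only: `ProducerBoundPartition` is not Flink's `Task` or `ResultPartition`, and its method names are made up. The producer's termination future completes only after the partition has been fully consumed, a consumer disconnect releases nothing (a new consumer may reconnect), and failure or cancellation releases the partition right away.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of binding a pipelined partition's lifecycle to its producer; not Flink's Task.
class ProducerBoundPartition {

    private final CompletableFuture<Void> fullyConsumed = new CompletableFuture<>();
    private boolean released; // guarded by this

    // A consumer disconnect does not release anything: a new consumer may reconnect.
    void onConsumerDisconnected() {
    }

    // Called once every subpartition has been read to the end.
    void onFullyConsumed() {
        fullyConsumed.complete(null);
    }

    // The producer may only terminate after its partition has been consumed; it then releases it.
    CompletableFuture<Void> terminationFuture() {
        return fullyConsumed.thenRun(this::release);
    }

    // On failure or cancellation the producer cleans up its partition itself.
    void failOrCancel(Throwable cause) {
        fullyConsumed.completeExceptionally(cause);
        release();
    }

    synchronized void release() {
        released = true;
    }

    synchronized boolean isReleased() {
        return released;
    }
}
```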
   
   I would love to hear your feedback @pnowojski, @zhijiangW, @curcur and @rkhachatryan and also learn more about your reasoning for introducing a new result partition type.





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508374852



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Yes, that's so true upon reconnection.







[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508374852



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Yes, that's so true upon reconnection. Good catch!







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the reasons. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   1. Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. There are two main considerations that led me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I went with the safe option first. This also helps to identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I won't be aware of until the actual implementation. 
   
   Even though unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable, since it simplifies the logic for each behavior. **So personally, I am leaning towards not unifying them**. 
   
   That said, if unifying the two types turns out to be cleaner and simpler, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems to be the better choice.
   
   **2) Differences between these two types**
   For upstream reconnection, there are two main differences between these types: **read** and **release**.
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view causes the subpartition to be released, and eventually the partition.
   - In approximate mode, a view for each subpartition can be created and released multiple times, as long as only one view exists for a subpartition at any given time.
     - For reading: upon reconnection, the reader should clean up the partial record left over by the downstream failure (this could easily be unified).
     - For release: a partition is released only if it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created. (This is a bit difficult with the current setup; let's discuss it in the lifecycle part below.)
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate guarantees through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the life cycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is “binding” to the consumer task, not very intuitive but reasonable. Because `PIPELINED(_BOUNDED)`  is consumed only once and as long as the downstream restarts, the upstream is restarting correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it is following the best intuition as long as we make the task terminate after its produced result partition is consumed. I think this can also simplify the logic that needs to be applied on partitions through task resources but cannot due to the task has already terminated.
   
   Thinking about it more last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether they belong to the blocking or the pipelined family? Is there a problem with doing so? cc @tillrohrmann
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508404190



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       But I can think of one case that might be a problem:
   
   - Thread1: create view
   - Thread2: release view, create view
   - Thread1: release view
   
   Let me think about this tonight. In the worst case, we can always release the view before creating a new one.
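   One way to make the interleaving above harmless is to let a releasing thread clear `readView` only if the field still points at the view that thread holds (an identity check), instead of nulling it unconditionally. The sketch uses hypothetical stand-ins (`SketchView`, `ReconnectableSubpartition`), not the classes under review, and replays the Thread1/Thread2 steps sequentially.

   ```java
   // Hypothetical stand-ins for illustration only; not the Flink classes in this PR.
   final class SketchView {
       private boolean released;

       // Idempotent: calling it more than once is harmless.
       synchronized void release() {
           released = true;
       }

       synchronized boolean isReleased() {
           return released;
       }
   }

   final class ReconnectableSubpartition {
       private final Object lock = new Object();
       private SketchView readView;

       // Release the current view (if any) before installing a new one, all under the lock.
       SketchView createReadView() {
           synchronized (lock) {
               if (readView != null) {
                   readView.release();
               }
               readView = new SketchView();
               return readView;
           }
       }

       // Clear the field only if the caller still owns the current view;
       // a stale caller releases its own (already replaced) view and nothing else.
       void releaseView(SketchView callersView) {
           synchronized (lock) {
               callersView.release();
               if (readView == callersView) {
                   readView = null;
               }
           }
       }

       SketchView currentView() {
           synchronized (lock) {
               return readView;
           }
       }
   }
   ```

   With this guard, the final "Thread1: release view" step only touches Thread1's own, already replaced view.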







[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508348714



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       Thanks for the explanation.
   
   Can you elaborate on
   > In task failure case, the produced partition would be released by its corresponding RecordWriter.
   
   I don't see any calls from `RecordWriter` to release its `targetPartition`. Do you mean they will be added in the next diff too?







[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-712565866


   Hey @rkhachatryan , thanks so much for reviewing the code!





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski , thanks for the response.
   
   > We realized that if we check view reference in subpartition (or null out view.parent) then the downstream whose view was overwritten by an older thread will sooner or later be fenced by the upstream (or checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`?
   I evaluated both approaches before, and I feel that setting `view.parent` to null may be the cleaner one (that's why I proposed nulling out the parent). Conceptually, it breaks the connection between the old view and the new view. Implementation-wise, it confines the (parent == null) handling mostly to `PipelinedApproximateSubpartition`, with perhaps a little in `PipelinedSubpartition`, whereas with the reference check we have to change the interface and touch all the subpartitions that implement `PipelinedSubpartition`. Note that not just `pollNext()` needs the check; everything that touches the parent does.
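   A minimal sketch of the null-out-parent option, assuming a hypothetical `FencedView` whose `parent` field is volatile and cleared when the subpartition hands out a newer view (names are illustrative, not Flink's). Because every method that touches the parent needs the check, each one reads the field once into a local and bails out on null.

   ```java
   import java.util.ArrayDeque;

   // Hypothetical sketch of "null out view.parent"; names are illustrative, not Flink's.
   final class ParentSubpartition {
       private final Object lock = new Object();
       private final ArrayDeque<String> buffers = new ArrayDeque<>();
       private FencedView readView;

       void add(String buffer) {
           synchronized (lock) {
               buffers.add(buffer);
           }
       }

       FencedView createReadView() {
           synchronized (lock) {
               if (readView != null) {
                   readView.detach(); // break the old view's link to this subpartition
               }
               readView = new FencedView(this);
               return readView;
           }
       }

       String poll() {
           synchronized (lock) {
               return buffers.poll();
           }
       }
   }

   final class FencedView {
       private volatile ParentSubpartition parent;

       FencedView(ParentSubpartition parent) {
           this.parent = parent;
       }

       void detach() {
           parent = null;
       }

       // Read the field once so a concurrent detach() cannot cause an NPE mid-method.
       String pollNext() {
           ParentSubpartition p = parent;
           return p == null ? null : p.poll();
       }

       boolean isDetached() {
           return parent == null;
       }
   }
   ```

   As point 2 argues, this detaches a stale view but does not by itself stop an old request from replacing a newer view.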
   
   2. How does this solve the problem of a new view being replaced by an old one? Say the downstream reconnects and asks for a new view; the new view is created and then replaced by an old view triggered by a stale handler event. The new view's parent is nulled out. What happens then? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just ends in more failures caused by it.
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   Overall, I am fine with putting the check into this PR. However, may I ask how that differs from keeping this PR as it is and doing a follow-up PR that nulls out the parent?
   The main purpose of this PR is to introduce a new result partition type, and the scheduler changes build on this new type. That's the main reason I prefer to do it in a follow-up PR; otherwise the scheduler part is blocked.
   
   It is also easier to review, and it lets me focus on tests for the null-out-parent change.
   
   > Another concern is a potential resource leak if the downstream continuously fails without notifying the upstream (instances of CreditBasedSequenceNumberingViewReader will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing user event (see PartitionRequestQueue.userEventTriggered).
   
   Yes, that's a good point.





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * ccacf6664b0f6ec3630a939f8dd155f53226e4b7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8365) 
   * c306ae3258d3e27f22abafe3db27eeb413467dac Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8668) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * 1dcce3b9301d178ae2e3ab05888260b05f24b15a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016) 
   * d3709faf9515053ca9873382f5b0ce83c127ff2d UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * 0a8a44c098ddafaa6b849fdcb418bfda907298c9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8743) 
   





[GitHub] [flink] tillrohrmann commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-721218263


   Thanks for your detailed response @curcur and the explanation of your reasoning. I agree that isolating changes is a very good argument for introducing a separate implementation.
   
   Concerning your thought on whether all result partition types should be explicitly managed by the JM, I think it is interesting to think it through. It would definitely help to get rid of some special cases. On the other hand, it would always require a network roundtrip to release the resources.





[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714599452


   Thanks, @rkhachatryan for clarifying the problem! 
   I agree there is a problem if a downstream task fails repeatedly, or if an orphaned task execution lingers for a short time after the new execution is already running (as described in the FLIP).
   
   Here is an idea of how to cleanly and thoroughly solve this kind of problem:
   1. We go with the simplified view-release approach: only release the view right before creating a new one (in thread2). That means we won't clean up the view when the downstream task disconnects (`releaseView` would not be called through the view's reference copy, in thread1 or thread2).
      - This would greatly simplify the threading model.
      - This won't cause any resource leak, since in `PipelinedSubpartitionView` releasing the view only serves to notify the upstream result partition to `releaseOnConsumption` once all subpartitions are consumed. In our case, we do not release the result partition on consumption anyway (the result partition is tracked in the JobMaster, similar to `ResultPartitionType.BLOCKING`).
   
   2. Each view is associated with a downstream task execution version.
      - This makes sense because we actually have different versions of the view now, corresponding to the vertex version of the downstream task.
      - createView is performed only if the new version to create is greater than the existing one.
      - If we decide to create a new view, the old view's parent (subpartition) is set to invalid.
   
   This way, I think we can completely disconnect the old view from the subpartition. Besides that, the active handler would always hold the freshest view reference.
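   A sketch of the version-based fencing in point 2, assuming a hypothetical integer `consumerVersion` carried with each partition request (the comment ties it to the downstream vertex's execution version; how that value reaches the upstream is not shown). A request is honored only if its version is strictly greater than the current view's, and the replaced view is invalidated. Returning `null` for a rejected request is an illustrative choice; a real implementation might throw or send an error to the requester instead.

   ```java
   // Hypothetical sketch; VersionedView/VersionedSubpartition are not Flink classes.
   final class VersionedView {
       final int consumerVersion;
       private volatile boolean valid = true;

       VersionedView(int consumerVersion) {
           this.consumerVersion = consumerVersion;
       }

       void invalidate() {
           valid = false;
       }

       boolean isValid() {
           return valid;
       }
   }

   final class VersionedSubpartition {
       private final Object lock = new Object();
       private VersionedView readView;

       // Returns the new view, or null if the request comes from a stale consumer version.
       VersionedView createReadView(int consumerVersion) {
           synchronized (lock) {
               if (readView != null) {
                   if (consumerVersion <= readView.consumerVersion) {
                       return null; // older (or same) execution attempt: never replace a newer view
                   }
                   readView.invalidate(); // disconnect the old view from this subpartition
               }
               readView = new VersionedView(consumerVersion);
               return readView;
           }
       }

       VersionedView current() {
           synchronized (lock) {
               return readView;
           }
       }
   }
   ```

   Unlike nulling out the parent alone, the version comparison also covers the case where a stale handler event arrives after the newer request.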
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks so much, @rkhachatryan, for the great question on how different threads access the same view (in other words, the Netty threading model plus the task thread's interaction with the view). And thanks a lot, @zhijiangW, for explaining the underlying threading model in our offline discussion, and for the great insights.
   
   I've spent some time tracing the different paths by which the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I miss anything, thanks!
   
   For a streaming job, each `view` is created via a `PartitionRequest` handled by `PartitionRequestServerHandler` (on one of the Netty worker threads). The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. `PipelinedApproximateSubpartition#createView` (from a Netty thread; it can see the current version of the view belonging to the subpartition)
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens on downstream task cancellation.
        - from the subpartition's release. This is fine: the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it was not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancellation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 has recreated it?
   - Thread2 releases the view that thread1 holds a reference to before creating the new view. Thread1 cannot release the view again, since a view can only be released once.
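   The answer to question 2 relies on a view being releasable only once. A minimal sketch of that guarantee, assuming a hypothetical `ReleaseOnceView` that counts its cleanups for illustration: `AtomicBoolean.compareAndSet` lets exactly one caller, from whichever Netty thread, run the cleanup.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical view whose release is safe to call from several threads.
   final class ReleaseOnceView {
       private final AtomicBoolean released = new AtomicBoolean(false);
       private final AtomicInteger cleanupRuns; // counts how often cleanup actually executed

       ReleaseOnceView(AtomicInteger cleanupRuns) {
           this.cleanupRuns = cleanupRuns;
       }

       // Returns true only for the single caller that performed the cleanup.
       boolean releaseAllResources() {
           if (!released.compareAndSet(false, true)) {
               return false; // already released by another caller
           }
           cleanupRuns.incrementAndGet(); // stand-in for freeing buffers, notifying the parent, etc.
           return true;
       }

       boolean isReleased() {
           return released.get();
       }
   }
   ```

   This makes the view's own cleanup idempotent; it does not by itself stop a stale thread from clearing the subpartition's `readView` field, which still needs its own guard.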





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508236580



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();

Review comment:
       The `build()` method is the regular build method, and it is used in the normal (non-approximate) path as well.
   
   Probably just keep it as it is?







[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508405933



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       In `Task#releaseResources`:
   ```java
   if (isCanceledOrFailed()) {
       partitionWriter.fail(getFailureCause());
   }
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       In Task#releaseResources
                        ```
    if (isCanceledOrFailed()) {
   				partitionWriter.fail(getFailureCause());
   			}
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713516220


   Addressed @rkhachatryan's comments.





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508234412



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       That's a good question; I should probably add a description somewhere to explain this.
   
   For view release, the original `PipelinedSubpartitionView#releaseAllResources` notifies its parent's parent (the result partition) that the corresponding subpartition has been consumed (and can be released). Once all of its subpartitions are released, the result partition is released.
   
   In `PipelinedApproximateSubpartitionView#releaseAllResources`, we only set `readView` to null; the partition is tracked via `JobMasterPartitionTrackerImpl#startTrackingPartition`. That change will be included in the next diff (draft PR https://github.com/apache/flink/pull/13677/commits/7d31593b3db7cc2a2448c69750ab0963e189c051), together with the failover strategy change.
   
   In that case, the result partition is released when the job exits.
   
   If a task fails, the partition it produced is released by its corresponding RecordWriter.
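   To make the difference concrete, here is a hypothetical, heavily simplified model. The classes are illustrative stand-ins, not Flink's `ResultPartition`, `PipelinedSubpartition`, or view classes. In pipelined mode, releasing a view reports the subpartition as consumed and the partition goes away once every subpartition is consumed; in approximate mode, releasing a view only clears it, and the partition stays alive until something like a job exit or producer failure calls `release()` explicitly.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical, simplified model; these are NOT Flink's actual classes.
   public class ViewReleaseSketch {

       /** Stand-in for the result partition that owns the subpartitions. */
       static final class ResultPartition {
           private final Set<Integer> unconsumed = new HashSet<>();
           private boolean released;

           ResultPartition(int numSubpartitions) {
               for (int i = 0; i < numSubpartitions; i++) {
                   unconsumed.add(i);
               }
           }

           /** Pipelined mode: released once every subpartition has been consumed. */
           synchronized void onConsumedSubpartition(int index) {
               unconsumed.remove(index);
               if (unconsumed.isEmpty()) {
                   released = true;
               }
           }

           /** Approximate mode: called only on job exit or producer failure/cancellation. */
           synchronized void release() {
               released = true;
           }

           synchronized boolean isReleased() {
               return released;
           }
       }

       /** Stand-in for a subpartition holding at most one read view. */
       static final class Subpartition {
           private final int index;
           private final ResultPartition parent;
           private Object readView = new Object(); // pretend a view is connected

           Subpartition(int index, ResultPartition parent) {
               this.index = index;
               this.parent = parent;
           }

           /** Pipelined view release: notify the parent partition. */
           synchronized void releasePipelinedView() {
               readView = null;
               parent.onConsumedSubpartition(index);
           }

           /** Approximate view release: only drop the view so a new one can be created. */
           synchronized void releaseApproximateView() {
               readView = null;
           }

           synchronized boolean hasView() {
               return readView != null;
           }
       }

       public static void main(String[] args) {
           ResultPartition pipelined = new ResultPartition(1);
           new Subpartition(0, pipelined).releasePipelinedView();
           System.out.println(pipelined.isReleased()); // true

           ResultPartition approximate = new ResultPartition(1);
           Subpartition sub = new Subpartition(0, approximate);
           sub.releaseApproximateView();
           System.out.println(approximate.isReleased()); // false: a new view may still connect
           approximate.release(); // e.g. the job exits
           System.out.println(approximate.isReleased()); // true
       }
   }
   ```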







[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot, @rkhachatryan, for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot, @zhijiangW, for explaining the underlying threading model in our offline discussion and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I've missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. `PipelinedApproximateSubpartition#createView` (called from a Netty thread, which can see the current version of the view belonging to the subpartition)
   2. `PipelinedApproximateSubpartition#releaseAllResources`,
        which is called from two places:
        - one from the `reader`: the reader uses its own reference to the view to release resources. This happens upon downstream task cancellation.
        - one from `subPartition release`. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancellation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   Before creating a new view, thread2 releases the view that thread1 holds a reference to.
   When thread1 then tries to release the view through its old reference, we should be fine as long as we disconnect the old view from the subpartition.
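   The two safeguards described above (a view can be released only once, and a stale release must not disconnect the newer view) can be sketched as follows. This is a hypothetical, simplified model: the `AtomicBoolean` release flag and the identity check in `disconnect` are assumptions for illustration, not the actual implementation of `PipelinedApproximateSubpartition` or its view.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical sketch; names mirror the discussion but are NOT Flink's classes.
   public class ReleaseOnceSketch {

       static final class View {
           private final AtomicBoolean released = new AtomicBoolean(false);
           private final Subpartition parent;

           View(Subpartition parent) {
               this.parent = parent;
           }

           /** Only the first caller performs the release; later calls are no-ops. */
           boolean releaseAllResources() {
               if (!released.compareAndSet(false, true)) {
                   return false;
               }
               parent.disconnect(this);
               return true;
           }

           boolean isReleased() {
               return released.get();
           }
       }

       static final class Subpartition {
           private View readView; // guarded by "this"

           /** Releases any existing view before handing out a new one. */
           synchronized View createReadView() {
               if (readView != null) {
                   readView.releaseAllResources(); // re-entrant lock, so disconnect() is safe here
               }
               readView = new View(this);
               return readView;
           }

           /** Disconnects only if the caller is still the current view. */
           synchronized void disconnect(View view) {
               if (readView == view) {
                   readView = null;
               }
           }

           synchronized View currentView() {
               return readView;
           }
       }

       public static void main(String[] args) {
           Subpartition sub = new Subpartition();
           View oldView = sub.createReadView(); // reference held by thread1
           View newView = sub.createReadView(); // thread2 reconnects; old view is released first
           System.out.println(oldView.releaseAllResources()); // false: already released
           System.out.println(sub.currentView() == newView);  // true: new view untouched
       }
   }
   ```

   Either safeguard alone would protect the new view here; keeping both makes a late release from the old reader harmless even if the release flag were reset by mistake.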
   
   





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677",
       "triggerID" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747",
       "triggerID" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016",
       "triggerID" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163",
       "triggerID" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8354",
       "triggerID" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccacf6664b0f6ec3630a939f8dd155f53226e4b7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ccacf6664b0f6ec3630a939f8dd155f53226e4b7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ff9581bc2381e224c042f954bac0a7d8c371845a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8354) 
   * ccacf6664b0f6ec3630a939f8dd155f53226e4b7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up. 
   In short, it is not possible, because a view can only be released once, and this is guarded by the view's release flag; details are quoted below. 
   
   - What if Netty thread1 releases the view after Netty thread2 recreates it?
   Before creating a new view, thread2 releases the view that thread1 holds a reference to. Afterwards, thread1 cannot release the old view again (through its reference), since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we release the view only before creating a new one, and nowhere else, the whole threading interaction model would be greatly simplified. That means only the new Netty thread can release the view.**
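   For comparison, the reviewer's `AtomicReference`/CAS suggestion could look roughly like the hypothetical sketch below (not Flink code). One assumption is added on top of the suggestion: per-connection state such as the sequence number lives in the view object itself. Resetting shared subpartition fields *after* a successful CAS could still race with a concurrent `createReadView`, whereas state owned by the view cannot be clobbered by a slow release of an older view.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical sketch of a CAS-guarded releaseView(); NOT Flink's implementation.
   public class CasReleaseSketch {

       /** Per-connection state lives in the view, so each new view starts fresh. */
       static final class View {
           int sequenceNumber;
       }

       static final class Subpartition {
           private final AtomicReference<View> readView = new AtomicReference<>();

           /** Installs a fresh view; any previous view becomes stale. */
           View createReadView() {
               View fresh = new View();
               readView.set(fresh);
               return fresh;
           }

           /** Succeeds only if {@code view} is still the current one; stale releases are ignored. */
           boolean releaseView(View view) {
               return readView.compareAndSet(view, null);
           }

           View current() {
               return readView.get();
           }
       }

       public static void main(String[] args) {
           Subpartition sub = new Subpartition();
           View v1 = sub.createReadView(); // t1's view
           View v2 = sub.createReadView(); // t2 reconnects
           v2.sequenceNumber = 5;          // t2 has already sent some data
           System.out.println(sub.releaseView(v1));          // false: slow t1 cannot clobber t2
           System.out.println(sub.current().sequenceNumber); // 5
           System.out.println(sub.releaseView(v2));          // true
       }
   }
   ```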





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up. 
   In short, it is not possible, because a view can only be released once, and this is guarded by the view's release flag; details are quoted below. 
   
   - What if Netty thread1 releases the view after Netty thread2 recreates it?
   Before creating a new view, thread2 releases the view that thread1 holds a reference to. Afterwards, thread1 cannot release the old view again (through its reference), since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we release the view only before creating a new one, the whole threading interaction model would be greatly simplified. That means only the new Netty thread can release the view.**





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the motivation. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be unified. Two main considerations led me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I went with the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I won't be aware of until the real implementation. 
   
   Even though unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable, since it keeps the logic for each behavior simple. **So personally, I lean toward not unifying them**. 
   
   That said, if unifying the two types turns out to be cleaner and simpler, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems to be the better choice.
   
   **2) Differences between the two types**
   For upstream reconnection, the two types differ mainly in two respects: **read** and **release**.
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view causes the subpartition to be released, and eventually the partition.
   - In approximate mode, a subpartition's view can be created and released multiple times, as long as only one view exists for the subpartition at any instant.
     - for reading: upon reconnection, the reader should clean up the partial record left by the downstream failure (this could easily be unified);
     - for release: a partition is released only when it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created (this is a bit difficult with the current setup; let's discuss it in the lifecycle part later).
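   The "clean up partial records upon reconnection" step can be illustrated with a hypothetical buffer model (not Flink's buffer format): assume each buffer records how many of its leading bytes continue a record that started in an earlier buffer. A newly connected view never saw the start of that record, so it skips bytes up to the first record boundary and emits everything after it.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical model; the Buffer class and its fields are illustrative assumptions.
   public class PartialRecordCleanupSketch {

       /** A buffer whose first partialRecordLength bytes continue a record from an earlier buffer. */
       static final class Buffer {
           final byte[] data;
           final int partialRecordLength;

           Buffer(byte[] data, int partialRecordLength) {
               if (partialRecordLength < 0 || partialRecordLength > data.length) {
                   throw new IllegalArgumentException("invalid partial record length");
               }
               this.data = data;
               this.partialRecordLength = partialRecordLength;
           }
       }

       /** Returns what a newly connected view should emit: everything after the first record boundary. */
       static List<byte[]> dropLeadingPartialRecord(List<Buffer> pending) {
           List<byte[]> out = new ArrayList<>();
           boolean skipping = true;
           for (Buffer buffer : pending) {
               if (!skipping) {
                   out.add(buffer.data);
               } else if (buffer.partialRecordLength < buffer.data.length) {
                   // First record boundary lies inside this buffer: keep the remainder.
                   out.add(Arrays.copyOfRange(buffer.data, buffer.partialRecordLength, buffer.data.length));
                   skipping = false;
               }
               // Otherwise the whole buffer belongs to the broken record and is skipped.
           }
           return out;
       }

       public static void main(String[] args) {
           List<Buffer> pending = Arrays.asList(
                   new Buffer(new byte[] {1, 2}, 2),    // entirely the tail of a broken record
                   new Buffer(new byte[] {3, 4, 5}, 1), // byte 3 ends it; 4 and 5 start a new record
                   new Buffer(new byte[] {6}, 0));
           for (byte[] bytes : dropLeadingPartialRecord(pending)) {
               System.out.println(Arrays.toString(bytes)); // [4, 5] then [6]
           }
       }
   }
   ```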
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate recovery through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is bound to the consumer task. This is not very intuitive but reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it matches intuition best, as long as the task terminates only after its produced result partition has been consumed. I think this can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
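   Binding the partition to its producer, as proposed in the quoted comment, could be sketched like this. All names are hypothetical and the model is deliberately minimal: the task's termination future only completes after the partition reports full consumption, and a failure or cancellation makes the task release its own partition.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch of a producer-bound partition lifecycle; NOT Flink's Task API.
   public class ProducerBoundLifecycleSketch {

       static final class Partition {
           final CompletableFuture<Void> fullyConsumed = new CompletableFuture<>();
           volatile boolean released;

           void release() {
               released = true;
           }
       }

       enum TaskState { RUNNING, FINISHED, FAILED }

       static final class ProducerTask {
           final Partition partition = new Partition();
           volatile TaskState state = TaskState.RUNNING;

           /** Called when processing is done; terminal only once the output is consumed. */
           CompletableFuture<TaskState> finish() {
               return partition.fullyConsumed.thenApply(ignored -> {
                   partition.release();
                   state = TaskState.FINISHED;
                   return state;
               });
           }

           /** Failure or cancellation: the task cleans up its own partition. */
           void fail() {
               partition.release();
               state = TaskState.FAILED;
           }
       }

       public static void main(String[] args) {
           ProducerTask task = new ProducerTask();
           CompletableFuture<TaskState> termination = task.finish();
           System.out.println(termination.isDone());      // false: consumer still reading
           task.partition.fullyConsumed.complete(null);   // last buffer consumed
           System.out.println(termination.join());        // FINISHED
           System.out.println(task.partition.released);   // true
       }
   }
   ```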
   
   Thinking about it again last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether they belong to the blocking family or the pipelined family? Is there a problem with doing so?  cc @tillrohrmann 
   
   In short:
   **The current behavior of PIPELINED is** ->
   - release partition as soon as consumer exits
   - release partition as soon as producer fails/is canceled
   
   **The current behavior of PIPELINED_APPROXIMATE is** ->
   - release partition as soon as producer fails/is canceled
   - release partition when the job exits
   
   **I think what Till prefers is to unify the Pipelined family to**
   - release partition when producer exits
   
   **And my question is whether we can unify the Blocking + Pipelined families to**
   - producer releases partition when producer fails/is canceled
   - release partition when the job exits
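   The release rules summarized above can be encoded as a small lookup table, which makes it easy to compare them side by side. This is a hypothetical encoding, not Flink code: `Event`, `Policy`, and the names `PRODUCER_BOUND` and `UNIFIED` are illustrative, and "producer exits" is modeled as the producer either finishing or failing/being canceled.

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   // Hypothetical encoding of the release rules listed above; NOT Flink code.
   public class PartitionReleasePolicySketch {

       enum Event { CONSUMER_EXITS, PRODUCER_FINISHES, PRODUCER_FAILS_OR_IS_CANCELED, JOB_EXITS }

       enum Policy {
           /** Current PIPELINED(_BOUNDED) behavior. */
           PIPELINED(EnumSet.of(Event.CONSUMER_EXITS, Event.PRODUCER_FAILS_OR_IS_CANCELED)),
           /** Current PIPELINED_APPROXIMATE behavior. */
           PIPELINED_APPROXIMATE(EnumSet.of(Event.PRODUCER_FAILS_OR_IS_CANCELED, Event.JOB_EXITS)),
           /** Proposal: pipelined partitions released when the producer exits in any way. */
           PRODUCER_BOUND(EnumSet.of(Event.PRODUCER_FINISHES, Event.PRODUCER_FAILS_OR_IS_CANCELED)),
           /** Open question: one rule for both the blocking and pipelined families. */
           UNIFIED(EnumSet.of(Event.PRODUCER_FAILS_OR_IS_CANCELED, Event.JOB_EXITS));

           private final Set<Event> releaseTriggers;

           Policy(Set<Event> releaseTriggers) {
               this.releaseTriggers = releaseTriggers;
           }

           boolean releasesOn(Event event) {
               return releaseTriggers.contains(event);
           }
       }

       public static void main(String[] args) {
           for (Policy policy : Policy.values()) {
               System.out.println(policy + " releases on consumer exit: "
                       + policy.releasesOn(Event.CONSUMER_EXITS));
           }
       }
   }
   ```

   Written this way, the open question amounts to whether every partition type could share the `UNIFIED` row, which currently matches `PIPELINED_APPROXIMATE`.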
   
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the reason. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be unified. Two main considerations led me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I went with the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I won't be aware of until the real implementation. 
   
   Even though unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable, since it keeps the logic for each behavior simple. **So personally, I lean toward not unifying them**. 
   
   That said, if unifying the two types turns out to be cleaner and simpler, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems to be the better choice.
   
   **2) Differences between the two types**
   For upstream reconnection, the two types differ mainly in two respects: **read** and **release**.
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view causes the subpartition to be released, and eventually the partition.
   - In approximate mode, a subpartition's view can be created and released multiple times, as long as only one view exists for the subpartition at any instant.
     - for reading: upon reconnection, the reader should clean up the partial record left by the downstream failure (this could easily be unified);
     - for release: a partition is released only when it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created (this is a bit difficult with the current setup; let's discuss it in the lifecycle part later).
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate recovery through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is bound to the consumer task. This is not very intuitive but reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it matches intuition best, as long as the task terminates only after its produced result partition has been consumed. I think this can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
   
   Thinking about it again last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether they belong to the blocking family or the pipelined family? Is there a problem with doing so?  cc @tillrohrmann 
   
   In short:
   **The current behavior of PIPELINED is** ->
   - release partition as soon as consumer exits
   - release partition as soon as producer fails/is canceled
   
   **The current behavior of PIPELINED_APPROXIMATE is** ->
   - do nothing when consumer exits
   - release partition as soon as producer fails/is canceled
   - release partition when the job exits
   
   **I think what Till prefers is to unify the Pipelined family to**
   **the producer releasing the partition when the producer exits.**
   
   **And my question is whether we can unify the Blocking + Pipelined families to**
   - producer releases partition when producer fails/is canceled
   - release partition when the job exits
   
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot, @rkhachatryan, for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot, @zhijiangW, for explaining the underlying threading model in our offline discussion and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is called from two places:
   1. `PipelinedApproximateSubpartition#createReadView` (from a Netty thread, which sees the current view owned by the subpartition).
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens on downstream task cancelation.
        - from the subpartition release. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancelation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   - Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release that view again, since a view can only be released once.
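The release-once argument in answer 2 can be illustrated with a minimal Java sketch. It rests on two assumptions that the comment describes but the diff excerpt does not show: the view guards its release with a compare-and-set flag, and `createReadView` releases the stale view through that same flag while holding the subpartition lock. `ReleaseOnceSubpartition` and `ReleaseOnceView` are hypothetical names, not Flink classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for PipelinedApproximateSubpartitionView.
class ReleaseOnceView {
    private final ReleaseOnceSubpartition parent;
    // Flips from false to true at most once, regardless of how many threads race on it.
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    ReleaseOnceView(ReleaseOnceSubpartition parent) {
        this.parent = parent;
    }

    /** Returns true only for the single caller that actually performed the release. */
    boolean releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            parent.releaseView();
            return true;
        }
        return false;
    }

    boolean isReleased() {
        return isReleased.get();
    }
}

// Hypothetical stand-in for PipelinedApproximateSubpartition.
class ReleaseOnceSubpartition {
    private final Object lock = new Object();
    private ReleaseOnceView readView;

    ReleaseOnceView createReadView() {
        synchronized (lock) {
            if (readView != null) {
                // Thread2 path: release the stale view (setting its flag) before creating a new one.
                readView.releaseAllResources();
            }
            readView = new ReleaseOnceView(this);
            return readView;
        }
    }

    void releaseView() {
        synchronized (lock) { // reentrant, so calling this from createReadView is fine
            // Mirrors the PR: clears the current view unconditionally.
            readView = null;
        }
    }

    ReleaseOnceView currentView() {
        synchronized (lock) {
            return readView;
        }
    }

    public static void main(String[] args) {
        ReleaseOnceSubpartition subpartition = new ReleaseOnceSubpartition();
        ReleaseOnceView oldView = subpartition.createReadView(); // held by thread1
        ReleaseOnceView newView = subpartition.createReadView(); // created by thread2
        System.out.println(oldView.releaseAllResources());       // false: late release is a no-op
        System.out.println(subpartition.currentView() == newView); // true
    }
}
```

The second `createReadView` call plays thread2, and the later `oldView.releaseAllResources()` plays thread1: it returns `false` and leaves `newView` registered. Under this model, if `createReadView` cleared `readView` directly without setting the old view's flag, thread1's late release would still succeed and null out the new view, which is the scenario the question is probing.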


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot, @rkhachatryan, for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot, @zhijiangW, for explaining the underlying threading model in our offline discussion and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is called from two places:
   1. `PipelinedApproximateSubpartition#createReadView` (from a Netty thread, which sees the current view owned by the subpartition).
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens on downstream task cancelation.
        - from the subpartition release. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancelation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   - Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release that view (through its view reference) again afterwards, since a view can only be released once.





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski, thanks for the response.
   
   > We realized that if we check the view reference in the subpartition (or null out view.parent), then the downstream whose view was overwritten by an older thread will sooner or later be fenced by the upstream (or the checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`?
   I slightly prefer setting `view.parent` to null. Conceptually, it breaks the connection between the old view and its parent; implementation-wise, it keeps the `(parent == null)` handling mostly within `PipelinedApproximateSubpartitionView`. Note that not just `pollNext()` needs the check: I think everything that touches `view.parent` needs one.
   
   2. How can this solve the problem of a new view being replaced by an old one? Say the downstream reconnects and asks for a new view; the new view is created, and is then replaced by an old view triggered by an old handler event. The new view's parent is nulled out. What happens then? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just ends in more failures caused by it.
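For comparison, the `check view reference` option can be sketched as a subpartition that only serves the view it currently owns. This is a hypothetical, simplified model (strings instead of buffers, invented class names), not the code in this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the "check view reference" option; not Flink code.
class ReferenceCheckingSubpartition {
    private final Object lock = new Object();
    private final Queue<String> buffers = new ArrayDeque<>();
    private ReferenceCheckingView readView;

    void add(String buffer) {
        synchronized (lock) {
            buffers.add(buffer);
        }
    }

    ReferenceCheckingView createReadView() {
        synchronized (lock) {
            readView = new ReferenceCheckingView(this);
            return readView;
        }
    }

    /** Serves data only to the currently registered view; stale views get nothing. */
    String pollNext(ReferenceCheckingView requester) {
        synchronized (lock) {
            if (requester != readView) {
                return null;
            }
            return buffers.poll();
        }
    }

    public static void main(String[] args) {
        ReferenceCheckingSubpartition subpartition = new ReferenceCheckingSubpartition();
        subpartition.add("buffer-1");
        ReferenceCheckingView stale = subpartition.createReadView();
        ReferenceCheckingView current = subpartition.createReadView();
        System.out.println(stale.getNextBuffer());   // null: fenced off
        System.out.println(current.getNextBuffer()); // buffer-1
    }
}

class ReferenceCheckingView {
    private final ReferenceCheckingSubpartition parent;

    ReferenceCheckingView(ReferenceCheckingSubpartition parent) {
        this.parent = parent;
    }

    String getNextBuffer() {
        return parent.pollNext(this);
    }
}
```

Here the check lives in one place, under the subpartition lock, whereas nulling out `view.parent` needs a check at every access to `parent`, as point 1 notes. As point 2 argues, though, this only fences off a stale view: if a delayed, older request calls `createReadView` after the newer one, it is the newer connection's view that gets fenced off.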
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   Overall I am fine with putting the check in this PR. However, may I ask how that differs from keeping this PR as it is and doing a follow-up one to null out the parent?
   My main purpose in this PR is to introduce a different result partition type, and the scheduler changes are based on this new type. That's the main reason I prefer doing it in a follow-up PR; otherwise the scheduler part is blocked.
   
   It is easier to review as well.
   
   > Another concern is a potential resource leak if the downstream continuously fails without notifying the upstream (instances of `CreditBasedSequenceNumberingViewReader` will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing a user event (see `PartitionRequestQueue.userEventTriggered`).
   
   Yes, that's a good point.





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508249209



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;
+		isPartialBuffer = true;

Review comment:
       That's a good name!
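The flag being renamed here is the one `releaseView()` sets so that the next view does not start reading in the middle of a record. The sketch below models that behavior under simplifying assumptions: `SketchBuffer` is a hypothetical stand-in for `BufferConsumerWithPartialRecordLength`, its `partialRecordLength` is taken to mean the number of leading bytes that still belong to a record started earlier, and the flag name `isPartialBufferCleanupRequired` is the one used later in this thread.

```java
import java.util.Arrays;

// Hypothetical stand-in for BufferConsumerWithPartialRecordLength.
class SketchBuffer {
    private final byte[] data;
    // Leading bytes that still belong to a record started in an earlier buffer
    // (may exceed data.length if that record spans this whole buffer).
    private final int partialRecordLength;
    private int readerPosition = 0;

    SketchBuffer(byte[] data, int partialRecordLength) {
        this.data = data;
        this.partialRecordLength = partialRecordLength;
    }

    /** Skips the leading partial record; returns true if that record ends within this buffer. */
    boolean cleanupPartialRecord() {
        readerPosition = Math.min(partialRecordLength, data.length);
        return partialRecordLength <= data.length;
    }

    byte[] build() {
        return Arrays.copyOfRange(data, readerPosition, data.length);
    }
}

// Hypothetical model of the flag handling in PipelinedApproximateSubpartition.
class PartialRecordCleanupSketch {
    // Set when a view is released, so the next view never starts in the middle of a record.
    private boolean isPartialBufferCleanupRequired = false;

    void releaseView() {
        isPartialBufferCleanupRequired = true;
    }

    byte[] buildSliceBuffer(SketchBuffer buffer) {
        if (isPartialBufferCleanupRequired) {
            // Keep skipping until a buffer contains the end of the partial record.
            isPartialBufferCleanupRequired = !buffer.cleanupPartialRecord();
        }
        return buffer.build();
    }

    public static void main(String[] args) {
        PartialRecordCleanupSketch subpartition = new PartialRecordCleanupSketch();
        subpartition.releaseView();
        byte[] slice = subpartition.buildSliceBuffer(new SketchBuffer(new byte[] {1, 2, 3, 4}, 2));
        System.out.println(Arrays.toString(slice)); // [3, 4]
    }
}
```

A buffer that consists entirely of the old record's tail comes out empty and leaves the flag set; the first buffer in which that record ends clears it, and later buffers pass through untouched.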







[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508248106



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
##########
@@ -130,8 +132,15 @@ public ResultPartition create(
 				bufferCompressor,
 				bufferPoolFactory);
 
+			BiFunction<Integer, PipelinedResultPartition, PipelinedSubpartition> factory;
+			if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
+				factory = PipelinedApproximateSubpartition::new;
+			} else {
+				factory = PipelinedSubpartition::new;
+			}
+

Review comment:
       LOL, I changed from this version to the fancy version based on Arvid's suggestion.
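The pattern in this hunk, picking a constructor reference once based on the partition type and then using it for every subpartition, can be shown in a small self-contained sketch. All class names are hypothetical simplifications, and the conditional expression is just one compact way to write the selection; the "fancy version" mentioned in the reply is not shown in this thread, so this is not a reconstruction of it.

```java
import java.util.function.BiFunction;

// Hypothetical, simplified types; not Flink's classes.
enum SketchPartitionType { PIPELINED, PIPELINED_APPROXIMATE }

class SketchResultPartition { }

class SketchSubpartition {
    final int index;
    final SketchResultPartition parent;

    SketchSubpartition(int index, SketchResultPartition parent) {
        this.index = index;
        this.parent = parent;
    }
}

class SketchApproximateSubpartition extends SketchSubpartition {
    SketchApproximateSubpartition(int index, SketchResultPartition parent) {
        super(index, parent);
    }
}

class SketchPartitionFactory {
    static SketchSubpartition[] createSubpartitions(
            SketchPartitionType type, SketchResultPartition parent, int numSubpartitions) {
        // Choose the constructor once; the loop below stays type-agnostic.
        BiFunction<Integer, SketchResultPartition, SketchSubpartition> factory =
                type == SketchPartitionType.PIPELINED_APPROXIMATE
                        ? SketchApproximateSubpartition::new
                        : SketchSubpartition::new;

        SketchSubpartition[] subpartitions = new SketchSubpartition[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions[i] = factory.apply(i, parent);
        }
        return subpartitions;
    }

    public static void main(String[] args) {
        SketchSubpartition[] subpartitions = createSubpartitions(
                SketchPartitionType.PIPELINED_APPROXIMATE, new SketchResultPartition(), 2);
        System.out.println(subpartitions[1].getClass().getSimpleName()); // SketchApproximateSubpartition
    }
}
```

Whether written as an `if`/`else` or a conditional expression, the effect is the same: the type decision is made once, outside the loop that builds the subpartitions.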







[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508250129



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Creation and release happen in the same Netty server thread.
   
   A view can be released multiple times before creation.







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in the normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`), `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   Aren't they guarded by the `synchronized` block? I think almost all access in the existing code is guarded by the lock. But I have a simpler solution just in case; see the last paragraph.
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` already made by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up above.
   In short, this is not possible, because a view can only be released once, which is guarded by the view's release flag (details quoted below). The updates themselves are guarded by the lock.
   
   - What if Netty thread1 releases the view after Netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we only release the view right before creating a new one, and nowhere else, the whole threading interaction model becomes much simpler. That means only one (the new) Netty thread can release the view.**
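The reviewer's `AtomicReference` suggestion quoted above can be sketched as follows. This is a hypothetical, minimal model (a bare `View` class, no buffers or listeners), not code from this PR or from Flink: releasing succeeds only if the caller's view is still the registered one.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of guarding releaseView() with a CAS; not the code in this PR.
class CasGuardedSubpartition {
    static final class View { }

    private final AtomicReference<View> readView = new AtomicReference<>();

    View createReadView() {
        View view = new View();
        // Replace whatever was registered before; a real implementation would also
        // release the previous view's resources here.
        readView.set(view);
        return view;
    }

    /**
     * Clears the registration only if {@code view} is still the registered view, so a slow
     * thread holding an old reference cannot clear a view created later by another thread.
     */
    boolean releaseView(View view) {
        return readView.compareAndSet(view, null);
    }

    View currentView() {
        return readView.get();
    }

    public static void main(String[] args) {
        CasGuardedSubpartition subpartition = new CasGuardedSubpartition();
        View oldView = subpartition.createReadView(); // t1's view
        View newView = subpartition.createReadView(); // t2 reconnects
        System.out.println(subpartition.releaseView(oldView));     // false: stale release rejected
        System.out.println(subpartition.currentView() == newView); // true
    }
}
```

The CAS only protects the clear-the-registration step; other state that a release resets, such as `sequenceNumber`, would still need the lock or its own guard. The simplification proposed in the comment, releasing only inside `createReadView`, sidesteps this by leaving a single code path that releases the view.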





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot, @rkhachatryan, for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot, @zhijiangW, for explaining the underlying threading model in our offline discussion and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is called from two places:
   1. `PipelinedApproximateSubpartition#createReadView` (from a Netty thread, which sees the current view owned by the subpartition).
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens on downstream task cancelation.
        - from the subpartition release. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancelation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   - Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release that view again afterwards, since a view can only be released once.





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski, thanks for the response.
   
   > We realized that if we check the view reference in the subpartition (or null out view.parent), then the downstream whose view was overwritten by an older thread will sooner or later be fenced by the upstream (or the checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`?
   `null out parent` may have race-condition problems. Note that not just `pollNext()` needs the check: I think everything that touches `view.parent` needs one.
   
   2. How can this solve the problem of a new view being replaced by an old one? Say the downstream reconnects and asks for a new view; the new view is created, and is then replaced by an old view triggered by an old handler event. The new view's parent is nulled out. What happens then? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just ends in more failures caused by it.
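The race-condition concern with `null out parent` in point 1 can be made concrete with a small sketch. It is a hypothetical model, not Flink code: the view's `parent` field is `volatile` and is nulled out by another thread, and every accessor must read the field once into a local variable instead of checking it and then dereferencing it again.

```java
// Hypothetical sketch of the "null out view.parent" option; not Flink code.
class NullableParentView {
    interface Parent {
        String pollNext();
    }

    // volatile so that a null-out performed by another thread becomes visible to readers.
    private volatile Parent parent;

    NullableParentView(Parent parent) {
        this.parent = parent;
    }

    void detachFromParent() {
        parent = null;
    }

    String getNextBuffer() {
        // Read the field exactly once. Writing `if (parent != null) return parent.pollNext();`
        // reads it twice, and a concurrent null-out in between would throw a NullPointerException.
        Parent current = parent;
        return current == null ? null : current.pollNext();
    }

    public static void main(String[] args) {
        NullableParentView view = new NullableParentView(() -> "buffer-1");
        System.out.println(view.getNextBuffer()); // buffer-1
        view.detachFromParent();
        System.out.println(view.getNextBuffer()); // null
    }
}
```

Even with this pattern, a poll that read `parent` just before the null-out still completes against the old parent, so every method touching `parent` needs the same treatment, and that remaining window has to be acceptable.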
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   Overall I am fine with putting the check in this PR. However, may I ask how that differs from keeping this PR as it is and doing a follow-up one to null out the parent or check the reference?
   My main purpose in this PR is to introduce a different result partition type, and the scheduler changes are based on this new type. That's the main reason I prefer doing it in a follow-up PR; otherwise the scheduler part is blocked.
   
   It is easier to review as well.
   
   > Another concern is a potential resource leak if the downstream continuously fails without notifying the upstream (instances of `CreditBasedSequenceNumberingViewReader` will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing a user event (see `PartitionRequestQueue.userEventTriggered`).
   
   Yes, that's a good point.





[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-720296877


   Hey @pnowojski, I rebased it again.
   
   0a8a44c Azure: SUCCESS
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot, @rkhachatryan, for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot, @zhijiangW, for explaining the underlying threading model in our offline discussion and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is called from two places:
   1. `PipelinedApproximateSubpartition#createReadView` (from a Netty thread, which sees the current view owned by the subpartition).
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens on downstream task cancelation.
        - from the subpartition release. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancelation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   - Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release that view again, since a view can only be released once.





[GitHub] [flink] tillrohrmann commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-717803324


   Sorry for joining the discussion so late, but a couple of questions came up when discussing the scheduler changes with Yuan offline. Why do we need a special `ResultPartitionType` for approximate local recovery? Shouldn't it conceptually be possible to support both the normal and the approximate recovery behaviour with the same pipelined partitions? If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the scheduler's responsibility to make sure that producers are restarted as well, in order to ensure exactly-once/at-least-once processing guarantees. If not, we would simply consume from where we left off.
   
   As far as I understand, the existing `ResultPartitionType.PIPELINED(_BOUNDED)` cannot be used because we release the result partition if the downstream consumer disconnects. I believe this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a `Task`? We could say that a `Task` can only terminate once the pipelined result partition has been consumed. Moreover, a `Task` would clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the `Task` reaches a terminal state).
   
   I would love to hear your feedback, @pnowojski, @zhijiangW and @rkhachatryan, and also to learn more about your reasoning for introducing a new result partition type.
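The lifecycle proposed in the second paragraph can be sketched as a small state model: a consumer disconnect no longer releases the pipelined partition, the task may only finish once the partition has been consumed, and the partition is released if and only if the task reaches a terminal state. This is a hypothetical, single-threaded illustration with invented names, not Flink's `Task` or `ResultPartition` code.

```java
// Hypothetical, single-threaded model of tying a pipelined partition's lifecycle to its Task.
class TaskBoundPartitionSketch {
    enum TaskState { RUNNING, FINISHED, FAILED, CANCELED }

    private TaskState state = TaskState.RUNNING;
    private boolean partitionFullyConsumed = false;
    private boolean partitionReleased = false;

    /** A consumer disconnect does not release the partition, so a consumer may reconnect. */
    void onConsumerReconnect() {
        if (partitionReleased) {
            throw new IllegalStateException("Partition already released");
        }
    }

    void onPartitionFullyConsumed() {
        partitionFullyConsumed = true;
    }

    /** The task may only finish once its pipelined partition has been consumed. */
    boolean tryFinish() {
        if (!partitionFullyConsumed) {
            return false;
        }
        terminate(TaskState.FINISHED);
        return true;
    }

    void fail() {
        terminate(TaskState.FAILED);
    }

    void cancel() {
        terminate(TaskState.CANCELED);
    }

    // The partition is cleaned up if and only if the task reaches a terminal state.
    private void terminate(TaskState terminalState) {
        state = terminalState;
        partitionReleased = true;
    }

    TaskState state() {
        return state;
    }

    boolean isPartitionReleased() {
        return partitionReleased;
    }

    public static void main(String[] args) {
        TaskBoundPartitionSketch task = new TaskBoundPartitionSketch();
        task.onConsumerReconnect();                     // allowed: partition still alive
        System.out.println(task.tryFinish());           // false: not consumed yet
        task.onPartitionFullyConsumed();
        System.out.println(task.tryFinish());           // true
        System.out.println(task.isPartitionReleased()); // true
    }
}
```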





[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-717237157


   We synced up offline; I think we had slightly different understandings of "what the problem is":
   
   1. There won't be a "correctness" issue in any case, because if a view reads something that does not belong to it, the downstream fails the `sequenceNumber` check.
   2. What I mean by "the problem" is avoiding the race condition that causes such failures, which will be addressed in [FLINK-19774].
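Point 1 relies on the downstream side rejecting any buffer that does not continue its own sequence. A minimal sketch of such a check, assuming each connection numbers its buffers from 0; the class name and error message are hypothetical and only illustrate the idea behind the `sequenceNumber` check mentioned above.

```java
// Hypothetical sketch of a downstream sequence-number check; names are illustrative.
class SequenceCheckingChannel {
    // Sequence number that the next buffer on this connection must carry.
    private int expectedSequenceNumber = 0;

    void onBuffer(int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            // A buffer served through someone else's view shows up as a gap or a repeat.
            throw new IllegalStateException(
                    "Buffer reordering: expected " + expectedSequenceNumber + " but got " + sequenceNumber);
        }
        expectedSequenceNumber++;
    }

    public static void main(String[] args) {
        SequenceCheckingChannel channel = new SequenceCheckingChannel();
        channel.onBuffer(0);
        channel.onBuffer(1);
        try {
            channel.onBuffer(7);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Buffer reordering: expected 2 but got 7
        }
    }
}
```

Such a check turns a misrouted buffer into a failure rather than silently wrong data, which is why the remaining issue is the frequency of those failures rather than correctness.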





[GitHub] [flink] tillrohrmann edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
tillrohrmann edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-717803324


   Sorry for joining the discussion so late, but a couple of questions came up when discussing the scheduler changes with Yuan offline. Why do we need a special `ResultPartitionType` for approximate local recovery? Shouldn't it conceptually be possible to support both the normal and the approximate recovery behaviour with the same pipelined partitions? If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the scheduler's responsibility to make sure that producers are restarted as well, in order to ensure exactly-once/at-least-once processing guarantees. If not, we would simply consume from where we left off.
   
   As far as I understand, the existing `ResultPartitionType.PIPELINED(_BOUNDED)` cannot be used because we release the result partition if the downstream consumer disconnects. I believe this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a `Task`? We could say that a `Task` can only terminate once the pipelined result partition has been consumed. Moreover, a `Task` would clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the `Task` reaches a terminal state).
   
   I would love to hear your feedback, @pnowojski, @zhijiangW, @curcur and @rkhachatryan, and also to learn more about your reasoning for introducing a new result partition type.





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508404190



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       But I can think of a possible case that might cause a problem:
   
   Thread1: create view
   Thread2: release view, create view
   Thread1: release view (the release is far too slow)
   
   Let me think about this tonight. In the worst case, we can always release the view before creating a new one.
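Here is one way to rule out the interleaving above. The sketch uses hypothetical `ReconnectableSubpartition` and `ReadView` classes and assumes that a late release from Thread1 goes through its own view reference, not through the subpartition. Under those assumptions, the old view is released only inside `createReadView()`, under the same lock that installs the new view. Releasing a view is idempotent, so Thread1's slow release touches only its own, already released view.

```java
/** Hypothetical stand-in for a subpartition read view. */
final class ReadView {
    private boolean released;

    /** Idempotent: returns false if the view had already been released. */
    synchronized boolean release() {
        if (released) {
            return false;
        }
        released = true;
        return true;
    }

    synchronized boolean isReleased() {
        return released;
    }
}

/** Hypothetical subpartition that releases the old view only right before creating a new one. */
final class ReconnectableSubpartition {
    private final Object lock = new Object();
    private ReadView readView;

    ReadView createReadView() {
        synchronized (lock) {
            // Release the previous view and install the new one in a single critical
            // section, so no other thread can observe or clear the new view in between.
            if (readView != null) {
                readView.release();
            }
            readView = new ReadView();
            return readView;
        }
    }

    ReadView currentView() {
        synchronized (lock) {
            return readView;
        }
    }
}
```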







[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r509536700



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -259,9 +260,10 @@ BufferAndBacklog pollBuffer() {
 			}
 
 			while (!buffers.isEmpty()) {
-				BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
+				BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength = buffers.peek();
+				BufferConsumer bufferConsumer = requireNonNull(bufferConsumerWithPartialRecordLength).getBufferConsumer();

Review comment:
       Thanks!







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the motivation. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. There are mainly two considerations that led me to introduce a different type.
   
   **1) The first and most important reason is to isolate changes so that the normal execution path is not affected.**
   Since I am not sure whether the two can be unified, I am taking the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I am not aware of until the actual implementation.
   
   Even though they can be unified conceptually, having **different implementation subclasses for different connection behavior** does seem reasonable: it simplifies the logic for each behavior. **So personally, I am leaning towards not unifying them**.
   
   But certainly, if it turns out to be cleaner and simpler to unify the two types, I have no objection to doing so. For safety and ease of development, though, starting with a separate type seems the better choice.
   
   **2) Differences between these two types:**
   For upstream reconnection, the two types differ mainly in how they handle **read** and **release**.
   - In normal pipelined mode, the view of each subpartition is created once and released when the downstream disconnects. Releasing the view releases the subpartition, and eventually the partition.
   - In approximate mode, the view of each subpartition can be created and released multiple times, as long as at most one view exists for the subpartition at any instant.
     - For reading: upon reconnection, the reader should clean up the partial record left by the downstream failure (this could easily be unified).
     - For release: a partition is released only when it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created. (This is a bit difficult with the current setup; let's discuss it in the lifecycle part below.)
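The partial-record cleanup on reconnection can be sketched as follows. The classes `BufferWithPartialLength` and `PartialRecordCleaner` are hypothetical, and the sketch assumes each buffer records how many leading bytes continue a record that started in an earlier buffer. After a reconnect, those leading bytes are dropped. If a whole buffer belongs to the partial record, it is skipped entirely and cleanup stays pending for the next buffer.

```java
import java.util.Arrays;

/** Hypothetical buffer: the first partialRecordLength bytes continue an earlier record. */
final class BufferWithPartialLength {
    final byte[] data;
    final int partialRecordLength;

    BufferWithPartialLength(byte[] data, int partialRecordLength) {
        this.data = data;
        this.partialRecordLength = partialRecordLength;
    }
}

/** Hypothetical reader-side cleanup applied after the downstream reconnects. */
final class PartialRecordCleaner {
    private boolean skipPartialRecord;

    /** Called when a new view is created after reconnection. */
    void onReconnect() {
        skipPartialRecord = true;
    }

    /** Returns the bytes to send for this buffer, dropping a leading partial record if needed. */
    byte[] slice(BufferWithPartialLength buffer) {
        if (!skipPartialRecord) {
            return buffer.data;
        }
        if (buffer.partialRecordLength >= buffer.data.length) {
            // The whole buffer is the tail of a broken record; keep skipping.
            return new byte[0];
        }
        skipPartialRecord = false;
        return Arrays.copyOfRange(buffer.data, buffer.partialRecordLength, buffer.data.length);
    }
}
```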
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined-region failover, and approximate recovery through single-task failover. But I am not sure this will hold in the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We would quite likely end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand, the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is effectively bound to the consumer task. This is not very intuitive but reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it matches intuition best, as long as we make the task terminate only after its produced result partition has been consumed. I think this can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
   
   On second thought last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether a partition belongs to the blocking family or the pipelined family? Is there a problem with doing so? cc @tillrohrmann 
   
   In short:
   **The current behavior of PIPELINED is** ->
   - Release partition as soon as consumer exits
   - Release partition as soon as producer fails/is canceled
   
   **Current behavior of PIPELINED_APPROXIMATE** ->
   - Release partition as soon as producer fails/is canceled
   - Release partition when the job exits
   
   **I think what Till prefers is to unify the Pipelined Family to**
   - Release partition when producer exits
   
   **And my question is whether we can unify the Blocking + Pipelined Family to**
   - Producer releases partition when producer fails/is canceled
   - Release partition when the job exits
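The four behaviors summarized above can be written down as a hypothetical enum encoding of release triggers, to make the comparison concrete. `ReleaseEvent` and `ReleasePolicy` are illustrative names only and are not Flink's `ResultPartitionType`. "Producer exits" is modeled as either finishing or failing/being canceled.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical events that may trigger the release of a result partition. */
enum ReleaseEvent { CONSUMER_EXITED, PRODUCER_FAILED_OR_CANCELED, PRODUCER_FINISHED, JOB_EXITED }

/** Hypothetical encoding of the release policies discussed above (not Flink's API). */
enum ReleasePolicy {
    /** Current PIPELINED behavior. */
    PIPELINED_CURRENT(EnumSet.of(ReleaseEvent.CONSUMER_EXITED, ReleaseEvent.PRODUCER_FAILED_OR_CANCELED)),
    /** Current PIPELINED_APPROXIMATE behavior. */
    PIPELINED_APPROXIMATE_CURRENT(EnumSet.of(ReleaseEvent.PRODUCER_FAILED_OR_CANCELED, ReleaseEvent.JOB_EXITED)),
    /** Pipelined family bound to the producer: any terminal producer state releases. */
    PIPELINED_PRODUCER_BOUND(EnumSet.of(ReleaseEvent.PRODUCER_FAILED_OR_CANCELED, ReleaseEvent.PRODUCER_FINISHED)),
    /** Blocking + pipelined families unified under job-level lifecycle management. */
    UNIFIED_JOB_BOUND(EnumSet.of(ReleaseEvent.PRODUCER_FAILED_OR_CANCELED, ReleaseEvent.JOB_EXITED));

    private final Set<ReleaseEvent> triggers;

    ReleasePolicy(Set<ReleaseEvent> triggers) {
        this.triggers = triggers;
    }

    boolean releasesOn(ReleaseEvent event) {
        return triggers.contains(event);
    }
}
```

Only `PIPELINED_CURRENT` reacts to `CONSUMER_EXITED`, and that is exactly what prevents a reconnecting consumer from finding its partition.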
   
   





[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508345496



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Can't this thread change upon reconnection?
   What about `isBlockedByCheckpoint`? It's read by other threads.
   And even if it would be safe today, leaving this method without even a check seems dangerous in the future.
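A cheap way to make the threading assumption explicit is to fail fast with `Thread.holdsLock` whenever `releaseView()` runs without the `buffers` lock. The sketch uses a hypothetical `GuardedSubpartition`. It assumes, as the question above implies, that `releaseView()` also resets `isBlockedByCheckpoint`.

```java
/** Hypothetical subpartition whose releaseView() refuses to run outside the buffers lock. */
final class GuardedSubpartition {
    private final Object buffers = new Object();
    private Object readView = new Object();
    private boolean isBlockedByCheckpoint = true;

    /** Must be called while holding the buffers lock, because other threads read these fields under it. */
    void releaseView() {
        if (!Thread.holdsLock(buffers)) {
            throw new IllegalStateException("releaseView() must be called while holding the buffers lock");
        }
        readView = null;
        isBlockedByCheckpoint = false;
    }

    void releaseViewUnderLock() {
        synchronized (buffers) {
            releaseView();
        }
    }

    boolean hasView() {
        synchronized (buffers) {
            return readView != null;
        }
    }

    boolean isBlockedByCheckpoint() {
        synchronized (buffers) {
            return isBlockedByCheckpoint;
        }
    }
}
```

A check like this costs one lock-ownership query. It turns a silent future data race into an immediate exception.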
   







[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508250129



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Creation and release happen in the same Netty server thread.
   
   A view can be released multiple times before the next one is created; the release behavior is idempotent. 
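The idempotent release described above can be sketched with a hypothetical `OnceReleasedView`, which guards its release side effects with `AtomicBoolean.compareAndSet`. The cleanup runs at most once, no matter how many times or from how many threads `releaseAllResources()` is called.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical view whose release side effects run at most once. */
final class OnceReleasedView {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Runnable onRelease;

    OnceReleasedView(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    /** Returns true only for the call that actually performed the release. */
    boolean releaseAllResources() {
        if (!released.compareAndSet(false, true)) {
            return false; // already released: idempotent no-op
        }
        onRelease.run();
        return true;
    }

    boolean isReleased() {
        return released.get();
    }
}
```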







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, how about not calling `releaseView()` during downstream task cancellation, and calling `releaseView()` only right before creating a new view?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up.
   In short, it won't be possible, because a view can only be released once, and this is guarded by the view's release flag. Details quoted below:
   
   - What if netty thread1 releases the view after netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
   
   **And if we only release before creation, this whole threading interaction model would be greatly simplified: only the one new netty thread can release the view.**
   
   I don't see any risk that would prevent us from doing this.
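The CAS alternative suggested in the quoted review can be sketched with a hypothetical `CasGuardedSubpartition` that holds the current view in an `AtomicReference`. A release clears the slot only if it still holds the exact view being released, compared by identity. A slow `t1` therefore cannot clear the view that `t2` just installed. The `AtomicReference` also provides the cross-thread visibility that plain non-volatile fields lack.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical subpartition where a stale thread cannot clear a newer view. */
final class CasGuardedSubpartition {
    private final AtomicReference<Object> readView = new AtomicReference<>();

    Object createReadView() {
        Object view = new Object();
        readView.set(view); // the previous view, if any, is replaced
        return view;
    }

    /** Clears the slot only if it still holds the view the caller is releasing. */
    boolean releaseView(Object view) {
        return readView.compareAndSet(view, null);
    }

    Object currentView() {
        return readView.get();
    }
}
```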





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-717237157


   After syncing up offline, I think we had slightly different understandings of "what the problem is":
   
   1. There won't be a "correctness" issue in any case, because if a view reads something it should not, the downstream fails the sequenceNumber check.
   2. What I mean by "the problem" is avoiding the race condition that causes such failures, which will be addressed in [FLINK-19774].
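The receiver-side guard mentioned in point 1 can be sketched with a hypothetical `SequenceNumberChecker`. It is not Flink's actual input-channel code. Each buffer must carry the next expected sequence number. Data sent by a stale or replaced view shows up as a gap or a repeat and fails the check instead of being silently consumed.

```java
/** Hypothetical receiver-side check: out-of-sequence data fails fast. */
final class SequenceNumberChecker {
    private int expectedSequenceNumber;

    /** Accepts the buffer only if it carries the next expected sequence number. */
    void onBuffer(int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            throw new IllegalStateException(
                    "Unexpected sequence number: expected " + expectedSequenceNumber
                            + " but received " + sequenceNumber);
        }
        expectedSequenceNumber++;
    }
}
```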





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski , thanks for the response.
   
   > We realized that if we check view reference in subpartition (or null out view.parent) then the downstream which view was overwritten by an older thread, will sooner or later be fenced by the upstream (or checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`? 
   I evaluated these two methods before, and I feel setting view.parent to null may be the cleaner way (that's why I proposed nulling out the parent). Conceptually, it breaks the connection between the old view and its parent. Implementation-wise, it can limit the (parent == null) handling mostly to `PipelinedApproximateSubpartition`, or perhaps a little bit of `PipelinedSubpartition`. With the reference check, by contrast, we have to change the interface and touch all the subpartitions that extend `PipelinedSubpartition`. Notice that not just `pollNext()` needs the check; everything that touches the parent needs a check.
   
   2. How can this solve the problem of a new view being replaced by an old one? Let's say the downstream reconnects and asks for a new view; the new view is created, then replaced by an old view triggered by an old handler event, and the new view's parent is nulled out. What happens then? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just ends with more failures caused by it.
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   I am overall fine with putting the check in this PR. However, may I ask how that differs from merging this PR as it is and doing a follow-up one that nulls out the parent? 
   In this PR, my main purpose is to introduce a different result partition type, and the scheduler changes are based on this new type. That's the main reason I prefer to do it in a follow-up PR; otherwise the scheduler part is blocked.
   
   It would also be easier to review, and easier for me to focus on tests after nulling out the parent.
   
   > Another concern is a potential resource leak if downstream continuously fail without notifying the upstream (instances of CreditBasedSequenceNumberingViewReader will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing user event (see PartitionRequestQueue.userEventTriggered).
   
   Yes, that's a good point.
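Here is a minimal sketch of the `null out view.parent` option from point 1. It uses hypothetical `DetachableSubpartition` and `NullableParentView` classes, with `String` standing in for buffers. Creating a new view detaches the old one, and a detached view reads nothing. Reading `parent` into a local only narrows the race window. A detach that lands right after that read still lets the old view poll one more buffer, so this alone does not close every race.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical subpartition that detaches the previous view when a new one is created. */
final class DetachableSubpartition {
    private final Deque<String> buffers = new ArrayDeque<>();
    private NullableParentView currentView;

    synchronized void add(String buffer) {
        buffers.add(buffer);
    }

    synchronized NullableParentView createReadView() {
        if (currentView != null) {
            currentView.detach(); // break the link between the old view and its parent
        }
        currentView = new NullableParentView(this);
        return currentView;
    }

    synchronized String poll() {
        return buffers.poll();
    }
}

/** Hypothetical view; every access to parent must handle null. */
final class NullableParentView {
    private volatile DetachableSubpartition parent;

    NullableParentView(DetachableSubpartition parent) {
        this.parent = parent;
    }

    void detach() {
        parent = null;
    }

    String pollNext() {
        DetachableSubpartition p = parent; // read the field once
        return p == null ? null : p.poll();
    }
}
```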





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * d259609d97fa73cd79b3dd400e2868f6be1c5e89 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747) 
   * 1dcce3b9301d178ae2e3ab05888260b05f24b15a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508234412



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       That's a good question; I should probably add a description somewhere to explain this.
   
   On view release, `PipelinedSubpartitionView#releaseAllResources` (the original one) notifies its parent's parent (the result partition) that the corresponding subpartition has been consumed (and can be released). Once all its subpartitions are released, the result partition is released.
   
   In `PipelinedApproximateSubpartitionView#releaseAllResources`, we only set the readView to null. The partition is tracked in `JobMasterPartitionTrackerImpl#startTrackingPartition`. That change would be included in the next diff (draft PR https://github.com/apache/flink/pull/13677/commits/7d31593b3db7cc2a2448c69750ab0963e189c051), together with the failover strategy change.
   
   In that case, the result partition would be released when the job exits.
   
   In the task failure case, the produced partition would be released by the record writer.
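The two release paths described above can be sketched side by side. The classes `SketchResultPartition` and `SketchSubpartition` are hypothetical and only mirror the described behavior of `PipelinedSubpartitionView#releaseAllResources` and `PipelinedApproximateSubpartitionView#releaseAllResources`; they are not the actual implementations. A normal view release counts toward releasing the whole partition. An approximate view release only clears the reader, and the partition waits for an explicit release on producer failure or job exit.

```java
/** Hypothetical result partition, released once every subpartition is consumed or on explicit release. */
final class SketchResultPartition {
    private int unreleasedSubpartitions;
    private boolean released;

    SketchResultPartition(int numSubpartitions) {
        this.unreleasedSubpartitions = numSubpartitions;
    }

    synchronized void onSubpartitionConsumed() {
        if (--unreleasedSubpartitions == 0) {
            released = true;
        }
    }

    /** Explicit release, e.g. on producer failure or (via a partition tracker) on job exit. */
    synchronized void release() {
        released = true;
    }

    synchronized boolean isReleased() {
        return released;
    }
}

/** Hypothetical subpartition showing the two view-release behaviors. */
final class SketchSubpartition {
    private final SketchResultPartition parent;
    private Object readView = new Object();

    SketchSubpartition(SketchResultPartition parent) {
        this.parent = parent;
    }

    /** Normal pipelined mode: releasing the view also counts toward releasing the partition. */
    synchronized void releasePipelinedView() {
        if (readView != null) { // notify the parent only once per subpartition
            readView = null;
            parent.onSubpartitionConsumed();
        }
    }

    /** Approximate mode: only the reader goes away; a new view may reconnect later. */
    synchronized void releaseApproximateView() {
        readView = null;
    }

    synchronized boolean hasView() {
        return readView != null;
    }
}
```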







[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709101715


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit d259609d97fa73cd79b3dd400e2868f6be1c5e89 (Fri Oct 16 10:58:01 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19632).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski, thanks for the response.
   
   > We realized that if we check view reference in subpartition (or null out view.parent) then the downstream whose view was overwritten by an older thread, will sooner or later be fenced by the upstream (or checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`? 
   `null out parent` may have race-condition problems.
    Note that not only `pollNext()` needs the check; I think everything that touches view.parent needs one. 
   
   2. How can this solve the problem of a new view being replaced by an old one? Let's say the downstream reconnects and asks for a new view; the new view is created, and then it is replaced by an old view triggered by an old handler event. The new view's parent is nulled out. Then what happens? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just leads to more failures caused by it.
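   To make the `null out view.parent` option from point 1 concrete, here is a minimal sketch under simplifying assumptions: `FencedSubpartition` and `FencedView` are hypothetical stand-ins (not Flink's `PipelinedSubpartition`/`PipelinedSubpartitionView`), and a buffer is just a `String`. Creating a new view clears the old view's `parent` under the subpartition lock, so any later call through the stale view becomes a no-op. As noted above, every method that touches `parent` needs the same re-check, not only `pollNext()`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical, simplified stand-ins; not Flink's PipelinedSubpartition/PipelinedSubpartitionView.
   final class FencedSubpartition {
       final Object lock = new Object();
       final Deque<String> buffers = new ArrayDeque<>();
       private FencedView readView; // guarded by lock

       FencedView createReadView() {
           synchronized (lock) {
               if (readView != null) {
                   // Fence the previous view: it can no longer reach this subpartition.
                   readView.parent = null;
               }
               readView = new FencedView(this);
               return readView;
           }
       }
   }

   final class FencedView {
       // Cleared when a newer view replaces this one; volatile so the fence is visible to other threads.
       volatile FencedSubpartition parent;

       FencedView(FencedSubpartition parent) {
           this.parent = parent;
       }

       /** Returns the next buffer, or null if none is queued or this view has been fenced. */
       String pollNext() {
           FencedSubpartition p = parent; // read the field once
           if (p == null) {
               return null;
           }
           synchronized (p.lock) {
               // Re-check under the lock: a new view may have been created after the read above.
               if (parent == null) {
                   return null;
               }
               return p.buffers.poll();
           }
       }
   }
   ```

   This only fences the stale view; as point 2 argues, it does not by itself prevent a newer view from being replaced by an older one.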
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   I am overall fine with putting the check in this PR. However, may I ask how that differs from keeping this PR as it is and doing a follow-up PR to null out the parent? 
   In this PR, my main purpose is to introduce a different result partition type, and the scheduler changes are based on this new type. That's the main reason I prefer to do it in a follow-up PR; otherwise, the scheduler part is blocked.
   
   It is easier to review as well.
   
   > Another concern is a potential resource leak if downstream continuously fails without notifying the upstream (instances of CreditBasedSequenceNumberingViewReader will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing user event (see PartitionRequestQueue.userEventTriggered).
   
   Yes, that's a good point.
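   One possible shape for that follow-up, sketched in plain Java rather than against Netty: the reviewer suggests firing a user event (see `PartitionRequestQueue.userEventTriggered`), and here a hypothetical `ReleaseReaderEvent` tells a hypothetical `ReaderRegistry` to release every reader of a subpartition except the most recently registered one. The "keep only the newest reader" policy is an assumption for illustration, not something decided in this thread.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical model of reader bookkeeping; not Flink's PartitionRequestQueue or Netty.
   final class ReaderRegistry {
       static final class Reader {
           final String subpartitionId;
           boolean released;

           Reader(String subpartitionId) {
               this.subpartitionId = subpartitionId;
           }
       }

       /** Hypothetical event asking the registry to drop stale readers of one subpartition. */
       static final class ReleaseReaderEvent {
           final String subpartitionId;

           ReleaseReaderEvent(String subpartitionId) {
               this.subpartitionId = subpartitionId;
           }
       }

       private final Map<String, List<Reader>> readers = new HashMap<>();

       Reader register(String subpartitionId) {
           Reader reader = new Reader(subpartitionId);
           readers.computeIfAbsent(subpartitionId, id -> new ArrayList<>()).add(reader);
           return reader;
       }

       /** Plays the role of a user-event callback: releases all but the newest reader. */
       void onUserEvent(Object event) {
           if (!(event instanceof ReleaseReaderEvent)) {
               return; // not ours; a real handler would pass it on
           }
           List<Reader> list = readers.get(((ReleaseReaderEvent) event).subpartitionId);
           if (list == null || list.size() <= 1) {
               return;
           }
           List<Reader> stale = list.subList(0, list.size() - 1);
           for (Reader reader : stale) {
               reader.released = true;
           }
           stale.clear(); // also removes the stale readers from the backing list
       }

       int readerCount(String subpartitionId) {
           List<Reader> list = readers.get(subpartitionId);
           return list == null ? 0 : list.size();
       }
   }
   ```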





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677",
       "triggerID" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747",
       "triggerID" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016",
       "triggerID" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163",
       "triggerID" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3709faf9515053ca9873382f5b0ce83c127ff2d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163) 
   * ff9581bc2381e224c042f954bac0a7d8c371845a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677",
       "triggerID" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747",
       "triggerID" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016",
       "triggerID" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163",
       "triggerID" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8354",
       "triggerID" : "ff9581bc2381e224c042f954bac0a7d8c371845a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ccacf6664b0f6ec3630a939f8dd155f53226e4b7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8365",
       "triggerID" : "ccacf6664b0f6ec3630a939f8dd155f53226e4b7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ccacf6664b0f6ec3630a939f8dd155f53226e4b7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8365) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up above.
   In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag; details are quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once, and this is guarded by the lock.
   
   I actually have an idea that would simplify this whole model:
   **If we release the view only right before creating a new one, and nowhere else, the threading interaction model would be greatly simplified. That is, only the one new netty thread can release the view.**
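   A minimal sketch of the two invariants described above, release-once and release-only-before-create, assuming hypothetical `OnceSubpartition`/`OnceView` classes and a single `sequenceNumber` field standing in for the per-view state that a release resets. In this sketch the release flag and the reset are both guarded by the subpartition lock, so a stale second release through an old view reference returns `false` and cannot clobber state already written for the new view; holding the same lock in `createReadView()` also gives the visibility guarantee asked about in the first question.

   ```java
   // Hypothetical sketch; OnceSubpartition/OnceView are not Flink classes.
   final class OnceSubpartition {
       final Object lock = new Object();
       int sequenceNumber; // per-view state that a release resets; guarded by lock
       private OnceView readView; // guarded by lock

       OnceView createReadView() {
           synchronized (lock) {
               // The only place a view is released: right before a new one is created.
               if (readView != null) {
                   readView.releaseOnce();
               }
               readView = new OnceView(this);
               return readView;
           }
       }
   }

   final class OnceView {
       private final OnceSubpartition parent;
       private boolean released; // guarded by parent.lock

       OnceView(OnceSubpartition parent) {
           this.parent = parent;
       }

       /** Releases this view and resets the subpartition state; only the first call has any effect. */
       boolean releaseOnce() {
           synchronized (parent.lock) {
               if (released) {
                   return false;
               }
               released = true;
               parent.sequenceNumber = 0;
               return true;
           }
       }
   }
   ```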





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508193395



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Creation and release both go through the netty server thread, so I do not think it needs a lock.
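   The argument here is thread confinement: if every access to the view slot happens on one thread, no lock is needed, and handing work to that thread provides the required happens-before edges. A minimal sketch, assuming a hypothetical `ConfinedViews` class that owns a single-threaded executor in place of a Netty event loop. The guarantee holds only if *all* accesses are confined; if a reconnecting downstream can be served by a different Netty thread (the thread1/thread2 scenario discussed in this PR), the assumption no longer holds and a lock or CAS is needed again.

   ```java
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Hypothetical illustration of thread confinement; the executor stands in for a Netty event loop.
   final class ConfinedViews {
       private final ExecutorService owner = Executors.newSingleThreadExecutor(runnable -> {
           Thread thread = new Thread(runnable, "view-owner");
           thread.setDaemon(true); // do not keep the JVM alive in this demo
           return thread;
       });

       // Only ever read or written on the owner thread, so it needs neither a lock nor volatile.
       private Object currentView;

       Object createReadView() {
           return await(() -> {
               currentView = new Object();
               return currentView;
           });
       }

       void releaseView() {
           await(() -> {
               currentView = null;
               return null;
           });
       }

       boolean hasView() {
           return await(() -> currentView != null);
       }

       void shutdown() {
           owner.shutdown();
       }

       /** Runs the task on the owner thread and waits; submit/get establish happens-before edges. */
       private <T> T await(Callable<T> task) {
           try {
               return owner.submit(task).get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } catch (ExecutionException e) {
               throw new IllegalStateException(e.getCause());
           }
       }
   }
   ```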







[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508235769



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -259,9 +260,10 @@ BufferAndBacklog pollBuffer() {
 			}
 
 			while (!buffers.isEmpty()) {
-				BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
+				BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength = buffers.peek();
+				BufferConsumer bufferConsumer = requireNonNull(bufferConsumerWithPartialRecordLength).getBufferConsumer();

Review comment:
       Added `requireNonNull` here to avoid an IDE null-check warning.
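   For context on the warning: `Deque.peek()` returns `null` when the deque is empty, so static analysis flags the dereference even though the surrounding `while (!buffers.isEmpty())` loop rules that out. A minimal sketch of the same pattern with `Objects.requireNonNull`, using a hypothetical `PeekExample.drain` over a `Deque<String>`. The pattern is only sound if no other thread can drain the deque between `isEmpty()` and `peek()`; in this single-threaded sketch that holds trivially, and in the PR it would rely on all access happening under the same lock.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   import static java.util.Objects.requireNonNull;

   // Hypothetical example of the peek-inside-isEmpty-loop pattern from the diff.
   final class PeekExample {
       /** Consumes all queued items and returns their total length. */
       static int drain(Deque<String> buffers) {
           int total = 0;
           while (!buffers.isEmpty()) {
               // peek() may return null in general; the loop guard rules that out here,
               // and requireNonNull documents the assumption (and fails fast if it is ever broken).
               String head = requireNonNull(buffers.peek());
               total += head.length();
               buffers.poll();
           }
           return total;
       }
   }
   ```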







[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-716973831


   Hey @rkhachatryan and @pnowojski, thanks for the response.
   
   > We realized that if we check view reference in subpartition (or null out view.parent) then the downstream whose view was overwritten by an older thread, will sooner or later be fenced by the upstream (or checkpoint will time out).
   > FLINK-19774 then becomes only an optimization.
   
   1. Do you have a preference between `check view reference` and `null out view.parent`? 
   I evaluated these two methods before, and I feel setting view.parent to null may be the cleaner way (that's why I proposed nulling out the parent). Conceptually, it makes a clean cut between the old view and the new view. Implementation-wise, it keeps the (parent == null) handling mostly within `PipelinedApproximateSubpartition`, and perhaps a little in `PipelinedSubpartition`, whereas with the reference check we have to change the interface and touch every subpartition implementation based on `PipelinedSubpartition`. Note that not only `pollNext()` needs the check; everything that touches the parent does.
   
   2. How can this solve the problem of a new view being replaced by an old one? Let's say the downstream reconnects and asks for a new view; the new view is created, and then it is replaced by an old view triggered by an old handler event. The new view's parent is nulled out. Then what happens? I do not think "fenced by the upstream (or checkpoint will time out)" can **solve** this problem; it just leads to more failures caused by it.
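   For comparison with the `null out view.parent` option, here is a minimal sketch of the `check view reference` alternative from point 1, again with hypothetical simplified classes (`CheckedSubpartition`, `CheckedView`) rather than Flink's. Every entry point takes the calling view and compares it with the current `readView` under the subpartition lock, which is why this variant requires changing the subpartition interface, as argued above. It also covers the interleaving raised in the review thread, where a stale `view1` release would otherwise null out the slot now held by `view2`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical, simplified stand-ins; not Flink's classes or interfaces.
   final class CheckedSubpartition {
       private final Deque<String> buffers = new ArrayDeque<>(); // guarded by this
       private CheckedView readView; // guarded by this

       synchronized void add(String buffer) {
           buffers.add(buffer);
       }

       synchronized CheckedView createReadView() {
           readView = new CheckedView(this);
           return readView;
       }

       /** Every entry point takes the calling view and rejects it unless it is the current one. */
       synchronized String pollBuffer(CheckedView caller) {
           if (caller != readView) {
               return null; // stale view: fenced by the reference check
           }
           return buffers.poll();
       }

       synchronized void releaseView(CheckedView caller) {
           if (caller == readView) {
               readView = null; // only the current view may clear the slot
           }
       }
   }

   final class CheckedView {
       private final CheckedSubpartition parent;

       CheckedView(CheckedSubpartition parent) {
           this.parent = parent;
       }

       String pollNext() {
           return parent.pollBuffer(this);
       }

       void release() {
           parent.releaseView(this);
       }
   }
   ```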
   
   > So we can prevent most of the issues without touching deployment descriptors.
   > Therefore, I think it makes sense to implement it in this PR. Sorry for changing the decision.
   
   I am overall fine with putting the check in this PR. However, may I ask how that differs from keeping this PR as it is and opening a follow-up PR to null out the parent? 
   In this PR, my main purpose is to introduce a different result partition type, and the scheduler changes are based on this new type. That's the main reason I prefer to do it in a follow-up PR; otherwise, the scheduler part is blocked.
   
   It is also easier to review that way, and it lets me focus on the tests for nulling out the parent.
   
   > Another concern is a potential resource leak if downstream continuously fails without notifying the upstream (instances of CreditBasedSequenceNumberingViewReader will accumulate). Can you create a follow-up ticket for that?
   > This can be addressed by firing user event (see PartitionRequestQueue.userEventTriggered).
   
   Yes, that's a good point.





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up: 
   In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag; details are quoted below. 
   
   - What if netty thread1 releases the view after netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
   
   **And if we release the view only right before creating a new one, the threading interaction model would be greatly simplified. That is, only the one new netty thread can release the view.**
   
   I don't see any risk that would prevent us from doing this.
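   The reviewer's `AtomicReference` + CAS suggestion can be sketched as follows. This is an illustration under assumptions, not the PR's code: `CasSubpartition` is hypothetical, and, unlike the PR, where state such as `sequenceNumber` lives in `PipelinedSubpartition`, the per-view state is moved into the view object so that a stale release cannot touch a newer view's state. If that state stays in the subpartition, a reset performed after a successful CAS still has to be ordered against the next `createReadView()` (for example, by doing both under the same lock).

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical sketch of the CAS suggestion; CasSubpartition is not a Flink class.
   final class CasSubpartition {
       /** Per-view state lives in the view here, so a stale release cannot touch a newer view's state. */
       static final class View {
           int sequenceNumber;
       }

       private final AtomicReference<View> readView = new AtomicReference<>();

       View createReadView() {
           View view = new View(); // starts with fresh state (sequenceNumber == 0)
           readView.set(view);     // volatile write publishes the new view
           return view;
       }

       /** Clears the slot only if {@code expected} is still the current view. */
       boolean releaseView(View expected) {
           return readView.compareAndSet(expected, null);
       }

       View currentView() {
           return readView.get();
       }
   }
   ```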





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so I think this is a good place to discuss the motivation. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. There are two main considerations that led me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I am taking the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I will not become aware of until the actual implementation. 
   
   Even though unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable: it keeps the logic for each behavior simple. **So personally, I am leaning towards not unifying them**. 
   
   That said, if it turns out to be cleaner and simpler to unify the two types, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems the better choice.
   
   **2) Differences between these two types**
   For upstream reconnection, the two types differ mainly in how they are **read** and **released**.
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view causes the subpartition, and eventually the partition, to be released.
   - In approximate mode, a subpartition's view can be created and released multiple times, as long as at most one view exists for that subpartition at any instant.
     - For reading: upon reconnection, the reader should clean up the partial record left by the downstream failure (this could easily be unified).
     - For release: a partition is released only if it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created (this is a bit difficult with the current setup; let's discuss it in the lifecycle part below).
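   The reading difference can be illustrated with a simplified model of partial-record cleanup. Assumptions: each hypothetical `Slice` carries a `partialRecordLength`, the number of leading bytes that continue a record started in an earlier slice (this mirrors the idea behind `BufferConsumerWithPartialRecordLength` in the diff, but the class and method here are made up). After a reconnect, the reader drops those leading bytes, and if a partial record covers an entire slice it keeps skipping into the next one, much like the `isPartialBuffer = !buffer.cleanupPartialRecord()` loop in `buildSliceBuffer`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical model; Flink's BufferConsumerWithPartialRecordLength works on network buffers, not byte[].
   final class PartialRecordCleanup {
       static final class Slice {
           final byte[] data;
           final int partialRecordLength; // leading bytes that continue a record from an earlier slice

           Slice(byte[] data, int partialRecordLength) {
               this.data = data;
               this.partialRecordLength = partialRecordLength;
           }
       }

       /**
        * Returns the bytes a reconnected reader should see: leading bytes belonging to a record
        * that started before the reconnect are dropped, possibly across several slices.
        */
       static List<byte[]> readAfterReconnect(List<Slice> slices) {
           List<byte[]> out = new ArrayList<>();
           boolean skipping = true; // analogous to the isPartialBuffer flag in the diff
           for (Slice slice : slices) {
               if (!skipping) {
                   out.add(slice.data);
               } else if (slice.partialRecordLength >= slice.data.length) {
                   // The whole slice is the tail of an old record; keep skipping.
                   continue;
               } else {
                   out.add(Arrays.copyOfRange(slice.data, slice.partialRecordLength, slice.data.length));
                   skipping = false; // reached a record boundary
               }
           }
           return out;
       }
   }
   ```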
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate guarantees through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. There is a good chance we would end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is bound to the consumer task. That is not very intuitive, but it is reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it is the most intuitive option, as long as we make the task terminate only after its produced result partition has been consumed. I think this can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
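   Binding the partition to the producer could look roughly like this with `CompletableFuture`: the task's termination future completes only after its own work is done and all produced partitions are consumed, and a failed or canceled task releases its partitions. `ProducerLifecycle` and `Partition` are hypothetical names; this is a sketch of the idea, not of Flink's `Task` or `ResultPartition`.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch; ProducerLifecycle and Partition are not Flink's Task or ResultPartition.
   final class ProducerLifecycle {
       static final class Partition {
           final CompletableFuture<Void> consumed = new CompletableFuture<>();
           volatile boolean released;

           void release() {
               released = true;
           }
       }

       /**
        * Completes once the task's own work is done AND every produced partition has been consumed.
        * If the work fails or is canceled, the partitions are released and the failure propagates.
        */
       static CompletableFuture<Void> terminationFuture(CompletableFuture<Void> work, List<Partition> produced) {
           CompletableFuture<?>[] consumed =
               produced.stream().map(p -> p.consumed).toArray(CompletableFuture[]::new);
           return work
               .whenComplete((ignored, failure) -> {
                   if (failure != null) {
                       produced.forEach(Partition::release); // clean up on failure/cancellation
                   }
               })
               .thenCompose(ignored -> CompletableFuture.allOf(consumed))
               .thenRun(() -> produced.forEach(Partition::release)); // fully consumed: safe to release
       }
   }
   ```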
   
   On second thought last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether they belong to the blocking or the pipelined family? Is there a problem with doing so?  cc @tillrohrmann 
   
   In short:
   **The current behavior of PIPELINED is:**
   - Release the partition as soon as the consumer exits.
   - Release the partition as soon as the producer fails or is canceled.
   
   **The current behavior of PIPELINED_APPROXIMATE is:**
   - Release the partition as soon as the producer fails or is canceled.
   - Release the partition when the job exits.
   
   **I think what Till prefers is to unify the pipelined family to:**
   - Release the partition when the producer exits.
   
   **And my question is whether we can unify the blocking and pipelined families to:**
   - The producer releases the partition when it fails or is canceled.
   - Release the partition when the job exits.
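   The four release rules above can be written down as data, which makes them easy to compare side by side. A minimal sketch; `LifecycleEvent` and `ReleasePolicy` are hypothetical enums, not Flink APIs, and "producer exits" is modeled as either finishing or failing/being canceled.

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   // Hypothetical encoding of the release rules summarized above; these enums are not Flink APIs.
   enum LifecycleEvent { CONSUMER_EXITED, PRODUCER_FINISHED, PRODUCER_FAILED_OR_CANCELED, JOB_EXITED }

   enum ReleasePolicy {
       /** Current PIPELINED behavior. */
       PIPELINED_CURRENT(EnumSet.of(LifecycleEvent.CONSUMER_EXITED, LifecycleEvent.PRODUCER_FAILED_OR_CANCELED)),
       /** Current PIPELINED_APPROXIMATE behavior. */
       PIPELINED_APPROXIMATE_CURRENT(EnumSet.of(LifecycleEvent.PRODUCER_FAILED_OR_CANCELED, LifecycleEvent.JOB_EXITED)),
       /** Pipelined family bound to the producer: release whenever the producer exits. */
       BOUND_TO_PRODUCER(EnumSet.of(LifecycleEvent.PRODUCER_FINISHED, LifecycleEvent.PRODUCER_FAILED_OR_CANCELED)),
       /** Proposed unification of the blocking and pipelined families. */
       PRODUCER_FAILURE_OR_JOB_EXIT(EnumSet.of(LifecycleEvent.PRODUCER_FAILED_OR_CANCELED, LifecycleEvent.JOB_EXITED));

       private final Set<LifecycleEvent> releaseOn;

       ReleasePolicy(Set<LifecycleEvent> releaseOn) {
           this.releaseOn = releaseOn;
       }

       boolean releasesOn(LifecycleEvent event) {
           return releaseOn.contains(event);
       }
   }
   ```

   Encoded this way, the proposed unification has the same release events as the current `PIPELINED_APPROXIMATE` behavior, which is essentially what the question above asks to extend to all partition types.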
   
   





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508382240



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Would this possibly happen?
   
   > Thread1: call subpartition.createReadView() - create view1
   > Thread2: obtain a reference to view1 
   
   It is not possible to access view1 from a different thread unless the downstream reconnects, which means either thread1 releases the view upon disconnecting from the downstream, or a different thread (thread2) reconnects and releases the view; that would be guarded by the buffer lock, as you suggested.
   
   > Thread1: call subpartition.createReadView() - create view2
   > Thread2: call view1.releaseAllResources <-- nulls out subpartition.readView; view2 is now corrupt?
   
   Same as above.







[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677",
       "triggerID" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747",
       "triggerID" : "d259609d97fa73cd79b3dd400e2868f6be1c5e89",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016",
       "triggerID" : "1dcce3b9301d178ae2e3ab05888260b05f24b15a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163",
       "triggerID" : "d3709faf9515053ca9873382f5b0ce83c127ff2d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3709faf9515053ca9873382f5b0ce83c127ff2d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8163) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508405933



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       In `Task#releaseResources`:
   ```java
   if (isCanceledOrFailed()) {
       partitionWriter.fail(getFailureCause());
   }
   ```






[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann, thanks for bringing this up. We haven't explained anywhere why we introduce a new ResultPartitionType, so this is a good place to discuss the reasons. I will summarize the thoughts we discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski
   
   > I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions? 
   
   Conceptually, yes, it is possible to unify normal and approximate pipelined partitions; practically, I am not 100% sure they can be unified. Two main considerations led me to introduce a separate type.
   
   **1) The first and most important reason is isolating changes to avoid affecting the normal execution path.**
   Since I am not sure whether the two can be unified, I took the safe step first. This also helps identify the differences between the two types (there may be more differences for downstream reconnection as well). There could be corner cases I won't notice until the actual implementation.
   
   Even if unification is conceptually possible, having **different implementation subclasses for different connection behavior** seems reasonable: it keeps the logic for each behavior simple. **So personally, I lean towards not unifying them.**
   
   That said, if unifying the two types turns out to be cleaner and simpler, I have no objection. For safety and ease of development, though, starting with a separate type seems the better choice.
   
   **2) Differences between the two types:**
   For upstream reconnection, the two types differ mainly in how they **read** and how they **release**.
   - In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view releases the subpartition, and eventually the partition.
   - In approximate mode, a subpartition's view can be created and released multiple times, as long as at most one view exists for the subpartition at any instant.
     - For reading: upon reconnection, the reader should clean up the partial record left over by the downstream failure (this could easily be unified).
     - For release: a partition is released only when it has been fully consumed (all data read) or its producer has failed. The partition should not be released when all its views are released, because new views can still be created. (This is a bit difficult with the current setup; let's discuss it in the lifecycle part below.)
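A minimal sketch of the approximate-mode rules listed above, assuming plain Java and a single lock. `SketchSubpartition` and `SketchView` are hypothetical stand-ins, not Flink's `PipelinedApproximateSubpartition`, and the partial-record cleanup is reduced to a boolean flag:

```java
/** Hypothetical read view; it only records whether it has been released. */
final class SketchView {
    private volatile boolean released;

    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

/**
 * Hypothetical approximate subpartition: at most one view exists at a time, a view can
 * be re-created after a downstream reconnect, and releasing a view never releases the data.
 */
final class SketchSubpartition {
    private final Object lock = new Object();
    private SketchView readView;
    private boolean hadReader;
    private boolean partialRecordCleanupRequired;
    private boolean released;

    SketchView createReadView() {
        synchronized (lock) {
            if (released) {
                throw new IllegalStateException("subpartition already released");
            }
            if (readView != null) {
                // Reconnect before the old view was released: release it now.
                readView.release();
            }
            // Every reader after the first may face the tail of a partially consumed record.
            partialRecordCleanupRequired = hadReader;
            hadReader = true;
            readView = new SketchView();
            return readView;
        }
    }

    /** Consumer went away: drop the view but keep the data for the next reader. */
    void onConsumerDisconnected() {
        synchronized (lock) {
            if (readView != null) {
                readView.release();
                readView = null;
            }
        }
    }

    /** Called only when all data has been read or the producer has failed. */
    void release() {
        synchronized (lock) {
            released = true;
            if (readView != null) {
                readView.release();
                readView = null;
            }
        }
    }

    boolean isPartialRecordCleanupRequired() {
        synchronized (lock) {
            return partialRecordCleanupRequired;
        }
    }

    boolean isReleased() {
        synchronized (lock) {
            return released;
        }
    }
}
```

Releasing a view in this sketch never sets the subpartition's `released` flag; only `release()`, called once all data is consumed or the producer fails, does.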
   
   > If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly-once/at-least-once through pipelined region failover, and approximate guarantees through single-task failover. I am less sure about the future, though. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
   
   > As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is bound to the consumer task. This is not very intuitive but reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think that matches intuition best, as long as we make the task terminate only after its produced result partition has been consumed. This can also simplify logic that needs to be applied to partitions through task resources but currently cannot be, because the task has already terminated.
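To illustrate binding the partition to its producer, here is a hypothetical sketch (`OwnedPartition` and `ProducerTask` are made-up names, not Flink classes): the producer's termination future completes only after the partition has been fully consumed, and a failed or canceled producer releases the partition itself.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical partition whose lifecycle is owned by the producing task. */
final class OwnedPartition {
    private final CompletableFuture<Void> fullyConsumed = new CompletableFuture<>();
    private volatile boolean released;

    /** Signalled by the consumer side once all data has been read. */
    void markFullyConsumed() {
        fullyConsumed.complete(null);
    }

    CompletableFuture<Void> fullyConsumedFuture() {
        return fullyConsumed;
    }

    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

/** Hypothetical producer task: it may only terminate once its partition has been consumed. */
final class ProducerTask {
    private final OwnedPartition partition;

    ProducerTask(OwnedPartition partition) {
        this.partition = partition;
    }

    /** Completes when the task may reach its terminal state; the partition is then released. */
    CompletableFuture<Void> finish() {
        // Finishing the user code is not enough: wait until the consumer has read everything.
        return partition.fullyConsumedFuture().thenRun(partition::release);
    }

    /** Failure or cancellation: the task cleans up the partition it produced. */
    void failOrCancel() {
        partition.release();
    }
}
```

With this shape, every partition is released exactly when its producer reaches a terminal state, which is the clearly defined lifecycle the quoted proposal asks for.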
   
   Thinking about it again last night: why can't we put all lifecycle management of result partitions into the JobMaster, regardless of whether they belong to the blocking family or the pipelined family? Is there a problem with doing so? cc @tillrohrmann
   
   In short:
   **Current behavior of PIPELINED:**
   - release the partition as soon as the consumer exits
   - release the partition as soon as the producer fails or is canceled
   
   **Current behavior of PIPELINED_APPROXIMATE:**
   - do nothing when the consumer exits
   - release the partition as soon as the producer fails or is canceled
   - release the partition when the job exits
   
   **I think what Till prefers is to unify the pipelined family to:**
   - the producer releases the partition when the producer exits
   
   **And my question is whether we can unify the blocking and pipelined families to:**
   - the producer releases the partition when the producer fails or is canceled
   - release the partition when the job exits
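The four rule sets above can be written down as a small lookup table. This is only an illustration; `ReleaseTrigger` and `PartitionReleasePolicy` are hypothetical enums, not part of Flink's `ResultPartitionType`:

```java
import java.util.EnumSet;
import java.util.Set;

/** Events that may end a result partition's life (illustrative names). */
enum ReleaseTrigger {
    CONSUMER_EXITS,
    PRODUCER_EXITS, // producer reaches any terminal state
    PRODUCER_FAILS_OR_CANCELED,
    JOB_EXITS
}

/** The release rules listed above, one constant per rule set (a sketch, not Flink's API). */
enum PartitionReleasePolicy {
    // Current PIPELINED behavior.
    PIPELINED_CURRENT(EnumSet.of(ReleaseTrigger.CONSUMER_EXITS, ReleaseTrigger.PRODUCER_FAILS_OR_CANCELED)),
    // Current PIPELINED_APPROXIMATE behavior: a consumer exit is ignored.
    PIPELINED_APPROXIMATE_CURRENT(EnumSet.of(ReleaseTrigger.PRODUCER_FAILS_OR_CANCELED, ReleaseTrigger.JOB_EXITS)),
    // Unified pipelined family bound to the producer.
    PIPELINED_PRODUCER_BOUND(EnumSet.of(ReleaseTrigger.PRODUCER_EXITS)),
    // Open question: one rule for the blocking and pipelined families.
    BLOCKING_AND_PIPELINED_UNIFIED(EnumSet.of(ReleaseTrigger.PRODUCER_FAILS_OR_CANCELED, ReleaseTrigger.JOB_EXITS));

    private final Set<ReleaseTrigger> triggers;

    PartitionReleasePolicy(Set<ReleaseTrigger> triggers) {
        this.triggers = triggers;
    }

    boolean releasesOn(ReleaseTrigger trigger) {
        return triggers.contains(trigger);
    }
}
```

Laid out this way, the approximate type differs from the proposed blocking/pipelined unification in no trigger at all, which is the point behind the question above.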
   
   




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714599452


   Thanks, @rkhachatryan, for clarifying the problem!
   I agree there is a problem if a downstream task fails repeatedly, or if an orphaned task execution lingers for a short time after the new execution is already running (as described in the FLIP).
   
   Here is an idea of how to cleanly and thoroughly solve this kind of problem:
   1. Go with the simplified view-release approach: release the view only right before creating a new one (in thread2). That is, we won't clean up the view when the downstream task disconnects (`releaseView` would not be called through the old reference to the view, in either thread1 or thread2).
      - This would greatly simplify the threading model.
      - This won't cause any resource leak: in `PipelinedSubpartitionView`, releasing the view only serves to notify the upstream result partition to release on consumption once all subpartitions are consumed. In our case, we do not release the result partition on consumption anyway (the result partition is tracked in the JobMaster, similar to the `ResultPartitionType.BLOCKING` type).
   
   2. Associate each view with a downstream task execution version.
      - This makes sense because we effectively already have different versions of the view, corresponding to the `vertex.version` of the downstream task.
      - `createView` is performed only if the version to create is greater than the existing one.
      - If we decide to create a new view, the old view's parent (subpartition) reference is invalidated.
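A sketch of steps 1) and 2) together, assuming an integer downstream execution version and a `synchronized` subpartition. `VersionedSubpartition` and `VersionedView` are hypothetical names, and returning `null` for a stale request is just one possible choice (throwing would work too):

```java
/** Hypothetical view tagged with the downstream execution version that created it. */
final class VersionedView {
    final int version;
    private volatile VersionedSubpartition parent;

    VersionedView(int version, VersionedSubpartition parent) {
        this.version = version;
        this.parent = parent;
    }

    /** Cuts the link to the subpartition so a stale handler can no longer reach it. */
    void invalidate() {
        parent = null;
    }

    boolean isValid() {
        return parent != null;
    }
}

/** Hypothetical subpartition applying steps 1) and 2). */
final class VersionedSubpartition {
    private VersionedView readView;

    /**
     * Returns the new view, or null if the request comes from the same or an older
     * downstream execution version than the current view.
     */
    synchronized VersionedView createReadView(int downstreamVersion) {
        if (readView != null) {
            if (downstreamVersion <= readView.version) {
                return null; // stale reconnect, e.g. from an orphaned execution
            }
            // Step 1: the old view is released only here, right before creating a new one.
            readView.invalidate();
        }
        readView = new VersionedView(downstreamVersion, this);
        return readView;
    }
}
```

Because release happens only inside `createReadView`, under the subpartition's monitor, no other thread ever clears `readView`.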
   
   This way, we can completely disconnect the old view from the subpartition, and the active handler would always hold the freshest view reference.
   
   I will implement 1) in this PR and open a Jira ticket for 2).




[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   Aren't they guarded by the synchronization block?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up above.
   In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag. Details quoted below:
   
   - What if Netty thread1 releases the view after Netty thread2 has recreated it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
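The "a view can only be released once" argument relies on a release flag that is flipped atomically. A minimal sketch of such a guard, assuming an `AtomicBoolean` with `compareAndSet` (`ReleaseOnceView` is a hypothetical class). It only guarantees that each view's release logic runs once; it does not by itself tell the subpartition which view is current:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical view whose release logic runs at most once, whichever thread gets there first. */
final class ReleaseOnceView {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Runnable onRelease;

    ReleaseOnceView(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    /** Returns true only for the call that actually performed the release. */
    boolean releaseAllResources() {
        if (released.compareAndSet(false, true)) {
            onRelease.run();
            return true;
        }
        return false; // already released by another thread: no-op
    }
}
```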
   
   I actually have an idea to simplify this whole model:
   **If we release the view only before creating a new one, and nowhere else, the whole threading interaction model becomes much simpler: only the newly connecting Netty thread can release the view.**




[GitHub] [flink] flinkbot edited a comment on pull request #13648: Single task result partition type pr

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677",
       "triggerID" : "4ed05009530e34f77deff767a04cd4c14c15378d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4ed05009530e34f77deff767a04cd4c14c15378d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508382240



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Would this possibly happen?
   
   > Thread1: call subpartition.createReadView() - create view1
   > Thread2: obtain a reference to view1 
   
   It is not possible to access view1 from a different thread unless the downstream reconnects. In that case, either thread1 releases the view upon disconnecting from the downstream, or a different thread (thread2) reconnects and releases the view; both paths would be guarded by the buffer lock, as you suggested.
   
   > Thread1: call subpartition.createReadView() - create view2
   > Thread2: call view1.releaseAllResources <-- nulls out subpartition.readView; view2 is now corrupt?
   
   Same as above.
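One way to make the second scenario harmless is to let creation and release share the same lock and have release clear the slot only if the caller is still the current view. A hypothetical sketch (`LockedSubpartition` and its nested `View` are illustrative names, with a plain `Object` standing in for the buffer lock):

```java
/** Hypothetical subpartition: create and release share one lock, and release checks identity. */
final class LockedSubpartition {
    /** Hypothetical view that knows its parent. */
    static final class View {
        private final LockedSubpartition parent;

        View(LockedSubpartition parent) {
            this.parent = parent;
        }

        void releaseAllResources() {
            parent.releaseView(this);
        }
    }

    private final Object buffers = new Object(); // stands in for the subpartition's buffer lock
    private View readView;

    View createReadView() {
        synchronized (buffers) {
            readView = new View(this);
            return readView;
        }
    }

    void releaseView(View view) {
        synchronized (buffers) {
            // A late release of view1 must not null out view2.
            if (readView == view) {
                readView = null;
            }
        }
    }

    View currentView() {
        synchronized (buffers) {
            return readView;
        }
    }
}
```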






[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in normal case: none of the fields written in `releaseView` are `volatile`. So in normal case (`t1:release` then `t2:createReadView`) `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()`  from `createReadView()` unconditionally?
   
   That's true. In that case, how about not calling `releaseView()` during downstream task cancellation, and calling `releaseView()` only right before creating a new view?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` made already by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be `AtomicReference` and then we can guard `PipelinedApproximateSubpartition.releaseView()` by CAS on it?
   
   I think this is the same question I answered in the write-up.
   In short, it is not possible, because a view can only be released once, which is guarded by the view's release flag. Details quoted below:
   
   - What if Netty thread1 releases the view after Netty thread2 has recreated it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Thread1 cannot release the old view (through its view reference) again afterwards, since a view can only be released once.
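For comparison, the `AtomicReference` variant suggested in the review could look roughly like this. It is a hypothetical sketch (`CasSubpartition` is not a Flink class): `compareAndSet(view, null)` succeeds only if `view` is still the current one, so a slow release of an old view cannot clear a newer one.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical subpartition holding its current view in an AtomicReference. */
final class CasSubpartition {
    /** Hypothetical view type; the subpartition only needs its identity. */
    static final class View {}

    private final AtomicReference<View> readView = new AtomicReference<>();

    View createReadView() {
        View fresh = new View();
        readView.set(fresh); // replaces any previous view
        return fresh;
    }

    /** Clears the slot only if {@code view} is still current; returns whether it did. */
    boolean releaseView(View view) {
        return readView.compareAndSet(view, null);
    }

    View currentView() {
        return readView.get();
    }
}
```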
   
   **And if we release the view only before creating a new one, the whole threading interaction model becomes much simpler.**
   
   I don't see any risk that would prevent us from doing this.




[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508348714



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       Thanks for the explanation.
   
   Can you please elaborate on 
   > In task failure case, the produced partition would be released by its corresponding RecordWriter.
   
   I don't see any calls from `RecordWriter` to release its `targetPartition`. Do you mean they will be added in the next diff too?







[GitHub] [flink] curcur commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

curcur commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714613721


   Follow-up ticket:
   
   https://issues.apache.org/jira/browse/FLINK-19774





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in the normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release`, then `t2:createReadView`), `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   Aren't they guarded by the `synchronized` block? In the existing code, almost all accesses are guarded by the lock. But I have a simpler solution, just in case; see the last paragraph.
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` already made by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, and then we can guard `PipelinedApproximateSubpartition.releaseView()` by a CAS on it?
   
   I think this is the same question I answered in the write-up above.
   In short, this cannot happen, because a view can only be released once, which is guarded by the view's release flag (details quoted below). The updates themselves are guarded by the lock.
   
   - What if netty thread1 releases the view after netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Afterwards, thread1 cannot release the old view (through its view reference) again, since a view can only be released once.
   
   I actually have an idea to simplify this whole model:
   **If we release the view only before creating a new one, and nowhere else, the whole threading interaction model becomes much simpler: only one (new) netty thread can release the view.**
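A minimal sketch of the release-before-create model, assuming a simplified subpartition whose shared state is just the current view and a sequence number. `SimpleSubpartition` and `SimpleView` are hypothetical names, not Flink classes; the single-release flag stands in for "a view can only be released once".

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a reconnectable subpartition view (not Flink code). */
final class SimpleView {
    private final AtomicBoolean released = new AtomicBoolean(false);

    /** Returns true only for the first caller; every later call is a no-op. */
    boolean release() {
        return released.compareAndSet(false, true);
    }

    boolean isReleased() {
        return released.get();
    }
}

/** Hypothetical subpartition that releases the previous view only when creating a new one. */
final class SimpleSubpartition {
    private final Object lock = new Object();
    private SimpleView readView;   // guarded by lock
    private int sequenceNumber;    // guarded by lock

    SimpleView createReadView() {
        synchronized (lock) {
            // Release-before-create is the only release path, so the reset below happens
            // under the same lock that later guards the new view's writes.
            if (readView != null && readView.release()) {
                sequenceNumber = 0;
            }
            readView = new SimpleView();
            return readView;
        }
    }

    /** Called for every buffer handed to the current view. */
    void onBufferSent() {
        synchronized (lock) {
            sequenceNumber++;
        }
    }

    int sequenceNumber() {
        synchronized (lock) {
            return sequenceNumber;
        }
    }
}
```

A thread that still holds the old `SimpleView` can call `release()` on it, but the call returns `false` and touches no subpartition state.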





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-714211577


   > * Visibility in the normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release`, then `t2:createReadView`), `t2` can see some inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   >   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   That's true. In that case, how about not calling `releaseView()` during downstream task cancellation, and calling it only before creating a new view?
   
   > * Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` already made by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   >   Maybe `PipelinedSubpartition.readerView` should be an `AtomicReference`, and then we can guard `PipelinedApproximateSubpartition.releaseView()` by a CAS on it?
   
   I think this is the same question I answered in the write-up.
   In short, this cannot happen, because a view can only be released once, which is guarded by the view's release flag (details quoted below).
   
   - What if netty thread1 releases the view after netty thread2 recreates it?
   Thread2 releases the view that thread1 holds a reference to before creating a new view. Afterwards, thread1 cannot release the old view (through its view reference) again, since a view can only be released once.
   
   **And if we release the view only before creating a new one, the whole threading interaction model becomes much simpler: only one netty thread can release the view.**
   
   I don't see any risk in doing this.
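For comparison, the reviewer's `AtomicReference` suggestion can be sketched as below, assuming the view reference is the only state being protected. `CasSubpartition` is hypothetical. `compareAndSet(expected, null)` fails when a slow thread passes a stale view, so it cannot clear a view created later; resetting other fields such as `sequenceNumber` would likely still need the lock, so this alone does not cover the whole concern.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical subpartition whose reader view is held in an AtomicReference (not Flink code). */
final class CasSubpartition {
    /** Marker type standing in for a subpartition view. */
    static final class View {}

    private final AtomicReference<View> readView = new AtomicReference<>();

    View createReadView() {
        View view = new View();
        readView.set(view);
        return view;
    }

    /**
     * Clears the current view only if it is still {@code expected}. A thread holding a
     * stale reference therefore cannot clear a view created later by another thread.
     */
    boolean releaseView(View expected) {
        return readView.compareAndSet(expected, null);
    }

    View currentView() {
        return readView.get();
    }
}
```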





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r509191005



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Added the lock.
   
   Two netty threads may try to release the same view.
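A minimal sketch of `releaseView()` guarded by `synchronized (buffers)`, assuming a hypothetical `LockedSubpartition` whose fields mirror the ones reset in the diff. The early return when `readView` is already `null` is an added assumption that turns a second concurrent release into a no-op; `runInParallel` is a hypothetical test helper.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified subpartition; field names mirror the diff but this is not Flink code. */
final class LockedSubpartition {
    final ArrayDeque<Object> buffers = new ArrayDeque<>(); // doubles as the lock, as in the diff
    Object readView = new Object();                         // guarded by buffers
    boolean isPartialBuffer;                                // guarded by buffers
    int sequenceNumber = 7;                                 // guarded by buffers; non-zero for illustration
    int releaseCount;                                       // guarded by buffers; counts effective releases

    void releaseView() {
        synchronized (buffers) {
            if (readView == null) {
                return; // already released by another thread
            }
            readView = null;
            isPartialBuffer = true;
            sequenceNumber = 0;
            releaseCount++;
        }
    }

    /** Runs two tasks on separate threads and waits for both to finish. */
    static void runInParallel(Runnable first, Runnable second) {
        Thread t1 = new Thread(first);
        Thread t2 = new Thread(second);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```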







[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r509537030



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       Got it, thanks.







[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * d259609d97fa73cd79b3dd400e2868f6be1c5e89 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747) 
   





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r509213967



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -259,9 +260,10 @@ BufferAndBacklog pollBuffer() {
 			}
 
 			while (!buffers.isEmpty()) {
-				BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
+				BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength = buffers.peek();
+				BufferConsumer bufferConsumer = requireNonNull(bufferConsumerWithPartialRecordLength).getBufferConsumer();

Review comment:
       I agree, let's just remove it.







[GitHub] [flink] rkhachatryan commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

rkhachatryan commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r507745730



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
##########
@@ -71,7 +71,17 @@
 	 * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
 	 * no checkpoint barriers.
 	 */
-	PIPELINED_BOUNDED(true, true, true, false);
+	PIPELINED_BOUNDED(true, true, true, false),
+
+	/**
+	 * Pipelined partitions with a bounded (local) buffer pool to support downstream task to
+	 * continue consuming data after reconnection in Approximate Local-Recovery.
+	 *
+	 * <p>Pipelined results can be consumed only once by a single consumer at one time.
+	 * {@link #PIPELINED_APPROXIMATE} is different from {@link #PIPELINED_BOUNDED} in that
+	 * {@link #PIPELINED_APPROXIMATE} is not decomposed automatically after consumption.
+	 */
+	PIPELINED_APPROXIMATE(true, true, true, true);

Review comment:
       Can you please explain why this partition type is bounded?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",

Review comment:
       I think this message should mention that a new view is being created.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       The writes in this method should be done under a lock, right?
   But I'm not sure that all execution paths acquire this lock.
   Should we add `synchronized (buffers)` or `checkState(Thread.holdsLock(buffers))`?
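A minimal sketch of the `Thread.holdsLock` option, assuming a hypothetical `LockCheckedSubpartition` whose callers are required to hold the `buffers` lock. `Thread.holdsLock(Object)` is a standard JDK method; Flink's `Preconditions.checkState(boolean)` throws `IllegalStateException`, which the plain `if` below mimics to stay dependency-free.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch of asserting that the caller already holds the subpartition lock. */
final class LockCheckedSubpartition {
    private final ArrayDeque<Object> buffers = new ArrayDeque<>();
    private Object readView = new Object(); // guarded by buffers

    /** Must be called while holding the lock on {@code buffers}. */
    void releaseView() {
        // Mirrors checkState(Thread.holdsLock(buffers)): fail fast on an unlocked call path.
        if (!Thread.holdsLock(buffers)) {
            throw new IllegalStateException("releaseView() must be called under the buffers lock");
        }
        readView = null;
    }

    /** A caller that follows the locking contract. */
    void recreateView() {
        synchronized (buffers) {
            releaseView();
            readView = new Object();
        }
    }

    boolean hasView() {
        synchronized (buffers) {
            return readView != null;
        }
    }
}
```

Compared with adding `synchronized (buffers)` inside the method, the assertion documents the contract and exposes any execution path that forgot the lock, at the cost of failing at runtime instead of silently locking.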
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;
+		isPartialBuffer = true;
+		isBlockedByCheckpoint = false;
+		sequenceNumber = 0;
+	}
+
+	@Override
+	public String toString() {

Review comment:
       I couldn't find any differences from `super.toString` other than the class name.
   Can we just replace `"PipelinedSubpartition` in super with `getClass().getSimpleName()` instead of overriding?
   
   Ditto for the view.
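A minimal sketch of that suggestion, with hypothetical `BaseSubpartition`/`ApproximateSubpartition` classes and a made-up output format (the real `toString()` prints other fields). Calling `getClass().getSimpleName()` in the parent lets the subclass drop its override entirely.

```java
/** Hypothetical base class; the real PipelinedSubpartition prints more fields. */
class BaseSubpartition {
    protected final int index;
    protected int buffersInBacklog;

    BaseSubpartition(int index) {
        this.index = index;
    }

    @Override
    public String toString() {
        // The runtime class name lets subclasses reuse this method unchanged.
        return String.format("%s#%d [number of buffers in backlog: %d]",
                getClass().getSimpleName(), index, buffersInBacklog);
    }
}

/** Hypothetical subclass that no longer needs its own toString() override. */
class ApproximateSubpartition extends BaseSubpartition {
    ApproximateSubpartition(int index) {
        super(index);
    }
}
```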

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();

Review comment:
       nit: `super.buildSliceBuffer` ?
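A minimal sketch of the nit, using hypothetical stand-ins (`PartialRecordBuffer`, `PlainSubpartition`, `ReconnectableSubpartition`) rather than Flink's classes. The assumed contract of `cleanupPartialRecord()` is "returns true once the leading partial record has been fully skipped". The override keeps only the partial-record bookkeeping and delegates the actual building to the parent.

```java
/** Hypothetical stand-in for BufferConsumerWithPartialRecordLength (not Flink code). */
final class PartialRecordBuffer {
    private final String data;
    private final boolean cleanupCompletes;
    boolean cleanupCalled;

    PartialRecordBuffer(String data, boolean cleanupCompletes) {
        this.data = data;
        this.cleanupCompletes = cleanupCompletes;
    }

    /** Assumed contract: returns true once the leading partial record has been fully skipped. */
    boolean cleanupPartialRecord() {
        cleanupCalled = true;
        return cleanupCompletes;
    }

    String build() {
        return data;
    }
}

/** Hypothetical parent class with the default slicing behavior. */
class PlainSubpartition {
    String buildSliceBuffer(PartialRecordBuffer buffer) {
        return buffer.build();
    }
}

/** Hypothetical reconnectable subclass: extra cleanup, then delegate to the parent. */
class ReconnectableSubpartition extends PlainSubpartition {
    private boolean isPartialBuffer = false;

    /** Mirrors releaseView() in the diff, which sets isPartialBuffer = true. */
    void onViewReleased() {
        isPartialBuffer = true;
    }

    boolean isPartialBuffer() {
        return isPartialBuffer;
    }

    @Override
    String buildSliceBuffer(PartialRecordBuffer buffer) {
        if (isPartialBuffer) {
            // Keep cleaning up until one buffer reports that its partial record is gone.
            isPartialBuffer = !buffer.cleanupPartialRecord();
        }
        // Reuse the parent's logic instead of duplicating buffer.build().
        return super.buildSliceBuffer(buffer);
    }
}
```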

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * View over a pipelined in-memory only subpartition allowing reconnecting.
+ */
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+	PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+		super(parent, listener);
+	}
+
+	@Override
+	public void releaseAllResources() {

Review comment:
       I think this method is called not only upon the downstream RPC, but also on task shutdown and in other cases.
   If so, completely skipping `super.releaseAllResources` can lead to resource leaks in those cases.
   WDYT?
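One way to address the concern, sketched under assumptions: the override skips the parent's cleanup only when a reconnect is actually expected, and otherwise falls back to `super.releaseAllResources()`. `BaseView`, `ReconnectableView`, and the `reconnectExpected` signal are all hypothetical; how the real view would learn whether a reconnect is expected is not settled in this thread.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical base view whose release frees resources (not Flink code). */
class BaseView {
    boolean resourcesFreed;

    public void releaseAllResources() {
        // Stands in for recycling buffers, notifying the parent, and similar cleanup.
        resourcesFreed = true;
    }
}

/** Hypothetical reconnectable view that skips cleanup only when a reconnect is expected. */
class ReconnectableView extends BaseView {
    private final BooleanSupplier reconnectExpected; // assumed signal, e.g. "producer still running"
    boolean detachedForReconnect;

    ReconnectableView(BooleanSupplier reconnectExpected) {
        this.reconnectExpected = reconnectExpected;
    }

    @Override
    public void releaseAllResources() {
        if (reconnectExpected.getAsBoolean()) {
            // Downstream failure: keep the subpartition's data so a new view can attach.
            detachedForReconnect = true;
        } else {
            // Task shutdown and other paths: run the full cleanup to avoid leaks.
            super.releaseAllResources();
        }
    }
}
```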

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -259,9 +260,10 @@ BufferAndBacklog pollBuffer() {
 			}
 
 			while (!buffers.isEmpty()) {
-				BufferConsumer bufferConsumer = buffers.peek().getBufferConsumer();
+				BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength = buffers.peek();
+				BufferConsumer bufferConsumer = requireNonNull(bufferConsumerWithPartialRecordLength).getBufferConsumer();

Review comment:
       I think there is no point in adding explicit `requireNonNull` just before dereferencing it.
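A small illustration of the point, using `java.util.ArrayDeque` in place of the subpartition's buffer queue (an assumption; the real field type may differ). `Objects.requireNonNull` throws the same `NullPointerException` that the dereference on the next line would throw, and inside `while (!buffers.isEmpty())`, assuming the queue is only touched under the `buffers` lock, `peek()` cannot return `null` anyway. `PeekDemo` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Objects;

final class PeekDemo {
    /** Throws NullPointerException on an empty deque, via requireNonNull. */
    static int lengthWithCheck(ArrayDeque<String> queue) {
        return Objects.requireNonNull(queue.peek()).length();
    }

    /** Throws the same NullPointerException on an empty deque, via the dereference. */
    static int lengthWithoutCheck(ArrayDeque<String> queue) {
        return queue.peek().length();
    }

    /** Mirrors the loop in pollBuffer(): the isEmpty() guard already ensures peek() is non-null. */
    static int sumLengths(ArrayDeque<String> queue) {
        int total = 0;
        while (!queue.isEmpty()) {
            total += queue.peek().length();
            queue.poll();
        }
        return total;
    }
}
```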

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       I'm concerned about a potential race condition here (even with `synchronized` added).
   
   Consider the following interleaving:
   Thread1: call `subpartition.createReadView()` - create `view1`
   Thread2: obtain a reference to `view1`
   Thread1: call `subpartition.createReadView()` - create `view2`
   Thread2: call `view1.releaseAllResources` <-- nulls out subpartition.readView; `view2` is now corrupt?
   
   WDYT?
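   One possible way to make this interleaving harmless, sketched under simplifying assumptions: release is guarded by the same lock as creation, and a view only clears the subpartition's `readView` if it is still the current one. `GuardedSubpartition` and `GuardedView` are hypothetical stand-ins, and `lock` plays the role of the `buffers` monitor.

   ```java
   // Hypothetical stand-in for PipelinedApproximateSubpartition.
   class GuardedSubpartition {
       private final Object lock = new Object(); // plays the role of `buffers`
       private GuardedView readView;             // guarded by lock

       GuardedView createReadView() {
           synchronized (lock) {
               readView = new GuardedView(this);
               return readView;
           }
       }

       // Called by a view when it is released.
       void releaseView(GuardedView view) {
           synchronized (lock) {
               // A stale view (view1 after view2 was created) must not
               // clear the reference to the newer view.
               if (readView == view) {
                   readView = null;
               }
           }
       }

       GuardedView currentView() {
           synchronized (lock) {
               return readView;
           }
       }
   }

   // Hypothetical stand-in for PipelinedApproximateSubpartitionView.
   class GuardedView {
       private final GuardedSubpartition parent;

       GuardedView(GuardedSubpartition parent) {
           this.parent = parent;
       }

       void releaseAllResources() {
           parent.releaseView(this);
       }
   }
   ```

   Replaying the scenario (create `view1`, create `view2`, release `view1`) leaves `view2` as the current view. Any other per-view state reset in `releaseView` would need to sit inside the same identity check.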

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;
+		isPartialBuffer = true;

Review comment:
       The name `isPartialBuffer` is a bit misleading to me because it implies that a partial buffer was emitted.
   In fact, this field reflects that the view was released.
   How about `isPartialBufferCleanupRequired`?
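   A minimal sketch of the flag under the proposed name, assuming (as the diff suggests) that `cleanupPartialRecord()` returns `true` once the leftover partial record ends within the current buffer. `PartialRecordBuffer` is a hypothetical stand-in for `BufferConsumerWithPartialRecordLength` and only models whether the partial record ends in that buffer; `CleanupFlagDemo` is likewise hypothetical.

   ```java
   // Hypothetical stand-in for BufferConsumerWithPartialRecordLength.
   class PartialRecordBuffer {
       private final boolean partialRecordEndsHere;
       private boolean cleanedUp = false;

       PartialRecordBuffer(boolean partialRecordEndsHere) {
           this.partialRecordEndsHere = partialRecordEndsHere;
       }

       // Skips leftover bytes of a partial record; returns true if the partial
       // record ends within this buffer, false if it spans into the next one.
       boolean cleanupPartialRecord() {
           cleanedUp = true;
           return partialRecordEndsHere;
       }

       boolean wasCleanedUp() {
           return cleanedUp;
       }
   }

   class CleanupFlagDemo {
       // Set when the view is released: the next view must skip any partial record.
       private boolean isPartialBufferCleanupRequired = false;

       void releaseView() {
           isPartialBufferCleanupRequired = true;
       }

       // Mirrors the flag handling of buildSliceBuffer in the diff (no slicing).
       void buildSliceBuffer(PartialRecordBuffer buffer) {
           if (isPartialBufferCleanupRequired) {
               isPartialBufferCleanupRequired = !buffer.cleanupPartialRecord();
           }
       }

       boolean cleanupRequired() {
           return isPartialBufferCleanupRequired;
       }
   }
   ```

   With this name, the flag reads as "the next buffers must be cleaned" rather than "a partial buffer exists", and it stays set across buffers until the spanning record has been fully skipped.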
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
##########
@@ -130,8 +132,15 @@ public ResultPartition create(
 				bufferCompressor,
 				bufferPoolFactory);
 
+			BiFunction<Integer, PipelinedResultPartition, PipelinedSubpartition> factory;
+			if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
+				factory = PipelinedApproximateSubpartition::new;
+			} else {
+				factory = PipelinedSubpartition::new;
+			}
+

Review comment:
       nit: I'd prefer a simple ternary inside the loop:
   ```
   for (int i = 0; i < subpartitions.length; i++) {
       subpartitions[i] = type == ResultPartitionType.PIPELINED_APPROXIMATE ?
           new PipelinedApproximateSubpartition(i, pipelinedPartition) :
           new PipelinedSubpartition(i, pipelinedPartition);
   }
   ```
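   For reference, a self-contained version of the suggested loop. `PartitionType`, `Subpartition`, `ApproximateSubpartition` and `SubpartitionFactory` are hypothetical stubs; the real subpartition constructors also take the parent partition, which is omitted here.

   ```java
   // Hypothetical stand-ins; only the shape of the loop matters here.
   enum PartitionType { PIPELINED, PIPELINED_BOUNDED, PIPELINED_APPROXIMATE }

   class Subpartition {
       final int index;

       Subpartition(int index) {
           this.index = index;
       }
   }

   class ApproximateSubpartition extends Subpartition {
       ApproximateSubpartition(int index) {
           super(index);
       }
   }

   class SubpartitionFactory {
       // Creates one subpartition per index, choosing the class with a ternary.
       static Subpartition[] create(PartitionType type, int numSubpartitions) {
           Subpartition[] subpartitions = new Subpartition[numSubpartitions];
           for (int i = 0; i < subpartitions.length; i++) {
               subpartitions[i] = type == PartitionType.PIPELINED_APPROXIMATE
                   ? new ApproximateSubpartition(i)
                   : new Subpartition(i);
           }
           return subpartitions;
       }
   }
   ```

   Compared with the `BiFunction` factory in the diff, the ternary avoids an extra local and a functional interface; re-evaluating the type check per iteration is negligible.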




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508382240



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;

Review comment:
       Can this actually happen?
   
   > Thread1: call subpartition.createReadView() - create view1
   > Thread2: obtain a reference to view1 
   
   It is not possible to access view1 from a different thread unless the downstream reconnects. That means either thread1 releases the view upon disconnecting from the downstream, or a different thread (thread2) reconnects and releases the view; the latter would be guarded by the buffer lock, as you suggested.
   
   > Thread1: call subpartition.createReadView() - create view2
   > Thread2: call view1.releaseAllResources <-- nulls out subpartition.readView; view2 is now corrupt?
   
   Same as above.







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot @rkhachatryan for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot @zhijiangW for explaining the underlying threading model in our offline discussion, and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I've missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. `PipelinedApproximateSubpartition#createReadView` (called from a Netty thread, which can see the current view belonging to the subpartition).
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens upon the downstream task's cancellation.
        - from the subpartition's release. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancellation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   - Before creating a new view, thread2 releases the view that thread1 holds a reference to. Thread1 cannot release the old view again afterwards (through its view reference), since a view can only be released once.
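   The "a view can only be released once" argument can be sketched with a compare-and-set on a per-view flag. `OnceReleasableView` is hypothetical, and the shared counter stands in for whatever the real release does to the subpartition.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical view whose release logic runs at most once.
   class OnceReleasableView {
       private final AtomicBoolean released = new AtomicBoolean(false);
       private final AtomicInteger releaseCount; // stand-in for subpartition state

       OnceReleasableView(AtomicInteger releaseCount) {
           this.releaseCount = releaseCount;
       }

       // Returns true only for the call that actually performed the release.
       boolean releaseAllResources() {
           if (!released.compareAndSet(false, true)) {
               return false; // already released by another thread or path
           }
           releaseCount.incrementAndGet();
           return true;
       }
   }
   ```

   Whichever thread wins the CAS performs the release; a second attempt through a stale reference becomes a no-op instead of touching the subpartition again.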





[GitHub] [flink] rkhachatryan commented on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713798578


   Thanks a lot for the in-depth analysis and for updating the PR!
   It answered my original question about thread-safety.
   
   But I still have two other concerns:
   1. Visibility in the normal case: none of the fields written in `releaseView` are `volatile`. So in the normal case (`t1:release` then `t2:createReadView`), `t2` can see an inconsistent state. For example, `readView == null`, but `isPartialBufferCleanupRequired == false`. Right?
   Maybe call `releaseView()` from `createReadView()` unconditionally?
   
   2. Overwrites when release is slow: won't `t1` overwrite changes to `PipelinedSubpartition` already made by `t2`? For example, reset `sequenceNumber` after `t2` has sent some data?
   Maybe `PipelinedSubpartition.readView` should be an `AtomicReference`, and then we can guard `PipelinedApproximateSubpartition.releaseView()` with a CAS on it?
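   A minimal sketch combining both suggestions, assuming `readView` becomes an `AtomicReference` (it is a plain field in the diff). Per-view state is reset unconditionally in `createReadView()`, under the lock and in the creating thread, before the new view is published; a release only clears the reference via CAS if its view is still current, so a slow release from `t1` cannot undo work already done by `t2`. `CasSubpartition`, `CasView` and `nextSequenceNumber()` are hypothetical simplifications.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical simplification of PipelinedApproximateSubpartition.
   class CasSubpartition {
       private final Object lock = new Object(); // plays the role of `buffers`
       private final AtomicReference<CasView> readView = new AtomicReference<>();
       private int sequenceNumber; // guarded by lock

       CasView createReadView() {
           synchronized (lock) {
               // Reset per-view state here, not in the (possibly slow) release.
               sequenceNumber = 0;
               CasView view = new CasView(this);
               readView.set(view);
               return view;
           }
       }

       // Clears the reference only if `view` is still current, so releasing
       // an old view cannot affect a newer one.
       boolean releaseView(CasView view) {
           return readView.compareAndSet(view, null);
       }

       // Stand-in for sending a buffer with the next sequence number.
       int nextSequenceNumber() {
           synchronized (lock) {
               return sequenceNumber++;
           }
       }

       CasView currentView() {
           return readView.get();
       }
   }

   class CasView {
       private final CasSubpartition parent;

       CasView(CasSubpartition parent) {
           this.parent = parent;
       }

       boolean releaseAllResources() {
           return parent.releaseView(this);
       }
   }
   ```

   Because the reset and the publication happen under one lock in the creating thread, `t2` never observes a half-reset state, and a late release of the old view leaves `sequenceNumber` untouched.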





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * d259609d97fa73cd79b3dd400e2868f6be1c5e89 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7747) 
   * 1dcce3b9301d178ae2e3ab05888260b05f24b15a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * 4ed05009530e34f77deff767a04cd4c14c15378d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7677) 
   * d259609d97fa73cd79b3dd400e2868f6be1c5e89 UNKNOWN
   





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508193541



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,105 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A pipelined in-memory only subpartition, which allows reconnecting after failure.
+ * Only one view is allowed at a time to read the subpartition.
+ */
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+	private boolean isPartialBuffer = false;
+
+	PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+		super(index, parent);
+	}
+
+	@Override
+	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+		synchronized (buffers) {
+			checkState(!isReleased);
+
+			// if the view is not released yet
+			if (readView != null) {
+				LOG.info("{} ReadView for Subpartition {} of {} has not been released!",
+					parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+				releaseView();
+			}
+
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+			readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+		}
+
+		return readView;
+	}
+
+	@Override
+	Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
+		if (isPartialBuffer) {
+			isPartialBuffer = !buffer.cleanupPartialRecord();
+		}
+
+		return buffer.build();
+	}
+
+	void releaseView() {
+		LOG.info("Releasing view of subpartition {} of {}.", getSubPartitionIndex(), parent.getPartitionId());
+		readView = null;
+		isPartialBuffer = true;
+		isBlockedByCheckpoint = false;
+		sequenceNumber = 0;
+	}
+
+	@Override
+	public String toString() {

Review comment:
       That's a good suggestion!! Thanks!







[GitHub] [flink] flinkbot edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-709153806


   ## CI report:
   
   * 1dcce3b9301d178ae2e3ab05888260b05f24b15a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8016) 
   





[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-720296877


   Hey @pnowojski, I rebased it again.
   
   0a8a44c Azure: SUCCESS
   
   https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8743&view=results





[GitHub] [flink] curcur commented on a change in pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #13648:
URL: https://github.com/apache/flink/pull/13648#discussion_r508193083



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
##########
@@ -71,7 +71,17 @@
 	 * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
 	 * no checkpoint barriers.
 	 */
-	PIPELINED_BOUNDED(true, true, true, false);
+	PIPELINED_BOUNDED(true, true, true, false),
+
+	/**
+	 * Pipelined partitions with a bounded (local) buffer pool that allow the downstream task to
+	 * continue consuming data after reconnection in Approximate Local-Recovery.
+	 *
+	 * <p>Pipelined results can be consumed only once by a single consumer at one time.
+	 * {@link #PIPELINED_APPROXIMATE} is different from {@link #PIPELINED_BOUNDED} in that
+	 * {@link #PIPELINED_APPROXIMATE} is not released automatically after consumption.
+	 */
+	PIPELINED_APPROXIMATE(true, true, true, true);

Review comment:
       It is similar to `bounded` in `PIPELINED_BOUNDED`: it uses a fixed limit on the buffer pool size.
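   A minimal sketch of how the constructor flags could be read, assuming they are `(isPipelined, hasBackPressure, isBounded, isPersistent)` in that order, as in `ResultPartitionType` of Flink releases from around this time (an assumption to verify against the actual enum). `PartitionTypeSketch` and `maxBuffers` are hypothetical; the point is that the third flag, `isBounded`, is what caps the number of buffers, and that is the property `PIPELINED_APPROXIMATE` shares with `PIPELINED_BOUNDED`.

   ```java
   // Hypothetical mirror of ResultPartitionType's flags; constructor order assumed.
   enum PartitionTypeSketch {
       PIPELINED(true, true, false, false),
       PIPELINED_BOUNDED(true, true, true, false),
       PIPELINED_APPROXIMATE(true, true, true, true);

       final boolean isPipelined;
       final boolean hasBackPressure;
       final boolean isBounded;
       final boolean isPersistent;

       PartitionTypeSketch(boolean isPipelined, boolean hasBackPressure,
                           boolean isBounded, boolean isPersistent) {
           this.isPipelined = isPipelined;
           this.hasBackPressure = hasBackPressure;
           this.isBounded = isBounded;
           this.isPersistent = isPersistent;
       }

       // Hypothetical pool sizing: bounded types get a fixed cap, others are unlimited.
       int maxBuffers(int boundedLimit) {
           return isBounded ? boundedLimit : Integer.MAX_VALUE;
       }
   }
   ```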







[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery

Posted by GitBox <gi...@apache.org>.
curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-713341581


   Thanks a lot @rkhachatryan for the great question on how different threads access the same view (in other words, the Netty threading model and how the task thread interacts with the view). Thanks a lot @zhijiangW for explaining the underlying threading model in our offline discussion, and for the great insights.
   
   I've spent some time tracing how the view is created and released in different scenarios; here is what I think is happening. @zhijiangW, please correct me if I've missed anything, thanks!
   
   For a streaming job, each `view` is created when `PartitionRequestServerHandler` (running on one of the Netty worker threads) handles a `PartitionRequest`. The created view is owned by the subpartition, and a reference to the view is returned and bound to a reader. The reader belongs to `PartitionRequestServerHandler`.
   
   `PipelinedApproximateSubpartition#releaseView` is used in two places:
   1. `PipelinedApproximateSubpartition#createReadView` (called from a Netty thread, which can see the current view belonging to the subpartition).
   2. `PipelinedApproximateSubpartition#releaseAllResources`, which is itself called from two places:
        - from the `reader`, which uses its own reference to the view to release resources. This happens upon the downstream task's cancellation.
        - from the subpartition's release. This is fine, since the subpartition releases its own view.
   
   Two questions:
   1. Does the old view continue to read data if it is not disposed of successfully before the new view is created?
   - No, because the reader and the view are removed upon the downstream task's cancellation request.
   
   2. What if Netty thread1 releases the view after Netty thread2 recreates it?
   Before creating a new view, thread2 releases the view that thread1 holds a reference to.
   When thread1 later tries to release the view through its old reference, we should be fine as long as we disconnect the old view from the subpartition.
   
   In that sense, we can simply set `view.parent = null` when releasing the view.
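   A minimal sketch of this idea with hypothetical `DetachableSubpartition` and `DetachableView` classes; whether `parent` can become a non-final, nullable field of `PipelinedSubpartitionView` is an assumption. Before creating a new view, the subpartition detaches the old one by nulling its `parent`, so a later release through a stale reference no longer reaches the subpartition. The view reads `parent` under the subpartition's lock; otherwise a release that had already read a non-null `parent` could still race with the detach.

   ```java
   // Hypothetical stand-in for PipelinedApproximateSubpartition.
   class DetachableSubpartition {
       final Object lock = new Object(); // plays the role of `buffers`
       private DetachableView readView;  // guarded by lock
       private int releaseCount;         // guarded by lock

       DetachableView createReadView() {
           synchronized (lock) {
               if (readView != null) {
                   // Detach the old view first so a stale reference
                   // can no longer reach this subpartition.
                   readView.detachLocked();
                   releaseViewLocked();
               }
               readView = new DetachableView(this);
               return readView;
           }
       }

       // Caller must hold lock.
       void releaseViewLocked() {
           readView = null;
           releaseCount++;
       }

       DetachableView currentView() {
           synchronized (lock) {
               return readView;
           }
       }

       int releaseCount() {
           synchronized (lock) {
               return releaseCount;
           }
       }
   }

   // Hypothetical view whose `parent` is nulled out when it is detached.
   class DetachableView {
       private final Object lock;             // the subpartition's lock
       private DetachableSubpartition parent; // guarded by lock; null once detached

       DetachableView(DetachableSubpartition parent) {
           this.lock = parent.lock;
           this.parent = parent;
       }

       // Caller must hold lock.
       void detachLocked() {
           parent = null;
       }

       void releaseAllResources() {
           synchronized (lock) {
               if (parent != null) {
                   parent.releaseViewLocked();
                   parent = null;
               }
           }
       }
   }
   ```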

