Posted to issues@flink.apache.org by "Cai Liuyang (Jira)" <ji...@apache.org> on 2022/01/17 05:59:00 UTC

[jira] [Updated] (FLINK-25664) Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true

     [ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cai Liuyang updated FLINK-25664:
--------------------------------
    Description: 
Currently, the following sequence can occur:
 # The PipelinedSubpartition holds only one buffer, which carries an aligned checkpoint barrier (isBlocked == false).
 # CreditBasedSequenceNumberingViewReader polls this buffer and the PipelinedSubpartition becomes blocked (isBlocked == true).
 # Before the downstream task calls resumeConsumption, two finished buffers are added to the PipelinedSubpartition (there is no limit on adding buffers to a blocked PipelinedSubpartition).
 ## Adding the first finished buffer does not trigger notifyDataAvailable because isBlocked == true.
 ## Adding the second finished buffer also does not trigger notifyDataAvailable, because isBlocked == true and finishedBuffer > 1.
 # The downstream task calls resumeConsumption and the PipelinedSubpartition is unblocked (isBlocked == false).
 # OutputFlusher flushes the PipelinedSubpartition, which does not trigger notifyDataAvailable because finishedBuffer > 1.

In conclusion, there are three cases in which notifyDataAvailable should be triggered:
    case1: there is only one finished buffer (handled by add)
    case2: there is only one unfinished buffer (handled by flush)
    case3: there is more than one finished buffer, added while the PipelinedSubpartition was blocked (not handled)
{code:java}
// Test code that reproduces this case.
// Add it to org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest.
@Test
public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
        throws Exception {
    // Block the subpartition with an aligned checkpoint barrier.
    blockSubpartitionByCheckpoint(1);

    // Add two finished buffers while the subpartition is still blocked.
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));

    // The two adds above must not have raised the notification count.
    assertEquals(1, availablityListener.getNumNotifications());
    readView.resumeConsumption();
    subpartition.flush();
    // After unblocking and flushing, one more notification is expected.
    assertEquals(2, availablityListener.getNumNotifications());
}
{code}
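
The sequence described above can also be replayed outside Flink with a small stand-alone model. This is only a sketch: the class BlockedSubpartitionModel and all of its methods are hypothetical names invented for illustration, and its notification rules are a simplification of the three cases listed in this description, not Flink's actual PipelinedSubpartition code.

```java
/** Toy model of the notification rules described in this issue. Not Flink code; all names are hypothetical. */
final class BlockedSubpartitionModel {
    private boolean blocked;
    private int finishedBuffers;
    private boolean unfinishedBufferWithData;
    private int numNotifications;

    /** case1: notify only when the first finished buffer arrives and the subpartition is not blocked. */
    void addFinishedBuffer() {
        finishedBuffers++;
        if (!blocked && finishedBuffers == 1) {
            numNotifications++;
        }
    }

    /** Queues a single unfinished buffer that already contains data. */
    void addUnfinishedBufferWithData() {
        unfinishedBufferWithData = true;
    }

    /** The reader polls the barrier buffer, which blocks the subpartition. */
    void pollBarrierAndBlock() {
        finishedBuffers--;
        blocked = true;
    }

    void resumeConsumption() {
        blocked = false;
    }

    /** case2: notify only when the single queued buffer is an unfinished one with data. */
    void flush() {
        boolean onlyOneUnfinishedBuffer = finishedBuffers == 0 && unfinishedBufferWithData;
        if (!blocked && onlyOneUnfinishedBuffer) {
            numNotifications++;
        }
    }

    int getNumNotifications() {
        return numNotifications;
    }

    /** Replays the steps from the description and returns the notification count. */
    static int replayReportedScenario() {
        BlockedSubpartitionModel model = new BlockedSubpartitionModel();
        model.addFinishedBuffer();   // barrier buffer, notified (count becomes 1)
        model.pollBarrierAndBlock(); // isBlocked == true
        model.addFinishedBuffer();   // not notified: blocked
        model.addFinishedBuffer();   // not notified: blocked and finishedBuffers > 1
        model.resumeConsumption();   // isBlocked == false
        model.flush();               // not notified: finishedBuffers > 1 (case3)
        return model.getNumNotifications();
    }

    public static void main(String[] args) {
        System.out.println(replayReportedScenario());
    }
}
```

In this model the count stays at 1 after the flush, whereas the test above expects 2; the gap corresponds to case3.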


> Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25664
>                 URL: https://issues.apache.org/jira/browse/FLINK-25664
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Cai Liuyang
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.1#820001)