You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/02/10 22:02:23 UTC

[flink] branch master updated: [FLINK-26062][state/changelog] Replace poll() with remove() for PQ states

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9034b3c  [FLINK-26062][state/changelog] Replace poll() with remove() for PQ states
9034b3c is described below

commit 9034b3c32875536b8b7d99aa31c7a2e6c942c811
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 9 21:02:20 2022 +0100

    [FLINK-26062][state/changelog] Replace poll() with remove() for PQ states
    
    PriorityQueue elements with equal priority can be polled in a
    different order before and after recovery.
    This may result in, e.g., timers being removed or not being fired.
    
    This change addresses this by recording poll() as remove(),
    which identifies the element via equals() rather than by its position in the queue.
---
 .../ChangelogKeyGroupedPriorityQueue.java          | 27 +++++++++++----------
 .../changelog/PriorityQueueStateChangeLogger.java  | 25 -------------------
 .../PriorityQueueStateChangeLoggerImpl.java        |  8 +------
 .../state/changelog/StateChangeOperation.java      |  4 +---
 .../restore/PriorityQueueStateChangeApplier.java   |  3 ---
 .../state/changelog/ChangelogPqStateTest.java      | 28 +++++++++++++++-------
 .../PriorityQueueStateChangeLoggerImplTest.java    | 22 -----------------
 7 files changed, 36 insertions(+), 81 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
index fba87cb..058be6d 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
@@ -42,12 +42,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class ChangelogKeyGroupedPriorityQueue<T>
         implements KeyGroupedInternalPriorityQueue<T>, ChangelogState {
     private final KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue;
-    private final PriorityQueueStateChangeLogger<T> logger;
+    private final StateChangeLogger<T, Void> logger;
     private final TypeSerializer<T> serializer;
 
     public ChangelogKeyGroupedPriorityQueue(
             KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue,
-            PriorityQueueStateChangeLogger<T> logger,
+            StateChangeLogger<T, Void> logger,
             TypeSerializer<T> serializer) {
         this.delegatedPriorityQueue = checkNotNull(delegatedPriorityQueue);
         this.logger = checkNotNull(logger);
@@ -63,11 +63,10 @@ public class ChangelogKeyGroupedPriorityQueue<T>
     @Override
     public T poll() {
         T polled = delegatedPriorityQueue.poll();
-        try {
-            logger.stateElementPolled();
-        } catch (IOException e) {
-            ExceptionUtils.rethrow(e);
-        }
+        // Record poll as remove to avoid non-deterministic replay:
+        // elements with equal priority can be polled in different order before and after recovery,
+        // resulting in e.g. timers being removed or not fired
+        logRemoval(polled);
         return polled;
     }
 
@@ -87,11 +86,7 @@ public class ChangelogKeyGroupedPriorityQueue<T>
     @Override
     public boolean remove(T toRemove) {
         boolean removed = delegatedPriorityQueue.remove(toRemove);
-        try {
-            logger.valueElementRemoved(out -> serializer.serialize(toRemove, out), null);
-        } catch (IOException e) {
-            ExceptionUtils.rethrow(e);
-        }
+        logRemoval(toRemove);
         return removed;
     }
 
@@ -142,4 +137,12 @@ public class ChangelogKeyGroupedPriorityQueue<T>
     public void resetWritingMetaFlag() {
         logger.resetWritingMetaFlag();
     }
+
+    private void logRemoval(T toRemove) {
+        try {
+            logger.valueElementRemoved(out -> serializer.serialize(toRemove, out), null);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLogger.java
deleted file mode 100644
index d8695fe..0000000
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLogger.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.changelog;
-
-import java.io.IOException;
-
-interface PriorityQueueStateChangeLogger<T> extends StateChangeLogger<T, Void> {
-
-    void stateElementPolled() throws IOException;
-}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index 8aa5437..7725500 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -27,8 +27,7 @@ import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class PriorityQueueStateChangeLoggerImpl<K, T> extends AbstractStateChangeLogger<K, T, Void>
-        implements PriorityQueueStateChangeLogger<T> {
+class PriorityQueueStateChangeLoggerImpl<K, T> extends AbstractStateChangeLogger<K, T, Void> {
     private final TypeSerializer<T> serializer;
 
     PriorityQueueStateChangeLoggerImpl(
@@ -49,9 +48,4 @@ class PriorityQueueStateChangeLoggerImpl<K, T> extends AbstractStateChangeLogger
     @Override
     protected void serializeScope(Void unused, DataOutputViewStreamWrapper out)
             throws IOException {}
-
-    @Override
-    public void stateElementPolled() throws IOException {
-        log(StateChangeOperation.REMOVE_FIRST_ELEMENT, null);
-    }
 }
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java
index c0a75bd..91d5f94 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java
@@ -45,10 +45,8 @@ public enum StateChangeOperation {
     ADD_OR_UPDATE_ELEMENT((byte) 6),
     /** Scope: key + namespace + element (e.g. user map remove or iterator remove). */
     REMOVE_ELEMENT((byte) 7),
-    /** Scope: key + namespace, first element (e.g. priority queue poll). */
-    REMOVE_FIRST_ELEMENT((byte) 8),
     /** State metadata (name, serializers, etc.). */
-    METADATA((byte) 9);
+    METADATA((byte) 8);
     private final byte code;
 
     StateChangeOperation(byte code) {
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java
index b3a5894..08d60a2 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java
@@ -35,9 +35,6 @@ class PriorityQueueStateChangeApplier<T> implements StateChangeApplier {
     @Override
     public void apply(StateChangeOperation operation, DataInputView in) throws Exception {
         switch (operation) {
-            case REMOVE_FIRST_ELEMENT:
-                queue.poll();
-                break;
             case ADD_ELEMENT:
                 int numElements = in.readInt();
                 for (int i = 0; i < numElements; i++) {
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
index 3632a84..7b69781 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
@@ -62,6 +62,22 @@ public class ChangelogPqStateTest {
     }
 
     @Test
+    public void testPollRecorded() throws Exception {
+        testRecorded(
+                singletonList("x"),
+                ChangelogKeyGroupedPriorityQueue::poll,
+                logger -> assertTrue(logger.stateElementRemoved));
+    }
+
+    @Test
+    public void testRemoveRecorded() throws Exception {
+        testRecorded(
+                singletonList("x"),
+                state -> state.remove("x"),
+                logger -> assertTrue(logger.stateElementRemoved));
+    }
+
+    @Test
     public void testAddAllRecorded() throws Exception {
         testRecorded(
                 emptyList(),
@@ -113,19 +129,13 @@ public class ChangelogPqStateTest {
         assertion.accept(logger);
     }
 
-    private static class TestPriorityQueueChangeLogger<T>
-            implements PriorityQueueStateChangeLogger<T> {
+    private static class TestPriorityQueueChangeLogger<T> implements StateChangeLogger<T, Void> {
         public boolean stateElementChanged;
         public boolean stateCleared;
         public boolean stateElementRemoved;
         public boolean stateElementAdded;
 
         @Override
-        public void stateElementPolled() {
-            stateElementRemoved = true;
-        }
-
-        @Override
         public void valueUpdated(T newState, Void ns) {
             stateElementChanged = true;
         }
@@ -147,8 +157,8 @@ public class ChangelogPqStateTest {
 
         @Override
         public void valueElementAdded(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Void ns)
-                throws IOException {
+                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer,
+                Void ns) {
             stateElementAdded = true;
         }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
index 946e4ed..d41b447 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -18,15 +18,9 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 
-import java.io.IOException;
-import java.util.Optional;
-
-import static org.apache.flink.state.changelog.StateChangeOperation.REMOVE_FIRST_ELEMENT;
-
 /** {@link PriorityQueueStateChangeLoggerImpl} test. */
 public class PriorityQueueStateChangeLoggerImplTest extends StateChangeLoggerTestBase<Void> {
 
@@ -41,22 +35,6 @@ public class PriorityQueueStateChangeLoggerImplTest extends StateChangeLoggerTes
     }
 
     @Override
-    protected Optional<Tuple2<Integer, StateChangeOperation>> log(
-            StateChangeOperation op,
-            String element,
-            StateChangeLogger<String, Void> logger,
-            InternalKeyContextImpl<String> keyContext)
-            throws IOException {
-        if (op == REMOVE_FIRST_ELEMENT) {
-            keyContext.setCurrentKey(element);
-            ((PriorityQueueStateChangeLogger<String>) logger).stateElementPolled();
-            return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(), op));
-        } else {
-            return super.log(op, element, logger, keyContext);
-        }
-    }
-
-    @Override
     protected Void getNamespace(String element) {
         return null;
     }