Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/25 15:14:46 UTC

[flink-table-store] branch master updated: [FLINK-26867] Hybrid reading wrong because of disordered commits

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 4936d20  [FLINK-26867] Hybrid reading wrong because of disordered commits
4936d20 is described below

commit 4936d20aa61f62208185b406f10a72c8baefe3d6
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Mar 25 23:14:41 2022 +0800

    [FLINK-26867] Hybrid reading wrong because of disordered commits
    
    This closes #63
---
 .../sink/global/AbstractCommitterOperator.java      | 21 ++++++++++++++-------
 .../connector/source/LogHybridSourceFactory.java    |  3 +--
 .../table/store/connector/ReadWriteTableITCase.java |  2 --
 .../flink/table/store/log/LogWriteCallback.java     |  5 ++++-
 4 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
index 7b807bb..fad3e8d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
@@ -34,9 +34,6 @@ import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.function.SerializableSupplier;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -52,8 +49,6 @@ public abstract class AbstractCommitterOperator<IN, CommT>
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitterOperator.class);
-
     /** Record all the inputs until commit. */
     private final Deque<IN> inputs = new ArrayDeque<>();
 
@@ -115,16 +110,28 @@ public abstract class AbstractCommitterOperator<IN, CommT>
 
     @Override
     public void endInput() throws Exception {
+        // Suppose the last checkpoint before endInput is 5. A Flink streaming job calls in this order:
+        // 1. Receives elements emitted by the upstream operator's prepareSnapshotPreBarrier(5)
+        // 2. this.snapshotState(5)
+        // 3. Receives elements emitted by the upstream operator's endInput
+        // 4. this.endInput
+        // 5. this.notifyCheckpointComplete(5)
+        // So we must submit all remaining data in endInput (under Long.MAX_VALUE) to avoid disordered commits.
+        long checkpointId = Long.MAX_VALUE;
         List<IN> poll = pollInputs();
         if (!poll.isEmpty()) {
-            commit(false, toCommittables(Long.MAX_VALUE, poll));
+            committablesPerCheckpoint.put(checkpointId, toCommittables(checkpointId, poll));
         }
+        commitUpToCheckpoint(checkpointId);
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        LOG.info("Committing the state for checkpoint {}", checkpointId);
+        commitUpToCheckpoint(checkpointId);
+    }
+
+    private void commitUpToCheckpoint(long checkpointId) throws Exception {
         NavigableMap<Long, List<CommT>> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
         commit(false, committables(headMap));
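
For context, a minimal self-contained sketch of the buffering pattern this fix relies
on (hypothetical class and method names; the real operator also persists pending
inputs in Flink state): committables are keyed by checkpoint id in a sorted map,
endInput files any remaining data under Long.MAX_VALUE, and committing "up to" an id
drains the inclusive head of the map, so commits always leave in checkpoint order.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Hypothetical, simplified model of the operator's buffering logic.
    class CheckpointOrderedCommitter<CommT> {
        // Sorted by checkpoint id, so head views come out in commit order.
        private final TreeMap<Long, List<CommT>> committablesPerCheckpoint = new TreeMap<>();

        void add(long checkpointId, List<CommT> committables) {
            committablesPerCheckpoint.put(checkpointId, committables);
        }

        // Called from notifyCheckpointComplete(id) and, with Long.MAX_VALUE, from endInput.
        List<CommT> commitUpToCheckpoint(long checkpointId) {
            NavigableMap<Long, List<CommT>> headMap =
                    committablesPerCheckpoint.headMap(checkpointId, true); // inclusive head
            List<CommT> toCommit = new ArrayList<>();
            headMap.values().forEach(toCommit::addAll);
            headMap.clear(); // the view is backed by the map, so committed entries drop out
            return toCommit;
        }
    }
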
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
index 0f892fc..e73a4e3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
@@ -44,8 +44,7 @@ public class LogHybridSourceFactory
         Snapshot snapshot = enumerator.snapshot();
         Map<Integer, Long> logOffsets = null;
         if (snapshot != null) {
-            // TODO
-            // logOffsets = snapshot.getLogOffsets();
+            logOffsets = snapshot.getLogOffsets();
         }
         return provider.createSource(logOffsets);
     }
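
The log offsets restored above pair with the offset + 1 change in LogWriteCallback
below: each bucket's value is the first log offset not yet covered by the snapshot.
A hypothetical sketch of how a Kafka-backed log source could apply them, assuming
buckets map one-to-one to partitions of a single topic and the consumer is already
assigned those partitions:

    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical helper, not part of this patch.
    class LogStartupSketch {
        static void seekToSnapshotOffsets(
                KafkaConsumer<?, ?> consumer, String topic, Map<Integer, Long> logOffsets) {
            // Resume each partition at the first record the snapshot did not cover.
            logOffsets.forEach(
                    (bucket, nextOffset) ->
                            consumer.seek(new TopicPartition(topic, bucket), nextOffset));
        }
    }
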
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 98c2682..86ec5dc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.nio.file.Paths;
@@ -168,7 +167,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         checkFileStorePath(tEnv, managedTable);
     }
 
-    @Ignore("changelog case is failed")
     @Test
     public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
         String managedTable = prepareEnvAndWrite(true, true, true, true);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
index 03c62d6..d6c849e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -37,7 +37,10 @@ public class LogWriteCallback implements WriteCallback {
             // computeIfAbsent will lock on the key
             acc = offsetMap.computeIfAbsent(bucket, k -> new LongAccumulator(Long::max, 0));
         } // else lock free
-        acc.accumulate(offset);
+
+        // Save the next offset: what hybrid reading needs is the starting offset of the
+        // next transaction, i.e. one past the last acknowledged record
+        acc.accumulate(offset + 1);
     }
 
     public Map<Integer, Long> offsets() {
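
A minimal standalone model of this bookkeeping (hypothetical names): feeding a
per-bucket max-accumulator with offset + 1 means the recorded value is always one
past the last acknowledged record, which is exactly where hybrid reading should
resume so that nothing is re-read and nothing is skipped.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAccumulator;

    // Hypothetical, minimal model of the callback's offset tracking.
    class NextOffsetTracker {
        private final Map<Integer, LongAccumulator> offsetMap = new ConcurrentHashMap<>();

        // Called once per acknowledged record; safe under concurrent acknowledgements.
        void onRecordAcked(int bucket, long offset) {
            offsetMap
                    .computeIfAbsent(bucket, k -> new LongAccumulator(Long::max, 0))
                    // Store the NEXT offset to read, not the last one written.
                    .accumulate(offset + 1);
        }

        long nextOffsetToRead(int bucket) {
            LongAccumulator acc = offsetMap.get(bucket);
            return acc == null ? 0L : acc.get();
        }
    }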