You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/25 15:14:46 UTC
[flink-table-store] branch master updated: [FLINK-26867] Hybrid reading wrong because of disordered commits
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 4936d20 [FLINK-26867] Hybrid reading wrong because of disordered commits
4936d20 is described below
commit 4936d20aa61f62208185b406f10a72c8baefe3d6
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Mar 25 23:14:41 2022 +0800
[FLINK-26867] Hybrid reading wrong because of disordered commits
This closes #63
---
.../sink/global/AbstractCommitterOperator.java | 21 ++++++++++++++-------
.../connector/source/LogHybridSourceFactory.java | 3 +--
.../table/store/connector/ReadWriteTableITCase.java | 2 --
.../flink/table/store/log/LogWriteCallback.java | 5 ++++-
4 files changed, 19 insertions(+), 12 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
index 7b807bb..fad3e8d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
@@ -34,9 +34,6 @@ import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.function.SerializableSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -52,8 +49,6 @@ public abstract class AbstractCommitterOperator<IN, CommT>
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitterOperator.class);
-
/** Record all the inputs until commit. */
private final Deque<IN> inputs = new ArrayDeque<>();
@@ -115,16 +110,28 @@ public abstract class AbstractCommitterOperator<IN, CommT>
@Override
public void endInput() throws Exception {
+ // Suppose the last checkpoint before endInput is 5. Flink Streaming Job calling order:
// 1. Receives prepareSnapshotPreBarrier(5) from upstream
+ // 2. this.snapshotState(5)
// 3. Receives endInput from upstream
+ // 4. this.endInput
+ // 5. this.notifyCheckpointComplete(5)
// So we should submit all the data in endInput in order to avoid disordered commits.
+ long checkpointId = Long.MAX_VALUE;
List<IN> poll = pollInputs();
if (!poll.isEmpty()) {
- commit(false, toCommittables(Long.MAX_VALUE, poll));
+ committablesPerCheckpoint.put(checkpointId, toCommittables(checkpointId, poll));
}
+ commitUpToCheckpoint(checkpointId);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
- LOG.info("Committing the state for checkpoint {}", checkpointId);
+ commitUpToCheckpoint(checkpointId);
+ }
+
+ private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, List<CommT>> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
commit(false, committables(headMap));
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
index 0f892fc..e73a4e3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
@@ -44,8 +44,7 @@ public class LogHybridSourceFactory
Snapshot snapshot = enumerator.snapshot();
Map<Integer, Long> logOffsets = null;
if (snapshot != null) {
- // TODO
- // logOffsets = snapshot.getLogOffsets();
+ logOffsets = snapshot.getLogOffsets();
}
return provider.createSource(logOffsets);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 98c2682..86ec5dc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.types.Row;
-import org.junit.Ignore;
import org.junit.Test;
import java.nio.file.Paths;
@@ -168,7 +167,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
checkFileStorePath(tEnv, managedTable);
}
- @Ignore("changelog case is failed")
@Test
public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
String managedTable = prepareEnvAndWrite(true, true, true, true);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
index 03c62d6..d6c849e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -37,7 +37,10 @@ public class LogWriteCallback implements WriteCallback {
// computeIfAbsent will lock on the key
acc = offsetMap.computeIfAbsent(bucket, k -> new LongAccumulator(Long::max, 0));
} // else lock free
- acc.accumulate(offset);
+
+ // Save the next offset; what we need to provide to hybrid reading is the starting
+ // offset of the next transaction
+ acc.accumulate(offset + 1);
}
public Map<Integer, Long> offsets() {