You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/04 10:42:35 UTC
[incubator-paimon] branch master updated: [core] Add base commit user and cause to commit conflict messages, also add back pressure as a possible cause for commit conflicts (#1487)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 26264b33c [core] Add base commit user and cause to commit conflict messages, also add back pressure as a possible cause for commit conflicts (#1487)
26264b33c is described below
commit 26264b33cd0bc707819325dee52046f8a26cce43
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jul 4 18:42:31 2023 +0800
[core] Add base commit user and cause to commit conflict messages, also add back pressure as a possible cause for commit conflicts (#1487)
---
.../paimon/operation/FileStoreCommitImpl.java | 104 +++++++++++++--------
.../apache/paimon/operation/FileDeletionTest.java | 2 +-
2 files changed, 67 insertions(+), 39 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3eda2bcdb..96e0ceae1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -192,6 +192,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
LOG.debug("Ready to commit\n" + committable.toString());
}
+ Snapshot latestSnapshot = null;
Long safeLatestSnapshotId = null;
List<ManifestEntry> baseEntries = new ArrayList<>();
@@ -218,15 +219,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from files.
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
- if (latestSnapshotId != null) {
+ latestSnapshot = snapshotManager.latestSnapshot();
+ if (latestSnapshot != null) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
- latestSnapshotId, appendTableFiles, compactTableFiles));
- noConflictsOrFail(baseEntries, appendTableFiles);
- safeLatestSnapshotId = latestSnapshotId;
+ latestSnapshot, appendTableFiles, compactTableFiles));
+ noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles);
+ safeLatestSnapshotId = latestSnapshot.id();
}
tryCommit(
@@ -249,7 +250,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// This optimization is mainly used to decrease the number of times we read from files.
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendTableFiles);
- noConflictsOrFail(baseEntries, compactTableFiles);
+ noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles);
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
}
@@ -490,7 +491,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
Snapshot.CommitKind commitKind,
Long safeLatestSnapshotId) {
while (true) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
if (tryCommitOnce(
tableFiles,
changelogFiles,
@@ -499,7 +500,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
watermark,
logOffsets,
commitKind,
- latestSnapshotId,
+ latestSnapshot,
safeLatestSnapshotId)) {
break;
}
@@ -514,13 +515,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
while (true) {
- Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
- if (latestSnapshotId != null) {
+ if (latestSnapshot != null) {
List<ManifestEntry> currentEntries =
- scan.withSnapshot(latestSnapshotId)
+ scan.withSnapshot(latestSnapshot)
.withPartitionFilter(partitionFilter)
.plan()
.files();
@@ -535,12 +536,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// collect index files
- Snapshot snapshot = snapshotManager.snapshot(latestSnapshotId);
- if (snapshot.indexManifest() != null) {
+ if (latestSnapshot.indexManifest() != null) {
RowDataToObjectArrayConverter converter =
new RowDataToObjectArrayConverter(partitionType);
List<IndexManifestEntry> entries =
- indexManifestFile.read(snapshot.indexManifest());
+ indexManifestFile.read(latestSnapshot.indexManifest());
for (IndexManifestEntry entry : entries) {
if (partitionFilter == null
|| partitionFilter.test(converter.convert(entry.partition()))) {
@@ -560,7 +560,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
watermark,
logOffsets,
Snapshot.CommitKind.OVERWRITE,
- latestSnapshotId,
+ latestSnapshot,
null)) {
break;
}
@@ -576,10 +576,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
- Long latestSnapshotId,
+ Snapshot latestSnapshot,
Long safeLatestSnapshotId) {
long newSnapshotId =
- latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+ latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
if (LOG.isDebugEnabled()) {
@@ -593,14 +593,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
- Snapshot latestSnapshot = null;
- if (latestSnapshotId != null) {
- if (!latestSnapshotId.equals(safeLatestSnapshotId)) {
- // latestSnapshotId is different from the snapshot id we've checked for conflicts,
- // so we have to check again
- noConflictsOrFail(latestSnapshotId, tableFiles);
- }
- latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+ if (latestSnapshot != null && !Objects.equals(latestSnapshot.id(), safeLatestSnapshotId)) {
+ // latestSnapshotId is different from the snapshot id we've checked for conflicts,
+ // so we have to check again
+ noConflictsOrFail(latestSnapshot.commitUser(), latestSnapshot, tableFiles);
}
Snapshot newSnapshot;
@@ -777,7 +773,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@SafeVarargs
private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
- long snapshotId, List<ManifestEntry>... changes) {
+ Snapshot snapshot, List<ManifestEntry>... changes) {
List<BinaryRow> changedPartitions =
Arrays.stream(changes)
.flatMap(Collection::stream)
@@ -785,7 +781,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
.distinct()
.collect(Collectors.toList());
try {
- return scan.withSnapshot(snapshotId)
+ return scan.withSnapshot(snapshot)
.withPartitionFilter(changedPartitions)
.plan()
.files();
@@ -794,11 +790,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
- private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
- noConflictsOrFail(readAllEntriesFromChangedPartitions(snapshotId, changes), changes);
+ private void noConflictsOrFail(
+ String baseCommitUser, Snapshot latestSnapshot, List<ManifestEntry> changes) {
+ noConflictsOrFail(
+ baseCommitUser,
+ readAllEntriesFromChangedPartitions(latestSnapshot, changes),
+ changes);
}
- private void noConflictsOrFail(List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
+ private void noConflictsOrFail(
+ String baseCommitUser, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
List<ManifestEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(changes);
@@ -810,7 +811,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
} catch (Throwable e) {
LOG.warn("File deletion conflicts detected! Give up committing.", e);
throw createConflictException(
- "File deletion conflicts detected! Give up committing.", baseEntries, changes);
+ "File deletion conflicts detected! Give up committing.",
+ baseCommitUser,
+ baseEntries,
+ changes,
+ e);
}
// fast exit for file store without keys
@@ -842,27 +847,47 @@ public class FileStoreCommitImpl implements FileStoreCommit {
+ a.identifier().toString(pathFactory)
+ "\n"
+ b.identifier().toString(pathFactory),
+ baseCommitUser,
baseEntries,
- changes);
+ changes,
+ null);
}
}
}
}
private RuntimeException createConflictException(
- String message, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
+ String message,
+ String baseCommitUser,
+ List<ManifestEntry> baseEntries,
+ List<ManifestEntry> changes,
+ Throwable cause) {
String possibleCauses =
String.join(
"\n",
+ "Don't panic!",
"Conflicts during commits are normal and this failure is intended to resolve the conflicts.",
"Conflicts are mainly caused by the following scenarios:",
- "1. Multiple jobs are writing into the same partition at the same time, you can use "
- + "https://paimon.apache.org/docs/master/maintenance/write-performance/#dedicated-compaction-job"
- + " to support multiple writing.",
- "2. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.",
+ "1. Your job is suffering from back-pressuring.",
+ " There are too many snapshots waiting to be committed "
+ + "and an exception occurred during the commit procedure "
+ + "(most probably due to checkpoint timeout).",
+ " See https://paimon.apache.org/docs/master/maintenance/write-performance/ "
+ + "for how to improve writing performance.",
+ "2. Multiple jobs are writing into the same partition at the same time "
+ + "(you'll probably see different base commit user and current commit user below).",
+ " You can use "
+ + "https://paimon.apache.org/docs/master/maintenance/write-performance/#dedicated-compaction-job "
+ + "to support multiple writing.",
+ "3. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.",
" The job will fail continuously in this scenario to protect metadata from corruption.",
" You can either recover from the latest savepoint, "
+ "or you can revert the table to the snapshot corresponding to the old savepoint.");
+ String commitUserString =
+ "Base commit user is: "
+ + baseCommitUser
+ + "; Current commit user is: "
+ + commitUser;
String baseEntriesString =
"Base entries are:\n"
+ baseEntries.stream()
@@ -878,9 +903,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
+ "\n\n"
+ possibleCauses
+ "\n\n"
+ + commitUserString
+ + "\n\n"
+ baseEntriesString
+ "\n\n"
- + changesString);
+ + changesString,
+ cause);
}
private void cleanUpTmpManifests(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index be602318f..095a2b638 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -592,7 +592,7 @@ public class FileDeletionTest {
null,
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
- store.snapshotManager().latestSnapshotId(),
+ store.snapshotManager().latestSnapshot(),
null);
}
}