You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/04 10:42:35 UTC

[incubator-paimon] branch master updated: [core] Add base commit user and cause to commit conflict messages, also add back pressure as a possible cause for commit conflicts (#1487)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 26264b33c [core] Add base commit user and cause to commit conflict messages, also add back pressure as a possible cause for commit conflicts (#1487)
26264b33c is described below

commit 26264b33cd0bc707819325dee52046f8a26cce43
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jul 4 18:42:31 2023 +0800

    [core] Add base commit user and cause to commit conflict messages, also add back pressure as a possible cause for commit conflicts (#1487)
---
 .../paimon/operation/FileStoreCommitImpl.java      | 104 +++++++++++++--------
 .../apache/paimon/operation/FileDeletionTest.java  |   2 +-
 2 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3eda2bcdb..96e0ceae1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -192,6 +192,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             LOG.debug("Ready to commit\n" + committable.toString());
         }
 
+        Snapshot latestSnapshot = null;
         Long safeLatestSnapshotId = null;
         List<ManifestEntry> baseEntries = new ArrayList<>();
 
@@ -218,15 +219,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             // If there are no other jobs committing at the same time,
             // we can skip conflict checking in tryCommit method.
             // This optimization is mainly used to decrease the number of times we read from files.
-            Long latestSnapshotId = snapshotManager.latestSnapshotId();
-            if (latestSnapshotId != null) {
+            latestSnapshot = snapshotManager.latestSnapshot();
+            if (latestSnapshot != null) {
                 // it is possible that some partitions only have compact changes,
                 // so we need to contain all changes
                 baseEntries.addAll(
                         readAllEntriesFromChangedPartitions(
-                                latestSnapshotId, appendTableFiles, compactTableFiles));
-                noConflictsOrFail(baseEntries, appendTableFiles);
-                safeLatestSnapshotId = latestSnapshotId;
+                                latestSnapshot, appendTableFiles, compactTableFiles));
+                noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles);
+                safeLatestSnapshotId = latestSnapshot.id();
             }
 
             tryCommit(
@@ -249,7 +250,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             // This optimization is mainly used to decrease the number of times we read from files.
             if (safeLatestSnapshotId != null) {
                 baseEntries.addAll(appendTableFiles);
-                noConflictsOrFail(baseEntries, compactTableFiles);
+                noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles);
                 // assume this compact commit follows just after the append commit created above
                 safeLatestSnapshotId += 1;
             }
@@ -490,7 +491,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             Snapshot.CommitKind commitKind,
             Long safeLatestSnapshotId) {
         while (true) {
-            Long latestSnapshotId = snapshotManager.latestSnapshotId();
+            Snapshot latestSnapshot = snapshotManager.latestSnapshot();
             if (tryCommitOnce(
                     tableFiles,
                     changelogFiles,
@@ -499,7 +500,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     watermark,
                     logOffsets,
                     commitKind,
-                    latestSnapshotId,
+                    latestSnapshot,
                     safeLatestSnapshotId)) {
                 break;
             }
@@ -514,13 +515,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets) {
         while (true) {
-            Long latestSnapshotId = snapshotManager.latestSnapshotId();
+            Snapshot latestSnapshot = snapshotManager.latestSnapshot();
 
             List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
             List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
-            if (latestSnapshotId != null) {
+            if (latestSnapshot != null) {
                 List<ManifestEntry> currentEntries =
-                        scan.withSnapshot(latestSnapshotId)
+                        scan.withSnapshot(latestSnapshot)
                                 .withPartitionFilter(partitionFilter)
                                 .plan()
                                 .files();
@@ -535,12 +536,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 }
 
                 // collect index files
-                Snapshot snapshot = snapshotManager.snapshot(latestSnapshotId);
-                if (snapshot.indexManifest() != null) {
+                if (latestSnapshot.indexManifest() != null) {
                     RowDataToObjectArrayConverter converter =
                             new RowDataToObjectArrayConverter(partitionType);
                     List<IndexManifestEntry> entries =
-                            indexManifestFile.read(snapshot.indexManifest());
+                            indexManifestFile.read(latestSnapshot.indexManifest());
                     for (IndexManifestEntry entry : entries) {
                         if (partitionFilter == null
                                 || partitionFilter.test(converter.convert(entry.partition()))) {
@@ -560,7 +560,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     watermark,
                     logOffsets,
                     Snapshot.CommitKind.OVERWRITE,
-                    latestSnapshotId,
+                    latestSnapshot,
                     null)) {
                 break;
             }
@@ -576,10 +576,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
-            Long latestSnapshotId,
+            Snapshot latestSnapshot,
             Long safeLatestSnapshotId) {
         long newSnapshotId =
-                latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+                latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
         Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
 
         if (LOG.isDebugEnabled()) {
@@ -593,14 +593,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
 
-        Snapshot latestSnapshot = null;
-        if (latestSnapshotId != null) {
-            if (!latestSnapshotId.equals(safeLatestSnapshotId)) {
-                // latestSnapshotId is different from the snapshot id we've checked for conflicts,
-                // so we have to check again
-                noConflictsOrFail(latestSnapshotId, tableFiles);
-            }
-            latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+        if (latestSnapshot != null && !Objects.equals(latestSnapshot.id(), safeLatestSnapshotId)) {
+            // the latest snapshot's id is different from the snapshot id we've checked for
+            // conflicts, so we have to check again
+            noConflictsOrFail(latestSnapshot.commitUser(), latestSnapshot, tableFiles);
         }
 
         Snapshot newSnapshot;
@@ -777,7 +773,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @SafeVarargs
     private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
-            long snapshotId, List<ManifestEntry>... changes) {
+            Snapshot snapshot, List<ManifestEntry>... changes) {
         List<BinaryRow> changedPartitions =
                 Arrays.stream(changes)
                         .flatMap(Collection::stream)
@@ -785,7 +781,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         .distinct()
                         .collect(Collectors.toList());
         try {
-            return scan.withSnapshot(snapshotId)
+            return scan.withSnapshot(snapshot)
                     .withPartitionFilter(changedPartitions)
                     .plan()
                     .files();
@@ -794,11 +790,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
     }
 
-    private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        noConflictsOrFail(readAllEntriesFromChangedPartitions(snapshotId, changes), changes);
+    private void noConflictsOrFail(
+            String baseCommitUser, Snapshot latestSnapshot, List<ManifestEntry> changes) {
+        noConflictsOrFail(
+                baseCommitUser,
+                readAllEntriesFromChangedPartitions(latestSnapshot, changes),
+                changes);
     }
 
-    private void noConflictsOrFail(List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
+    private void noConflictsOrFail(
+            String baseCommitUser, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
         List<ManifestEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(changes);
 
@@ -810,7 +811,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         } catch (Throwable e) {
             LOG.warn("File deletion conflicts detected! Give up committing.", e);
             throw createConflictException(
-                    "File deletion conflicts detected! Give up committing.", baseEntries, changes);
+                    "File deletion conflicts detected! Give up committing.",
+                    baseCommitUser,
+                    baseEntries,
+                    changes,
+                    e);
         }
 
         // fast exit for file store without keys
@@ -842,27 +847,47 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                     + a.identifier().toString(pathFactory)
                                     + "\n"
                                     + b.identifier().toString(pathFactory),
+                            baseCommitUser,
                             baseEntries,
-                            changes);
+                            changes,
+                            null);
                 }
             }
         }
     }
 
     private RuntimeException createConflictException(
-            String message, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
+            String message,
+            String baseCommitUser,
+            List<ManifestEntry> baseEntries,
+            List<ManifestEntry> changes,
+            Throwable cause) {
         String possibleCauses =
                 String.join(
                         "\n",
+                        "Don't panic!",
                         "Conflicts during commits are normal and this failure is intended to resolve the conflicts.",
                         "Conflicts are mainly caused by the following scenarios:",
-                        "1. Multiple jobs are writing into the same partition at the same time, you can use "
-                                + "https://paimon.apache.org/docs/master/maintenance/write-performance/#dedicated-compaction-job"
-                                + " to support multiple writing.",
-                        "2. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.",
+                        "1. Your job is suffering from back-pressuring.",
+                        "   There are too many snapshots waiting to be committed "
+                                + "and an exception occurred during the commit procedure "
+                                + "(most probably due to checkpoint timeout).",
+                        "   See https://paimon.apache.org/docs/master/maintenance/write-performance/ "
+                                + "for how to improve writing performance.",
+                        "2. Multiple jobs are writing into the same partition at the same time "
+                                + "(you'll probably see different base commit user and current commit user below).",
+                        "   You can use "
+                                + "https://paimon.apache.org/docs/master/maintenance/write-performance/#dedicated-compaction-job "
+                                + "to support multiple writing.",
+                        "3. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.",
                         "   The job will fail continuously in this scenario to protect metadata from corruption.",
                         "   You can either recover from the latest savepoint, "
                                 + "or you can revert the table to the snapshot corresponding to the old savepoint.");
+        String commitUserString =
+                "Base commit user is: "
+                        + baseCommitUser
+                        + "; Current commit user is: "
+                        + commitUser;
         String baseEntriesString =
                 "Base entries are:\n"
                         + baseEntries.stream()
@@ -878,9 +903,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         + "\n\n"
                         + possibleCauses
                         + "\n\n"
+                        + commitUserString
+                        + "\n\n"
                         + baseEntriesString
                         + "\n\n"
-                        + changesString);
+                        + changesString,
+                cause);
     }
 
     private void cleanUpTmpManifests(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index be602318f..095a2b638 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -592,7 +592,7 @@ public class FileDeletionTest {
                         null,
                         Collections.emptyMap(),
                         Snapshot.CommitKind.APPEND,
-                        store.snapshotManager().latestSnapshotId(),
+                        store.snapshotManager().latestSnapshot(),
                         null);
     }
 }