You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/04/12 01:40:45 UTC

(paimon) branch master updated: [core] Retain number in Snapshot expire need guard check (#3203)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bc4faee4 [core] Retain number in Snapshot expire need guard check (#3203)
8bc4faee4 is described below

commit 8bc4faee40357cf9044bdf971d412ffe09481255
Author: xuzifu666 <12...@qq.com>
AuthorDate: Fri Apr 12 09:40:40 2024 +0800

    [core] Retain number in Snapshot expire need guard check (#3203)
---
 .../apache/paimon/table/ExpireChangelogImpl.java   |  4 ++
 .../apache/paimon/table/ExpireSnapshotsImpl.java   |  4 ++
 .../procedure/ExpireSnapshotsProcedureTest.scala   | 53 ++++++++++++++++++++++
 3 files changed, 61 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index 138b5f3ae..b237fe630 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.Changelog;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -106,6 +107,9 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
             return 0;
         }
 
+        Preconditions.checkArgument(
+                retainMax >= retainMin, "retainMax must greater than retainMin.");
+
         // the min snapshot to retain from 'changelog.num-retained.max'
         // (the maximum number of snapshots to retain)
         long min = Math.max(latestSnapshotId - retainMax + 1, earliestChangelogId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index d14139ad8..50cf39eb8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -106,6 +107,9 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
             return 0;
         }
 
+        Preconditions.checkArgument(
+                retainMax >= retainMin, "retainMax must greater than retainMin.");
+
         // the min snapshot to retain from 'snapshot.num-retained.max'
         // (the maximum number of snapshots to retain)
         long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index d08b265ae..6fcb48d68 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -83,4 +83,57 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
       }
     }
   }
+
+  test("Paimon Procedure: expire snapshots retainMax retainMin value check") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // create a 3-bucket primary-key table, then feed it through a `foreachBatch` sink
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            // snapshot-2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+            // retain_max (2) < retain_min (3): the call is expected to throw IllegalArgumentException
+            assertThrows[IllegalArgumentException] {
+              spark.sql(
+                "CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 2, retain_min => 3)")
+            }
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }