You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2024/04/12 01:40:45 UTC
(paimon) branch master updated: [core] Retain number in Snapshot expire need guard check (#3203)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8bc4faee4 [core] Retain number in Snapshot expire need guard check (#3203)
8bc4faee4 is described below
commit 8bc4faee40357cf9044bdf971d412ffe09481255
Author: xuzifu666 <12...@qq.com>
AuthorDate: Fri Apr 12 09:40:40 2024 +0800
[core] Retain number in Snapshot expire need guard check (#3203)
---
.../apache/paimon/table/ExpireChangelogImpl.java | 4 ++
.../apache/paimon/table/ExpireSnapshotsImpl.java | 4 ++
.../procedure/ExpireSnapshotsProcedureTest.scala | 53 ++++++++++++++++++++++
3 files changed, 61 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index 138b5f3ae..b237fe630 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.Changelog;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -106,6 +107,9 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
return 0;
}
+ Preconditions.checkArgument(
+ retainMax >= retainMin, "retainMax must greater than retainMin.");
+
// the min snapshot to retain from 'changelog.num-retained.max'
// (the maximum number of snapshots to retain)
long min = Math.max(latestSnapshotId - retainMax + 1, earliestChangelogId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index d14139ad8..50cf39eb8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -106,6 +107,9 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
return 0;
}
+ Preconditions.checkArgument(
+ retainMax >= retainMin, "retainMax must greater than retainMin.");
+
// the min snapshot to retain from 'snapshot.num-retained.max'
// (the maximum number of snapshots to retain)
long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index d08b265ae..6fcb48d68 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -83,4 +83,57 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
}
}
}
+
+ test("Paimon Procedure: expire snapshots retainMax retainMin value check") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ // create a primary-key table and append each micro-batch to it through the `foreachBatch` API
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ // snapshot-1
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Nil)
+
+ // snapshot-2
+ inputData.addData((2, "b"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ // snapshot-3
+ inputData.addData((2, "b2"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+ // retain_max (2) is smaller than retain_min (3), so expire_snapshots must throw IllegalArgumentException
+ assertThrows[IllegalArgumentException] {
+ spark.sql(
+ "CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 2, retain_min => 3)")
+ }
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}