Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/25 08:05:54 UTC

[incubator-paimon] branch release-0.4 updated (608b32b13 -> 81d35563b)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


    from 608b32b13 [flink] Support 'substring' for cdc computed column (#1140)
     new 66c2a5fa8 [doc] Updated use case bullet point (#1150)
     new 64a5f51ed [flink] support system function truncate for computed column in CDC (#1148)
     new 14693ac15 [core] Spark writer support kryo serialization (#1149)
     new b84203955 [doc][flink] add user doc for CDC computed column function `truncate`. (#1157)
     new 36eb45fb0 [bug] Fixed manifest list naming issue. (#1163)
     new 03a306853 [doc] Add document page for operations on files (#1158)
     new 91a466780 [doc] Improve doc of cdc ingestion (#1138)
     new cb8ea51fd [flink] Introduce paimon-flink-action jar to execute actions (#1169)
     new 7a9844778 [doc] Introduce how the read.batch-size option can impact memory consumption when compaction in write-performance page (#1175)
     new 5f0127405 [license] Remove maven wrapper files and add Path to license (#1178)
     new d081ec883 [license] Notice for flink-cdc-connectors and doris
     new be3dd40af [flink] Replace org.apache.flink.util.function.SerializableFunction by Paimon's implementation. (#1183)
     new edaec472b [license] Fix compile and doris license
     new 806c2aca0 [hotfix][doc] update some unclear phrases (#1187)
     new d81f2ec3e [test] Increase timeout in MySqlSyncTableActionITCase.testAllTypes
     new 712c2ff1e [doc] Fix Typos in README and docs.README (#1202)
     new 82019b6fb [doc] Fix sql syntax in append-only-table(#1209)
     new 70630ae36 [ci] Remove sonar check (#1206)
     new e3c074c9b [hotfix][doc] fix typos and improve phrases of some content. (#1201)
     new 81d35563b [flink] Should pass FlinkFileIOLoader when creating catalog in flink actions (#1218)

The 20 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/check-licensing.yml              |   4 +-
 .github/workflows/code-analysys.yml                |  43 ---
 .github/workflows/e2e-tests-1.14-jdk11.yml         |   4 +-
 .github/workflows/e2e-tests-1.14.yml               |   4 +-
 .github/workflows/e2e-tests-1.15-jdk11.yml         |   4 +-
 .github/workflows/e2e-tests-1.15.yml               |   4 +-
 .github/workflows/e2e-tests-1.16-jdk11.yml         |   4 +-
 .github/workflows/e2e-tests-1.16.yml               |   4 +-
 .github/workflows/e2e-tests-1.17-jdk11.yml         |   4 +-
 .github/workflows/e2e-tests-1.17.yml               |   4 +-
 .github/workflows/publish_snapshot.yml             |   4 +-
 .github/workflows/unitcase-flink-jdk11.yml         |   4 +-
 .github/workflows/unitcase-jdk11.yml               |   4 +-
 .github/workflows/utitcase-flink.yml               |   4 +-
 .github/workflows/utitcase.yml                     |   4 +-
 .gitignore                                         |   3 +-
 .mvn/wrapper/maven-wrapper.properties              |  18 -
 LICENSE                                            |   3 +
 NOTICE                                             |  10 +
 README.md                                          |   3 +-
 docs/content/concepts/append-only-table.md         |   2 +-
 docs/content/concepts/file-operations.md           | 381 +++++++++++++++++++++
 docs/content/engines/flink.md                      |   6 +-
 docs/content/how-to/cdc-ingestion.md               |  47 +--
 docs/content/how-to/writing-tables.md              |  40 +--
 docs/content/maintenance/rescale-bucket.md         |   2 +-
 docs/content/maintenance/write-performance.md      |  10 +-
 .../shortcodes/generated/mysql_sync_database.html  |  51 +++
 .../shortcodes/generated/mysql_sync_table.html     |  47 +++
 docs/static/img/cdc-ingestion-commit.png           | Bin 0 -> 668838 bytes
 docs/static/img/cdc-ingestion-compact.png          | Bin 0 -> 1127761 bytes
 docs/static/img/cdc-ingestion-source.png           | Bin 0 -> 411986 bytes
 docs/static/img/cdc-ingestion-topology.png         | Bin 0 -> 382727 bytes
 docs/static/img/cdc-ingestion-write.png            | Bin 0 -> 601989 bytes
 docs/static/img/file-operations-0.png              | Bin 0 -> 347962 bytes
 docs/static/img/file-operations-1.png              | Bin 0 -> 543256 bytes
 docs/static/img/file-operations-2.png              | Bin 0 -> 741038 bytes
 docs/static/img/file-operations-3.png              | Bin 0 -> 856310 bytes
 docs/static/img/file-operations-4.png              | Bin 0 -> 690946 bytes
 mvnw                                               | 316 -----------------
 mvnw.cmd                                           | 188 ----------
 .../org/apache/paimon/manifest/ManifestList.java   |   2 +-
 .../apache/paimon/manifest/ManifestListTest.java   |  10 +
 paimon-docs/README.md                              |   2 +-
 paimon-e2e-tests/pom.xml                           |  17 +
 .../apache/paimon/tests/FlinkActionsE2eTest.java   |  20 +-
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      |   8 +-
 .../tests/cdc/MySqlComputedColumnE2ETest.java      |   4 +-
 .../paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java  |   4 +-
 .../test/resources-filtered/docker-compose.yaml    |   8 +-
 .../paimon-flink-action/README.md                  |  16 +-
 .../pom.xml                                        |  21 +-
 .../apache/paimon/flink/action/FlinkActions.java   |   8 -
 .../apache/paimon/flink/FlinkCatalogFactory.java   |   9 +
 .../org/apache/paimon/flink/action/Action.java     |   2 +-
 .../org/apache/paimon/flink/action/ActionBase.java |   9 +-
 .../apache/paimon/flink/action/CompactAction.java  |   5 +-
 .../apache/paimon/flink/action/FlinkActions.java   |  15 +-
 .../paimon/flink/action/cdc/mysql/Expression.java  | 102 +++++-
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |  13 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |   9 +-
 .../action/cdc/mysql/MySqlSyncTableAction.java     |   9 +-
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  13 +-
 .../paimon/flink/sink/CommitterOperator.java       |   2 +-
 .../apache/paimon/flink/sink/CompactorSink.java    |   2 +-
 .../apache/paimon/flink/sink/FileStoreSink.java    |   2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   2 +-
 .../apache/paimon/flink/sink/cdc/FlinkCdcSink.java |   2 +-
 .../ChangelogWithKeyFileStoreTableITCase.java      |   8 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  15 +-
 .../action/cdc/mysql/TruncateComputerTest.java     | 140 ++++++++
 paimon-flink/pom.xml                               |   1 +
 .../java/org/apache/paimon/spark/SparkWrite.java   |  40 ++-
 .../org/apache/paimon/spark/SparkWriteITCase.java  |  10 +-
 .../paimon/spark/SparkWriteWIthKyroITCase.java     |  44 +++
 pom.xml                                            |  49 ---
 tools/ci/sonar_check.sh                            |   2 +-
 77 files changed, 996 insertions(+), 855 deletions(-)
 delete mode 100644 .github/workflows/code-analysys.yml
 delete mode 100644 .mvn/wrapper/maven-wrapper.properties
 create mode 100644 docs/content/concepts/file-operations.md
 create mode 100644 docs/layouts/shortcodes/generated/mysql_sync_database.html
 create mode 100644 docs/layouts/shortcodes/generated/mysql_sync_table.html
 create mode 100644 docs/static/img/cdc-ingestion-commit.png
 create mode 100644 docs/static/img/cdc-ingestion-compact.png
 create mode 100644 docs/static/img/cdc-ingestion-source.png
 create mode 100644 docs/static/img/cdc-ingestion-topology.png
 create mode 100644 docs/static/img/cdc-ingestion-write.png
 create mode 100644 docs/static/img/file-operations-0.png
 create mode 100644 docs/static/img/file-operations-1.png
 create mode 100644 docs/static/img/file-operations-2.png
 create mode 100644 docs/static/img/file-operations-3.png
 create mode 100644 docs/static/img/file-operations-4.png
 delete mode 100755 mvnw
 delete mode 100644 mvnw.cmd
 copy docs/content/versions.md => paimon-flink/paimon-flink-action/README.md (63%)
 copy paimon-flink/{paimon-flink-1.16 => paimon-flink-action}/pom.xml (79%)
 copy paimon-flink/{paimon-flink-common => paimon-flink-action}/src/main/java/org/apache/paimon/flink/action/FlinkActions.java (81%)
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java
 create mode 100644 paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWIthKyroITCase.java


[incubator-paimon] 07/20: [doc] Improve doc of cdc ingestion (#1138)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 91a4667804a708703f1bfb66bdeb948e78d68d5b
Author: s7monk <34...@users.noreply.github.com>
AuthorDate: Wed May 17 14:22:45 2023 +0800

    [doc] Improve doc of cdc ingestion (#1138)
---
 docs/content/how-to/cdc-ingestion.md               | 39 ++++-------------
 .../shortcodes/generated/mysql_sync_database.html  | 51 ++++++++++++++++++++++
 .../shortcodes/generated/mysql_sync_table.html     | 47 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 31 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index a905d144d..9f425aae1 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -30,6 +30,12 @@ Paimon supports synchronizing changes from different databases using change data
 
 ## MySQL
 
+### Prepare CDC Bundled Jar
+
+```
+flink-sql-connector-mysql-cdc-*.jar
+```
+
 ### Synchronizing Tables
 
 By using [MySqlSyncTableAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction) in a Flink DataStream job or directly through `flink run`, users can synchronize one or multiple tables from MySQL into one Paimon table.
@@ -52,22 +58,7 @@ To use this feature through `flink run`, run the following shell command.
     [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
 ```
 
-* `--warehouse` is the path to Paimon warehouse.
-* `--database` is the database name in Paimon catalog.
-* `--table` is the Paimon table name.
-* `--partition-keys` are the partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example `dt,hh,mm`.
-* `--primary-keys` are the primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example `buyer_id,seller_id`.
-* `--computed-column` are the definitions of computed columns. The argument field is from MySQL table field name. Supported expressions are:
-  * year(date-column): Extract year from a DATE, DATETIME or TIMESTAMP. Output is an INT value represent the year.
-  * substring(column,beginInclusive): Get column.substring(beginInclusive). Output is a STRING.
-  * substring(column,beginInclusive,endExclusive): Get column.substring(beginInclusive,endExclusive). Output is a STRING.
-  * truncate(column,width): truncate column by width. Output type is same with column.
-    * If the column is a STRING, truncate(column,width) will truncate the string to width characters, namely `value.substring(0, width)`.
-    * If the column is an INT or LONG, truncate(column,width) will truncate the number with the algorithm `v - (((v % W) + W) % W)`. The `redundant` compute part is to keep the result always positive.
-    * If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`.
-* `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password`, `database-name` and `table-name` are required configurations, others are optional. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options) for a complete list of configurations.
-* `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations.
-* `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations. 
+{{< generated/mysql_sync_table >}}
 
 If the Paimon table you specify does not exist, this action will automatically create the table. Its schema will be derived from all specified MySQL tables. If the Paimon table already exists, its schema will be compared against the schema of all specified MySQL tables.
 
@@ -132,21 +123,7 @@ To use this feature through `flink run`, run the following shell command.
     [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
 ```
 
-* `--warehouse` is the path to Paimon warehouse.
-* `--database` is the database name in Paimon catalog.
-* `--ignore-incompatible` is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible, 
-an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.
-* `--table-prefix` is the prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables 
-to have "ods_" as prefix, you can specify `--table-prefix ods_`.
-* `--table-suffix` is the suffix of all Paimon tables to be synchronized. The usage is same as `--table-prefix`.
-* `--including-tables` is used to specify which source tables are to be synchronized. You must use '|' to separate multiple
-tables. Regular expression is supported, for example, specifying `--including-tables test|paimon.*` means to synchronize
-table 'test' and all tables start with 'paimon'.
-* `--excluding-tables` is used to specify which source tables are not to be synchronized. The usage is same as `--including-tables`.
-`--excluding-tables` has higher priority than `--including-tables` if you specified both.
-* `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password` and `database-name` are required configurations, others are optional. Note that `database-name` should be the exact name of the MySQL databse you want to synchronize. It can't be a regular expression. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connecto [...]
-* `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations.
-* `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. All Paimon sink table will be applied the same set of configurations. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations.
+{{< generated/mysql_sync_database >}}
 
 Only tables with primary keys will be synchronized.
 
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html
new file mode 100644
index 000000000..c57ca35d2
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -0,0 +1,51 @@
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--warehouse</h5></td>
+        <td>The path to Paimon warehouse.</td>
+    </tr>
+    <tr>
+        <td><h5>--database</h5></td>
+        <td>The database name in Paimon catalog.</td>
+    </tr>
+    <tr>
+        <td><h5>--ignore-incompatible</h5></td>
+        <td>It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.</td>
+    </tr>
+    <tr>
+        <td><h5>--table-prefix</h5></td>
+        <td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table-prefix ods_".</td>
+    </tr>
+    <tr>
+        <td><h5>--table-suffix</h5></td>
+        <td>The suffix of all Paimon tables to be synchronized. The usage is same as "--table-prefix".</td>
+    </tr>
+    <tr>
+        <td><h5>--including-tables</h5></td>
+        <td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
+    </tr>
+    <tr>
+        <td><h5>--excluding-tables</h5></td>
+        <td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if you specified both.</td>
+    </tr>
+    <tr>
+        <td><h5>--mysql-conf</h5></td>
+        <td>The configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--catalog-conf</h5></td>
+        <td>The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of catalog configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--table-conf</h5></td>
+        <td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html b/docs/layouts/shortcodes/generated/mysql_sync_table.html
new file mode 100644
index 000000000..12ff50f61
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -0,0 +1,47 @@
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--warehouse</h5></td>
+        <td>The path to Paimon warehouse.</td>
+    </tr>
+    <tr>
+        <td><h5>--database</h5></td>
+        <td>The database name in Paimon catalog.</td>
+    </tr>
+    <tr>
+        <td><h5>--table</h5></td>
+        <td>The Paimon table name.</td>
+    </tr>
+    <tr>
+        <td><h5>--partition-keys</h5></td>
+        <td>The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm".</td>
+    </tr>
+    <tr>
+        <td><h5>--primary-keys</h5></td>
+        <td>The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id".</td>
+    </tr>
+    <tr>
+        <td><h5>--computed-column</h5></td>
+        <td>The definitions of computed columns. The argument field is from MySQL table field name. <br /><br />Supported expressions are: <ul><li>year(date-column): Extract year from a DATE, DATETIME or TIMESTAMP. Output is an INT value represent the year.</li><li>substring(column,beginInclusive): Get column.substring(beginInclusive). Output is a STRING.</li><li>substring(column,beginInclusive,endExclusive): Get column.substring(beginInclusive,endExclusive). Output is a STRING.</li><li> [...]
+    </tr>
+    <tr>
+        <td><h5>--mysql-conf</h5></td>
+        <td>The configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--catalog-conf</h5></td>
+        <td>The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of catalog configurations.</td>
+    </tr>
+    <tr>
+        <td><h5>--table-conf</h5></td>
+        <td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
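
For quick reference, the integer form of the `truncate(column,width)` expression removed from the markdown above (and now carried in mysql_sync_table.html) follows the formula `v - (((v % W) + W) % W)`. The snippet below is only a standalone illustration of that arithmetic in plain Java, not the project's Expression implementation; the method name is hypothetical.

    // Standalone sketch of the integer truncate rule quoted above; the
    // double modulo keeps the remainder non-negative, so negative values
    // still round down to the nearest multiple of the width.
    static long truncate(long v, long width) {
        return v - (((v % width) + width) % width);
    }

    // truncate(17, 10)  -> 10
    // truncate(-3, 10)  -> -10  (the normalized remainder is 7, not -3)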


[incubator-paimon] 16/20: [doc] Fix Typos in README and docs.README (#1202)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 712c2ff1e33c472998cb183cdcf531589156ab08
Author: minseok <99...@users.noreply.github.com>
AuthorDate: Tue May 23 10:59:41 2023 +0900

    [doc] Fix Typos in README and docs.README (#1202)
---
 README.md             | 2 +-
 paimon-docs/README.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 9f98be940..802b51e4e 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
 
 Paimon is a streaming data lake platform that supports high-speed data ingestion, change data tracking and efficient real-time analytics.
 
-Background and documentation is available at https://paimon.apache.org
+Background and documentation are available at https://paimon.apache.org
 
 Paimon's former name was Flink Table Store, developed from the Flink community. The architecture refers to some design concepts of Iceberg.
 Thanks to Apache Flink and Apache Iceberg.
diff --git a/paimon-docs/README.md b/paimon-docs/README.md
index 4769275d4..4c17b705c 100644
--- a/paimon-docs/README.md
+++ b/paimon-docs/README.md
@@ -30,7 +30,7 @@ To integrate an `*Options` class from another package, add another module-packag
 
 The files can be generated by running `mvn package -Pgenerate-docs -pl paimon-docs -nsu -DskipTests`, and can be integrated into the documentation using `{{ include generated/<file-name> >}}`.
 
-**NOTE:** You need to make sure that the changed jar has been installed to the local maven repository.
+**NOTE:** You need to make sure that the changed jar has been installed in the local maven repository.
 
 The documentation must be regenerated whenever
 * an `*Options` class was added or removed


[incubator-paimon] 17/20: [doc] Fix sql syntax in append-only-table(#1209)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 82019b6fbed6b6a0ad8ded5ecced6d36ac9f87ee
Author: sqdgtq <49...@users.noreply.github.com>
AuthorDate: Tue May 23 17:15:34 2023 +0800

    [doc] Fix sql syntax in append-only-table(#1209)
---
 docs/content/concepts/append-only-table.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/content/concepts/append-only-table.md b/docs/content/concepts/append-only-table.md
index 2e483acee..fd6ea6d82 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -107,7 +107,7 @@ CREATE TABLE T (
 ) WITH (...);
 
 -- launch a bounded streaming job to read paimon_table
-SELECT window_start, window_end, SUM(f0) FROM
+SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
  TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
 ```
 


[incubator-paimon] 19/20: [hotfix][doc] fix typos and improve phrases of some content. (#1201)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit e3c074c9b8481e702a929189d437024a38567334
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed May 24 11:32:02 2023 +0800

    [hotfix][doc] fix typos and improve phrases of some content. (#1201)
---
 docs/content/concepts/file-operations.md | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/docs/content/concepts/file-operations.md b/docs/content/concepts/file-operations.md
index e0387ebd6..49f523565 100644
--- a/docs/content/concepts/file-operations.md
+++ b/docs/content/concepts/file-operations.md
@@ -84,11 +84,10 @@ Run the following insert statement in Flink SQL:
 INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');
 ```
 
-After the Flink job finishes, the records are written into Paimon table, which 
-is done by a successful `commit`. The records are visible to user
-as can be verified by `SELECT * FROM T` which return a single row. 
-The commit creates a snapshot under path `/tmp/paimon/default.db/T/snapshot/snapshot-1`. 
-The resulting file layout as of snapshot-1 is as follows:
+Once the Flink job is completed, the records are written to the Paimon table through a successful `commit`.
+Users can verify the visibility of these records by executing the query `SELECT * FROM T` which will return a single row. 
+The commit process creates a snapshot located at the path `/tmp/paimon/default.db/T/snapshot/snapshot-1`. 
+The resulting file layout at snapshot-1 is as described below:
 
 {{< img src="/img/file-operations-0.png">}}
 
@@ -133,7 +132,7 @@ baseManifestList (manifest-list-1-base in the above graph), which is effectively
 `manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1` is the 
 deltaManifestList (manifest-list-1-delta in the above graph), which 
 contains a list of manifest entries that perform operations on data 
-files, which, in this case, is `manifest-B-0`.
+files, which, in this case, is `manifest-1-0`.
 
 
 Now let's insert a batch of records across different partitions and 
@@ -310,9 +309,9 @@ has actually used this schema yet until the next commit.
 Remind that the marked data files are not truly deleted until the snapshot expires and 
 no consumer depends on the snapshot. For more information, see [Expiring Snapshots]({{< ref "maintenance/expiring-snapshots" >}}).
 
-During snapshot expire, the range of snapshots are determined first and then marked data files in these snapshots will be 
-deleted. A data file is `marked` only when there's a manifest entry of kind `DELETE` pointing to that data file, so that it 
-will not be used by next snapshots and can be safely deleted.
+During the process of snapshot expiration, the range of snapshots is initially determined, and then data files within these snapshots are marked for deletion. 
+A data file is `marked` for deletion only when there is a manifest entry of kind `DELETE` that references that specific data file. 
+This marking ensures that the file will not be utilized by subsequent snapshots and can be safely removed.
 
 
 Let's say all 4 snapshots in the above diagram are about to expire. The expire process is as follows:

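The rewritten expiration paragraph above lends itself to a short pseudocode restatement. The sketch below is purely conceptual: `listExpiringSnapshots`, `manifestEntriesOf` and `deleteFile` are hypothetical helpers, not Paimon's actual expiration code; only the "DELETE-kind entry means the file can be removed" rule is taken from the text.

    // Conceptual restatement of the rule described above: within the expiring
    // snapshot range, a data file is removed only when a manifest entry of
    // kind DELETE references it, so no later snapshot can still depend on it.
    for (Snapshot snapshot : listExpiringSnapshots()) {           // hypothetical helper
        for (ManifestEntry entry : manifestEntriesOf(snapshot)) { // hypothetical helper
            if (entry.kind() == FileKind.DELETE) {
                deleteFile(entry.file());                         // hypothetical helper
            }
        }
    }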

[incubator-paimon] 18/20: [ci] Remove sonar check (#1206)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 70630ae366fe128fccd497009865d4f23a3b0f53
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed May 24 10:21:20 2023 +0800

    [ci] Remove sonar check (#1206)
---
 .github/workflows/code-analysys.yml | 43 --------------------------------
 .gitignore                          |  3 +--
 README.md                           |  1 -
 pom.xml                             | 49 -------------------------------------
 4 files changed, 1 insertion(+), 95 deletions(-)

diff --git a/.github/workflows/code-analysys.yml b/.github/workflows/code-analysys.yml
deleted file mode 100644
index d00c4715f..000000000
--- a/.github/workflows/code-analysys.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: SonarCloud
-on:
-  schedule:
-    - cron: '0 1 * * *'
-  workflow_dispatch:
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
-  cancel-in-progress: true
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    timeout-minutes: 120
-    steps:
-      - uses: actions/checkout@v2
-        with:
-          submodules: true
-      - name: Set up JDK 11
-        uses: actions/setup-java@v2
-        with:
-          java-version: 11
-          distribution: 'adopt'
-      - name: Run SonarCloud Analysis
-        run: bash ./tools/ci/sonar_check.sh
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-          SONAR_TOKEN: ${{ secrets.SONARCLOUD_TOKEN }}
diff --git a/.gitignore b/.gitignore
index c1f00b464..d2e9c21ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,5 +16,4 @@ target
 .DS_Store
 *.ipr
 *.iws
-test/coverage
-dependency-reduced-pom.xml
\ No newline at end of file
+dependency-reduced-pom.xml
diff --git a/README.md b/README.md
index 802b51e4e..cf913bf86 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,6 @@
 
 [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
 [![Get on Slack](https://img.shields.io/badge/slack-join-orange.svg)](https://the-asf.slack.com/archives/C053Q2NCW8G)
-[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=apache_incubator-paimon&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=apache_incubator-paimon)
 
 Paimon is a streaming data lake platform that supports high-speed data ingestion, change data tracking and efficient real-time analytics.
 
diff --git a/pom.xml b/pom.xml
index 2f0154063..dbddf37d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,6 @@ under the License.
         <json-smart.version>2.4.9</json-smart.version>
         <avro.version>1.11.1</avro.version>
         <kafka.version>3.2.3</kafka.version>
-        <jacoco.version>0.8.8</jacoco.version>
     </properties>
 
     <dependencies>
@@ -189,14 +188,6 @@ under the License.
             <version>${awaitility.version}</version>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.jacoco</groupId>
-            <artifactId>org.jacoco.agent</artifactId>
-            <version>${jacoco.version}</version>
-            <classifier>runtime</classifier>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <dependencyManagement>
@@ -434,7 +425,6 @@ under the License.
                         <test.randomization.seed>${test.randomization.seed}</test.randomization.seed>
                         <test.flink.main.version>${test.flink.main.version}</test.flink.main.version>
                         <test.flink.version>${test.flink.version}</test.flink.version>
-                        <jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
                     </systemPropertyVariables>
                     <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC</argLine>
                 </configuration>
@@ -689,45 +679,6 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
-            <!-- add plugin jacoco report -->
-            <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <version>${jacoco.version}</version>
-                <configuration>
-                    <skip>false</skip>
-                    <dataFile>${project.build.directory}/jacoco.exec</dataFile>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>default-prepare-agent</id>
-                        <goals>
-                            <goal>prepare-agent</goal>
-                        </goals>
-                        <configuration>
-                            <propertyName>surefireArgLine</propertyName>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>default-instrument</id>
-                        <goals>
-                            <goal>instrument</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>default-restore-instrumented-classes</id>
-                        <goals>
-                            <goal>restore-instrumented-classes</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>default-report</id>
-                        <goals>
-                            <goal>report</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
 
         <pluginManagement>


[incubator-paimon] 03/20: [core] Spark writer support kryo serialization (#1149)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 14693ac15054c989a69aee2738d6cf38a946c17e
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Mon May 15 22:04:20 2023 +0800

    [core] Spark writer support kryo serialization (#1149)
---
 .../java/org/apache/paimon/spark/SparkWrite.java   | 40 +++++++++++++++++---
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 10 +++--
 .../paimon/spark/SparkWriteWIthKyroITCase.java     | 44 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkWrite.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkWrite.java
index 611d9ed48..ca3bfe7a2 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkWrite.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.sink.InnerTableCommit;
 
 import org.apache.spark.api.java.function.Function;
@@ -32,8 +33,11 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.connector.write.V1Write;
 import org.apache.spark.sql.sources.InsertableRelation;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Spark {@link V1Write}, it is required to use v1 write for grouping by bucket. */
 public class SparkWrite implements V1Write {
@@ -41,6 +45,8 @@ public class SparkWrite implements V1Write {
     private final Table table;
     private final Lock.Factory lockFactory;
 
+    private final CommitMessageSerializer serializer = new CommitMessageSerializer();
+
     public SparkWrite(Table table, Lock.Factory lockFactory) {
         this.table = table;
         this.lockFactory = lockFactory;
@@ -55,11 +61,11 @@ public class SparkWrite implements V1Write {
 
             BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
             List<CommitMessage> committables =
-                    data.toJavaRDD()
-                            .groupBy(new ComputeBucket(writeBuilder))
-                            .mapValues(new WriteRecords(writeBuilder))
-                            .values()
-                            .reduce(new ListConcat<>());
+                    data.toJavaRDD().groupBy(new ComputeBucket(writeBuilder))
+                            .mapValues(new WriteRecords(writeBuilder)).values()
+                            .mapPartitions(SparkWrite::serializeCommitMessages).collect().stream()
+                            .map(this::deserializeCommitMessage)
+                            .collect(Collectors.toList());
             try (BatchTableCommit tableCommit =
                     ((InnerTableCommit) writeBuilder.newCommit()).withLock(lockFactory.create())) {
                 tableCommit.commit(committables);
@@ -69,6 +75,30 @@ public class SparkWrite implements V1Write {
         };
     }
 
+    private static Iterator<byte[]> serializeCommitMessages(Iterator<List<CommitMessage>> iters) {
+        List<byte[]> serialized = new ArrayList<>();
+        CommitMessageSerializer innerSerializer = new CommitMessageSerializer();
+        while (iters.hasNext()) {
+            List<CommitMessage> commitMessages = iters.next();
+            for (CommitMessage commitMessage : commitMessages) {
+                try {
+                    serialized.add(innerSerializer.serialize(commitMessage));
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to serialize CommitMessage's object", e);
+                }
+            }
+        }
+        return serialized.iterator();
+    }
+
+    private CommitMessage deserializeCommitMessage(byte[] bytes) {
+        try {
+            return serializer.deserialize(serializer.getVersion(), bytes);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to deserialize CommitMessage's object", e);
+        }
+    }
+
     private static class ComputeBucket implements Function<Row, Integer> {
 
         private final BatchWriteBuilder writeBuilder;
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index e934a5d16..b9df2bec9 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Comparator;
@@ -33,13 +34,14 @@ import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** ITCase for spark reader. */
+/** ITCase for spark writer. */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class SparkWriteITCase {
 
-    private static SparkSession spark = null;
+    protected SparkSession spark = null;
 
     @BeforeAll
-    public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
+    public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
         Path warehousePath = new Path("file:" + tempDir.toString());
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
@@ -49,7 +51,7 @@ public class SparkWriteITCase {
     }
 
     @AfterAll
-    public static void stopMetastoreAndSpark() {
+    public void stopMetastoreAndSpark() {
         if (spark != null) {
             spark.stop();
             spark = null;
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWIthKyroITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWIthKyroITCase.java
new file mode 100644
index 000000000..7fa703bb3
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWIthKyroITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.fs.Path;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+/** ITCase for spark writer with kryo serializer. */
+public class SparkWriteWIthKyroITCase extends SparkWriteITCase {
+
+    @BeforeAll
+    @Override
+    public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        spark =
+                SparkSession.builder()
+                        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+                        .master("local[2]")
+                        .getOrCreate();
+        spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
+        spark.sql("CREATE DATABASE paimon.db");
+        spark.sql("USE paimon.db");
+    }
+}
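
The core of the change above is that `CommitMessage` objects travel through Spark as plain byte arrays, which Kryo (or Java serialization) handles trivially, and are restored on the driver before the commit. Below is a condensed round-trip using only the calls visible in the diff (`serialize`, `getVersion`, `deserialize`); packaging it as a helper method is purely illustrative.

    // Round-trip sketch of the approach in the diff above: executors emit
    // serialized bytes, the driver restores CommitMessage before committing.
    static CommitMessage roundTrip(CommitMessage message) throws IOException {
        CommitMessageSerializer serializer = new CommitMessageSerializer();
        byte[] bytes = serializer.serialize(message);                   // executor side (mapPartitions)
        return serializer.deserialize(serializer.getVersion(), bytes);  // driver side (after collect)
    }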


[incubator-paimon] 13/20: [license] Fix compile and doris license

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit edaec472bb2b4b101b4baefa1be8653f0a68a647
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon May 22 10:19:59 2023 +0800

    [license] Fix compile and doris license
---
 NOTICE                                                                | 2 +-
 .../paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/NOTICE b/NOTICE
index 7516802b4..e10f3b594 100644
--- a/NOTICE
+++ b/NOTICE
@@ -23,5 +23,5 @@ flink-cdc-connectors
 Copyright 2022 Ververica Inc.
 Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.
 
-Apache Doris
+Flink Connector for Apache Doris
 Copyright 2018-2022 The Apache Software Foundation
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 7c831f066..8e0f15a72 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 
-/* This file is based on source code from JsonDebeziumSchemaSerializer in the Doris Project
- * (https://doris.apache.org/), licensed by the Apache Software Foundation (ASF) under the
+/* This file is based on source code from JsonDebeziumSchemaSerializer in the doris-flink-connector
+ * (https://github.com/apache/doris-flink-connector/), licensed by the Apache Software Foundation (ASF) under the
  * Apache License, Version 2.0. See the NOTICE file distributed with this work for additional
  *  information regarding copyright ownership. */
 


[incubator-paimon] 12/20: [flink] Replace org.apache.flink.util.function.SerializableFunction by Paimon's implementation. (#1183)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit be3dd40af50061678994da249c04e40891fb7d57
Author: Weijie Guo <re...@163.com>
AuthorDate: Sun May 21 20:56:56 2023 +0800

    [flink] Replace org.apache.flink.util.function.SerializableFunction by Paimon's implementation. (#1183)
---
 .../src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java   | 2 +-
 .../src/main/java/org/apache/paimon/flink/sink/CompactorSink.java       | 2 +-
 .../src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java       | 2 +-
 .../src/main/java/org/apache/paimon/flink/sink/FlinkSink.java           | 2 +-
 .../src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java    | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 37c34c101..e30867b54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -18,6 +18,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.function.SerializableFunction;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index 45975f219..c41bc0b43 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -20,10 +20,10 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.function.SerializableFunction;
 
 /** {@link FlinkSink} for dedicated compact jobs. */
 public class CompactorSink extends FlinkSink<RowData> {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index cd2ff68d1..777c25dc4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.function.SerializableFunction;
 
 import javax.annotation.Nullable;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d0a456d26..f1436cb47 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
@@ -36,7 +37,6 @@ import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.function.SerializableFunction;
 
 import java.io.Serializable;
 import java.util.UUID;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 1deda066e..43f7efee6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -29,9 +29,9 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.function.SerializableFunction;
 
 /**
  * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary.


[incubator-paimon] 14/20: [hotfix][doc] update some unclear phrases (#1187)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 806c2aca0789acdd650156591562bb408f6f1119
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Mon May 22 11:20:02 2023 +0800

    [hotfix][doc] update some unclear phrases (#1187)
---
 docs/content/engines/flink.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 4d19d4351..bd00fdd59 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -88,7 +88,7 @@ cp paimon-flink-*.jar <FLINK_HOME>/lib/
 **Step 3: Copy Hadoop Bundled Jar**
 
 {{< hint info >}}
-If the machine is in a hadoop environment, please ensure the value of the environment variable `HADOOP_CLASSPATH`, you do not need to use the following pre-bundled Hadoop jar.
+If the machine is in a hadoop environment, please ensure the value of the environment variable `HADOOP_CLASSPATH` include path to the common Hadoop libraries, you do not need to use the following pre-bundled Hadoop jar.
 {{< /hint >}}
 
 [Download](https://flink.apache.org/downloads.html) Pre-bundled Hadoop jar and copy the jar file to the `lib` directory of your Flink home.
@@ -124,7 +124,7 @@ You can now start Flink SQL client to execute SQL scripts.
 
 ```sql
 -- if you're trying out Paimon in a distributed environment,
--- warehouse path should be set to a shared file system, such as HDFS or OSS
+-- the warehouse path should be set to a shared file system, such as HDFS or OSS
 CREATE CATALOG my_catalog WITH (
     'type'='paimon',
     'warehouse'='file:/tmp/paimon'


[incubator-paimon] 09/20: [doc] Introduce how the read.batch-size option can impact memory consumption when compaction in write-performance page (#1175)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 7a98447789a6499b767e609e0ac8b7882e747fed
Author: wgcn <10...@qq.com>
AuthorDate: Fri May 19 14:53:08 2023 +0800

    [doc] Introduce how the read.batch-size option can impact memory consumption when compaction in write-performance page (#1175)
---
 docs/content/maintenance/write-performance.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 3dafff492..86c92e959 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -213,4 +213,5 @@ There are three main places in Paimon writer that takes up memory:
 
 * Writer's memory buffer, shared and preempted by all writers of a single task. This memory value can be adjusted by the `write-buffer-size` table property.
 * Memory consumed when merging several sorted runs for compaction. Can be adjusted by the `num-sorted-run.compaction-trigger` option to change the number of sorted runs to be merged.
+* If the row is very large, reading too many lines of data at once can consume a lot of memory when making a compaction. Reducing the `read.batch-size` option can alleviate the impact of this case.
 * The memory consumed by writing columnar (ORC, Parquet, etc.) file, which is not adjustable.
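
To make the new bullet concrete: `read.batch-size` is an ordinary per-table option, so lowering it only requires adding the key to the table's options. A minimal, hedged sketch follows; the value 256 is an arbitrary example, and how the map is handed to the catalog or schema builder is elided.

    // Hedged sketch: shrink the read batch size so that compaction of very
    // wide rows buffers fewer rows at a time. 256 is only an example value.
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("read.batch-size", "256");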


[incubator-paimon] 20/20: [flink] Should pass FlinkFileIOLoader when creating catalog in flink actions (#1218)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 81d35563b7bdcaba764373afe1bf42447604e814
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Wed May 24 17:37:03 2023 +0800

    [flink] Should pass FlinkFileIOLoader when creating catalog in flink actions (#1218)
---
 .../main/java/org/apache/paimon/flink/FlinkCatalogFactory.java   | 9 +++++++++
 .../src/main/java/org/apache/paimon/flink/action/ActionBase.java | 9 +++------
 .../paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java   | 9 +++------
 .../paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java      | 9 +++------
 4 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index 982cff2ae..abc8b10fe 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -69,4 +69,13 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
                 catalogName,
                 context.options().get(DEFAULT_DATABASE));
     }
+
+    public static FlinkCatalog createCatalog(String catalogName, Catalog catalog) {
+        return new FlinkCatalog(catalog, catalogName, Catalog.DEFAULT_DATABASE);
+    }
+
+    public static Catalog createPaimonCatalog(Options catalogOptions) {
+        return CatalogFactory.createCatalog(
+                CatalogContext.create(catalogOptions, new FlinkFileIOLoader()));
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index de310148c..d2eb764cb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -20,10 +20,9 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.utils.TableEnvironmentUtils;
@@ -53,8 +52,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.catalog.Catalog.DEFAULT_DATABASE;
-
 /** Abstract base of {@link Action}. */
 public abstract class ActionBase implements Action {
 
@@ -81,8 +78,8 @@ public abstract class ActionBase implements Action {
     ActionBase(String warehouse, String databaseName, String tableName, Options catalogOptions) {
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
         identifier = new Identifier(databaseName, tableName);
-        catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
-        flinkCatalog = new FlinkCatalog(catalog, catalogName, DEFAULT_DATABASE);
+        catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog);
 
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 3bce9d896..6aa02ddf6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -19,9 +19,8 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -161,10 +160,8 @@ public class MySqlSyncDatabaseAction implements Action {
                         + "If you want to sync several MySQL tables into one Paimon table, "
                         + "use mysql-sync-table instead.");
         Catalog catalog =
-                CatalogFactory.createCatalog(
-                        CatalogContext.create(
-                                new Options(catalogConfig)
-                                        .set(CatalogOptions.WAREHOUSE, warehouse)));
+                FlinkCatalogFactory.createPaimonCatalog(
+                        Options.fromMap(catalogConfig).set(CatalogOptions.WAREHOUSE, warehouse));
         boolean caseSensitive = catalog.caseSensitive();
 
         if (!caseSensitive) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 3bb2a507f..ef7cfde61 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -19,9 +19,8 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -145,10 +144,8 @@ public class MySqlSyncTableAction implements Action {
         MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig);
 
         Catalog catalog =
-                CatalogFactory.createCatalog(
-                        CatalogContext.create(
-                                new Options(catalogConfig)
-                                        .set(CatalogOptions.WAREHOUSE, warehouse)));
+                FlinkCatalogFactory.createPaimonCatalog(
+                        Options.fromMap(catalogConfig).set(CatalogOptions.WAREHOUSE, warehouse));
         boolean caseSensitive = catalog.caseSensitive();
 
         if (!caseSensitive) {
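
For reference, a hedged sketch of the call sites after this change: an action now builds the Paimon catalog through `FlinkCatalogFactory.createPaimonCatalog`, which attaches the `FlinkFileIOLoader`, and then wraps it as a Flink catalog. The catalog name and warehouse path are illustrative, and the `org.apache.paimon.options` import paths are assumed from the surrounding code rather than shown in the diff.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class ActionCatalogSketch {
    public static void main(String[] args) {
        Options catalogOptions = new Options();
        catalogOptions.set(CatalogOptions.WAREHOUSE, "hdfs:///path/to/warehouse");

        // createPaimonCatalog passes a FlinkFileIOLoader into the CatalogContext,
        // so actions resolve file systems through Flink's FileSystem plugins too.
        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);

        // Wrap the Paimon catalog so it can be registered in a TableEnvironment.
        FlinkCatalog flinkCatalog = FlinkCatalogFactory.createCatalog("paimon", catalog);
        System.out.println(flinkCatalog.getName());
    }
}
```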


[incubator-paimon] 11/20: [license] Notice for flink-cdc-connectors and doris

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit d081ec88362b66183a3c2968c4a41fe5b23d0547
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun May 21 20:43:35 2023 +0800

    [license] Notice for flink-cdc-connectors and doris
---
 LICENSE                                                     |  2 ++
 NOTICE                                                      | 10 ++++++++++
 .../action/cdc/mysql/MySqlDebeziumJsonEventParser.java      | 13 ++++++-------
 .../paimon/flink/action/cdc/mysql/MySqlTypeUtils.java       | 13 ++++++-------
 4 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/LICENSE b/LICENSE
index 48c931558..e0f7b9630 100644
--- a/LICENSE
+++ b/LICENSE
@@ -213,6 +213,8 @@ paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.
 paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
 paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java
 paimon-filesystems/paimon-s3-impl/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java
+paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
 
 MIT License
diff --git a/NOTICE b/NOTICE
index a1502d1e1..7516802b4 100644
--- a/NOTICE
+++ b/NOTICE
@@ -10,8 +10,18 @@ Copyright 2014-2023 The Apache Software Foundation
 Apache Hadoop
 Copyright 2006 and onwards The Apache Software Foundation.
 
+Apache Hadoop
+Copyright 2006 and onwards The Apache Software Foundation.
+
 PalDB
 Copyright 2015 LinkedIn Corp
 
 AWS SDK for Java
 Copyright 2010-2014 Amazon.com, Inc. or its affiliates
+
+flink-cdc-connectors
+Copyright 2022 Ververica Inc.
+Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.
+
+Apache Doris
+Copyright 2018-2022 The Apache Software Foundation
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 2ba877d08..7c831f066 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -16,6 +16,11 @@
  * limitations under the License.
  */
 
+/* This file is based on source code from JsonDebeziumSchemaSerializer in the Doris Project
+ * (https://doris.apache.org/), licensed by the Apache Software Foundation (ASF) under the
+ * Apache License, Version 2.0. See the NOTICE file distributed with this work for additional
+ *  information regarding copyright ownership. */
+
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -52,13 +57,7 @@ import java.util.Optional;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/**
- * {@link EventParser} for MySQL Debezium JSON.
- *
- * <p>Some implementation is referenced from <a
- * href="https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java">apache
- * / doris-flink-connector</a>.
- */
+/** {@link EventParser} for MySQL Debezium JSON. */
 public class MySqlDebeziumJsonEventParser implements EventParser<String> {
 
     private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 48869c24a..c4471b032 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -16,6 +16,11 @@
  * limitations under the License.
  */
 
+/* This file is based on source code from MySqlTypeUtils in the flink-cdc-connectors Project
+ * (https://ververica.github.io/flink-cdc-connectors/), licensed by the Apache Software Foundation (ASF)
+ * under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.types.DataType;
@@ -25,13 +30,7 @@ import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
-/**
- * Converts from MySQL type to {@link DataType}.
- *
- * <p>Mostly referenced from <a
- * href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java">ververica
- * / flink-cdc-connectors</a>.
- */
+/** Converts from MySQL type to {@link DataType}. */
 public class MySqlTypeUtils {
 
     // ------ MySQL Type ------


[incubator-paimon] 05/20: [bug] Fixed manifest list naming issue. (#1163)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 36eb45fb01326f80bc62d378f15116e613921446
Author: Dian Qi <qi...@163.com>
AuthorDate: Tue May 16 17:01:31 2023 +0800

    [bug] Fixed manifest list naming issue. (#1163)
---
 .../src/main/java/org/apache/paimon/manifest/ManifestList.java |  2 +-
 .../test/java/org/apache/paimon/manifest/ManifestListTest.java | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index c11536713..34ebe3308 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -110,7 +110,7 @@ public class ManifestList extends ObjectsFile<ManifestFileMeta> {
                     new ManifestFileMetaSerializer(),
                     fileFormat.createReaderFactory(metaType),
                     fileFormat.createWriterFactory(metaType),
-                    pathFactory.manifestFileFactory(),
+                    pathFactory.manifestListFactory(),
                     cache);
         }
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index 824d3e219..63e3676a5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.FileStorePathFactory;
 
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
@@ -74,6 +75,15 @@ public class ManifestListTest {
         }
     }
 
+    @Test
+    public void testManifestListNaming() {
+        List<ManifestFileMeta> metas = generateData();
+        ManifestList manifestList = createManifestList(tempDir.toString());
+
+        String manifestListName = manifestList.write(metas);
+        assertThat(manifestListName.startsWith("manifest-list-")).isTrue();
+    }
+
     private List<ManifestFileMeta> generateData() {
         Random random = new Random();
         List<ManifestFileMeta> metas = new ArrayList<>();


[incubator-paimon] 10/20: [license] Remove maven wrapper files and add Path to license (#1178)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 5f012740553fc031500a488e86c34ba68b8f9aaf
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Sun May 21 20:38:25 2023 +0800

    [license] Remove maven wrapper files and add Path to license (#1178)
---
 .github/workflows/check-licensing.yml      |   4 +-
 .github/workflows/e2e-tests-1.14-jdk11.yml |   4 +-
 .github/workflows/e2e-tests-1.14.yml       |   4 +-
 .github/workflows/e2e-tests-1.15-jdk11.yml |   4 +-
 .github/workflows/e2e-tests-1.15.yml       |   4 +-
 .github/workflows/e2e-tests-1.16-jdk11.yml |   4 +-
 .github/workflows/e2e-tests-1.16.yml       |   4 +-
 .github/workflows/e2e-tests-1.17-jdk11.yml |   4 +-
 .github/workflows/e2e-tests-1.17.yml       |   4 +-
 .github/workflows/publish_snapshot.yml     |   4 +-
 .github/workflows/unitcase-flink-jdk11.yml |   4 +-
 .github/workflows/unitcase-jdk11.yml       |   4 +-
 .github/workflows/utitcase-flink.yml       |   4 +-
 .github/workflows/utitcase.yml             |   4 +-
 .mvn/wrapper/maven-wrapper.properties      |  18 --
 LICENSE                                    |   1 +
 mvnw                                       | 316 -----------------------------
 mvnw.cmd                                   | 188 -----------------
 tools/ci/sonar_check.sh                    |   2 +-
 19 files changed, 30 insertions(+), 551 deletions(-)

diff --git a/.github/workflows/check-licensing.yml b/.github/workflows/check-licensing.yml
index 93043b23f..4c73f3a00 100644
--- a/.github/workflows/check-licensing.yml
+++ b/.github/workflows/check-licensing.yml
@@ -43,12 +43,12 @@ jobs:
         run: |
           set -o pipefail
 
-          ./mvnw clean deploy ${{ env.MVN_COMMON_OPTIONS }} -Dmaven.test.skip=true \
+          mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -Dmaven.test.skip=true \
             -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
             | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
 
       - name: Check licensing
         run: |
-          ./mvnw ${{ env.MVN_COMMON_OPTIONS }} exec:java@check-licensing -N \
+          mvn ${{ env.MVN_COMMON_OPTIONS }} exec:java@check-licensing -N \
             -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) ${{ env.MVN_VALIDATION_DIR }}" \
             -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
diff --git a/.github/workflows/e2e-tests-1.14-jdk11.yml b/.github/workflows/e2e-tests-1.14-jdk11.yml
index fcc1463c9..88f5de468 100644
--- a/.github/workflows/e2e-tests-1.14-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.14-jdk11.yml
@@ -45,11 +45,11 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.14
-        run: ./mvnw clean install -DskipTests -Pflink-1.14
+        run: mvn clean install -DskipTests -Pflink-1.14
       - name: Test Flink 1.14
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.14
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.14
diff --git a/.github/workflows/e2e-tests-1.14.yml b/.github/workflows/e2e-tests-1.14.yml
index cd75a290c..505a95b06 100644
--- a/.github/workflows/e2e-tests-1.14.yml
+++ b/.github/workflows/e2e-tests-1.14.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.14
-        run: ./mvnw clean install -DskipTests -Pflink-1.14
+        run: mvn clean install -DskipTests -Pflink-1.14
       - name: Test Flink 1.14
         timeout-minutes: 60
         run: |
@@ -53,4 +53,4 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.14
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.14
diff --git a/.github/workflows/e2e-tests-1.15-jdk11.yml b/.github/workflows/e2e-tests-1.15-jdk11.yml
index 48ae5a338..92692002e 100644
--- a/.github/workflows/e2e-tests-1.15-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.15-jdk11.yml
@@ -45,11 +45,11 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.15
-        run: ./mvnw clean install -DskipTests -Pflink-1.15
+        run: mvn clean install -DskipTests -Pflink-1.15
       - name: Test Flink 1.15
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.15
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.15
diff --git a/.github/workflows/e2e-tests-1.15.yml b/.github/workflows/e2e-tests-1.15.yml
index ce827f8b6..7485506c3 100644
--- a/.github/workflows/e2e-tests-1.15.yml
+++ b/.github/workflows/e2e-tests-1.15.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.15
-        run: ./mvnw clean install -DskipTests -Pflink-1.15
+        run: mvn clean install -DskipTests -Pflink-1.15
       - name: Test Flink 1.15
         timeout-minutes: 60
         run: |
@@ -53,4 +53,4 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.15
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.15
diff --git a/.github/workflows/e2e-tests-1.16-jdk11.yml b/.github/workflows/e2e-tests-1.16-jdk11.yml
index b7acacaca..96f841a54 100644
--- a/.github/workflows/e2e-tests-1.16-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.16-jdk11.yml
@@ -45,11 +45,11 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.16
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test Flink 1.16
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
diff --git a/.github/workflows/e2e-tests-1.16.yml b/.github/workflows/e2e-tests-1.16.yml
index bb2f85762..b0b149ce1 100644
--- a/.github/workflows/e2e-tests-1.16.yml
+++ b/.github/workflows/e2e-tests-1.16.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.16
-        run: ./mvnw clean install -DskipTests -Pflink-1.16
+        run: mvn clean install -DskipTests -Pflink-1.16
       - name: Test Flink 1.16
         timeout-minutes: 60
         run: |
@@ -53,4 +53,4 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16
diff --git a/.github/workflows/e2e-tests-1.17-jdk11.yml b/.github/workflows/e2e-tests-1.17-jdk11.yml
index 1f46eb8b9..94c6516d0 100644
--- a/.github/workflows/e2e-tests-1.17-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.17-jdk11.yml
@@ -45,11 +45,11 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.17
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test Flink 1.17
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
diff --git a/.github/workflows/e2e-tests-1.17.yml b/.github/workflows/e2e-tests-1.17.yml
index 7c622c041..ff30f4f5e 100644
--- a/.github/workflows/e2e-tests-1.17.yml
+++ b/.github/workflows/e2e-tests-1.17.yml
@@ -44,7 +44,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.17
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test Flink 1.17
         timeout-minutes: 60
         run: |
@@ -52,4 +52,4 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
diff --git a/.github/workflows/publish_snapshot.yml b/.github/workflows/publish_snapshot.yml
index 481c92ad7..38e304b4f 100644
--- a/.github/workflows/publish_snapshot.yml
+++ b/.github/workflows/publish_snapshot.yml
@@ -65,10 +65,10 @@ jobs:
           echo "<password>$ASF_PASSWORD</password>" >> $tmp_settings
           echo "</server></servers></settings>" >> $tmp_settings
           
-          ./mvnw --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip -DskipTests -Papache-release
+          mvn --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip -DskipTests -Papache-release
 
           mv $tmp_settings /tmp/paimon-shade
           cd /tmp/paimon-shade
-          ./mvnw --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip -DskipTests -Papache-release
+          mvn --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip -DskipTests -Papache-release
           
           rm $tmp_settings
diff --git a/.github/workflows/unitcase-flink-jdk11.yml b/.github/workflows/unitcase-flink-jdk11.yml
index e9184bd09..391322433 100644
--- a/.github/workflows/unitcase-flink-jdk11.yml
+++ b/.github/workflows/unitcase-flink-jdk11.yml
@@ -45,11 +45,11 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+          mvn clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
diff --git a/.github/workflows/unitcase-jdk11.yml b/.github/workflows/unitcase-jdk11.yml
index 33f6c35cd..ed74180ff 100644
--- a/.github/workflows/unitcase-jdk11.yml
+++ b/.github/workflows/unitcase-jdk11.yml
@@ -45,11 +45,11 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+          mvn clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
diff --git a/.github/workflows/utitcase-flink.yml b/.github/workflows/utitcase-flink.yml
index 532c81cad..8749942ce 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -44,7 +44,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test
         timeout-minutes: 60
         run: |
@@ -52,4 +52,4 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+          mvn clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index d01e409c0..5a6e56c08 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build
-        run: ./mvnw clean install -DskipTests
+        run: mvn clean install -DskipTests
       - name: Test
         timeout-minutes: 60
         run: |
@@ -53,4 +53,4 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          ./mvnw clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+          mvn clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
deleted file mode 100644
index 8c79a83ae..000000000
--- a/.mvn/wrapper/maven-wrapper.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.4/apache-maven-3.8.4-bin.zip
-wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar
diff --git a/LICENSE b/LICENSE
index d98f6ed90..48c931558 100644
--- a/LICENSE
+++ b/LICENSE
@@ -208,6 +208,7 @@ for text of these licenses.
 Apache Software Foundation License 2.0
 --------------------------------------
 
+paimon-common/src/main/java/org/apache/paimon/fs/Path.java
 paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
 paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
 paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java
diff --git a/mvnw b/mvnw
deleted file mode 100755
index 5643201c7..000000000
--- a/mvnw
+++ /dev/null
@@ -1,316 +0,0 @@
-#!/bin/sh
-# ----------------------------------------------------------------------------
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# ----------------------------------------------------------------------------
-
-# ----------------------------------------------------------------------------
-# Maven Start Up Batch script
-#
-# Required ENV vars:
-# ------------------
-#   JAVA_HOME - location of a JDK home dir
-#
-# Optional ENV vars
-# -----------------
-#   M2_HOME - location of maven2's installed home dir
-#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
-#     e.g. to debug Maven itself, use
-#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-# ----------------------------------------------------------------------------
-
-if [ -z "$MAVEN_SKIP_RC" ] ; then
-
-  if [ -f /usr/local/etc/mavenrc ] ; then
-    . /usr/local/etc/mavenrc
-  fi
-
-  if [ -f /etc/mavenrc ] ; then
-    . /etc/mavenrc
-  fi
-
-  if [ -f "$HOME/.mavenrc" ] ; then
-    . "$HOME/.mavenrc"
-  fi
-
-fi
-
-# OS specific support.  $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-mingw=false
-case "`uname`" in
-  CYGWIN*) cygwin=true ;;
-  MINGW*) mingw=true;;
-  Darwin*) darwin=true
-    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
-    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
-    if [ -z "$JAVA_HOME" ]; then
-      if [ -x "/usr/libexec/java_home" ]; then
-        export JAVA_HOME="`/usr/libexec/java_home`"
-      else
-        export JAVA_HOME="/Library/Java/Home"
-      fi
-    fi
-    ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
-  if [ -r /etc/gentoo-release ] ; then
-    JAVA_HOME=`java-config --jre-home`
-  fi
-fi
-
-if [ -z "$M2_HOME" ] ; then
-  ## resolve links - $0 may be a link to maven's home
-  PRG="$0"
-
-  # need this for relative symlinks
-  while [ -h "$PRG" ] ; do
-    ls=`ls -ld "$PRG"`
-    link=`expr "$ls" : '.*-> \(.*\)$'`
-    if expr "$link" : '/.*' > /dev/null; then
-      PRG="$link"
-    else
-      PRG="`dirname "$PRG"`/$link"
-    fi
-  done
-
-  saveddir=`pwd`
-
-  M2_HOME=`dirname "$PRG"`/..
-
-  # make it fully qualified
-  M2_HOME=`cd "$M2_HOME" && pwd`
-
-  cd "$saveddir"
-  # echo Using m2 at $M2_HOME
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
-  [ -n "$M2_HOME" ] &&
-    M2_HOME=`cygpath --unix "$M2_HOME"`
-  [ -n "$JAVA_HOME" ] &&
-    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-  [ -n "$CLASSPATH" ] &&
-    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# For Mingw, ensure paths are in UNIX format before anything is touched
-if $mingw ; then
-  [ -n "$M2_HOME" ] &&
-    M2_HOME="`(cd "$M2_HOME"; pwd)`"
-  [ -n "$JAVA_HOME" ] &&
-    JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
-fi
-
-if [ -z "$JAVA_HOME" ]; then
-  javaExecutable="`which javac`"
-  if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
-    # readlink(1) is not available as standard on Solaris 10.
-    readLink=`which readlink`
-    if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
-      if $darwin ; then
-        javaHome="`dirname \"$javaExecutable\"`"
-        javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
-      else
-        javaExecutable="`readlink -f \"$javaExecutable\"`"
-      fi
-      javaHome="`dirname \"$javaExecutable\"`"
-      javaHome=`expr "$javaHome" : '\(.*\)/bin'`
-      JAVA_HOME="$javaHome"
-      export JAVA_HOME
-    fi
-  fi
-fi
-
-if [ -z "$JAVACMD" ] ; then
-  if [ -n "$JAVA_HOME"  ] ; then
-    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
-      # IBM's JDK on AIX uses strange locations for the executables
-      JAVACMD="$JAVA_HOME/jre/sh/java"
-    else
-      JAVACMD="$JAVA_HOME/bin/java"
-    fi
-  else
-    JAVACMD="`\\unset -f command; \\command -v java`"
-  fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
-  echo "Error: JAVA_HOME is not defined correctly." >&2
-  echo "  We cannot execute $JAVACMD" >&2
-  exit 1
-fi
-
-if [ -z "$JAVA_HOME" ] ; then
-  echo "Warning: JAVA_HOME environment variable is not set."
-fi
-
-CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
-
-# traverses directory structure from process work directory to filesystem root
-# first directory with .mvn subdirectory is considered project base directory
-find_maven_basedir() {
-
-  if [ -z "$1" ]
-  then
-    echo "Path not specified to find_maven_basedir"
-    return 1
-  fi
-
-  basedir="$1"
-  wdir="$1"
-  while [ "$wdir" != '/' ] ; do
-    if [ -d "$wdir"/.mvn ] ; then
-      basedir=$wdir
-      break
-    fi
-    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
-    if [ -d "${wdir}" ]; then
-      wdir=`cd "$wdir/.."; pwd`
-    fi
-    # end of workaround
-  done
-  echo "${basedir}"
-}
-
-# concatenates all lines of a file
-concat_lines() {
-  if [ -f "$1" ]; then
-    echo "$(tr -s '\n' ' ' < "$1")"
-  fi
-}
-
-BASE_DIR=`find_maven_basedir "$(pwd)"`
-if [ -z "$BASE_DIR" ]; then
-  exit 1;
-fi
-
-##########################################################################################
-# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
-# This allows using the maven wrapper in projects that prohibit checking in binary data.
-##########################################################################################
-if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
-    if [ "$MVNW_VERBOSE" = true ]; then
-      echo "Found .mvn/wrapper/maven-wrapper.jar"
-    fi
-else
-    if [ "$MVNW_VERBOSE" = true ]; then
-      echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
-    fi
-    if [ -n "$MVNW_REPOURL" ]; then
-      jarUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
-    else
-      jarUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
-    fi
-    while IFS="=" read key value; do
-      case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
-      esac
-    done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
-    if [ "$MVNW_VERBOSE" = true ]; then
-      echo "Downloading from: $jarUrl"
-    fi
-    wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
-    if $cygwin; then
-      wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
-    fi
-
-    if command -v wget > /dev/null; then
-        if [ "$MVNW_VERBOSE" = true ]; then
-          echo "Found wget ... using wget"
-        fi
-        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
-            wget "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
-        else
-            wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
-        fi
-    elif command -v curl > /dev/null; then
-        if [ "$MVNW_VERBOSE" = true ]; then
-          echo "Found curl ... using curl"
-        fi
-        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
-            curl -o "$wrapperJarPath" "$jarUrl" -f
-        else
-            curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
-        fi
-
-    else
-        if [ "$MVNW_VERBOSE" = true ]; then
-          echo "Falling back to using Java to download"
-        fi
-        javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
-        # For Cygwin, switch paths to Windows format before running javac
-        if $cygwin; then
-          javaClass=`cygpath --path --windows "$javaClass"`
-        fi
-        if [ -e "$javaClass" ]; then
-            if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
-                if [ "$MVNW_VERBOSE" = true ]; then
-                  echo " - Compiling MavenWrapperDownloader.java ..."
-                fi
-                # Compiling the Java class
-                ("$JAVA_HOME/bin/javac" "$javaClass")
-            fi
-            if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
-                # Running the downloader
-                if [ "$MVNW_VERBOSE" = true ]; then
-                  echo " - Running MavenWrapperDownloader.java ..."
-                fi
-                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
-            fi
-        fi
-    fi
-fi
-##########################################################################################
-# End of extension
-##########################################################################################
-
-export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
-if [ "$MVNW_VERBOSE" = true ]; then
-  echo $MAVEN_PROJECTBASEDIR
-fi
-MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
-  [ -n "$M2_HOME" ] &&
-    M2_HOME=`cygpath --path --windows "$M2_HOME"`
-  [ -n "$JAVA_HOME" ] &&
-    JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
-  [ -n "$CLASSPATH" ] &&
-    CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
-  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
-    MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
-fi
-
-# Provide a "standardized" way to retrieve the CLI args that will
-# work with both Windows and non-Windows executions.
-MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
-export MAVEN_CMD_LINE_ARGS
-
-WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-exec "$JAVACMD" \
-  $MAVEN_OPTS \
-  $MAVEN_DEBUG_OPTS \
-  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
-  "-Dmaven.home=${M2_HOME}" \
-  "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
-  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
deleted file mode 100644
index 8a15b7f31..000000000
--- a/mvnw.cmd
+++ /dev/null
@@ -1,188 +0,0 @@
-@REM ----------------------------------------------------------------------------
-@REM Licensed to the Apache Software Foundation (ASF) under one
-@REM or more contributor license agreements.  See the NOTICE file
-@REM distributed with this work for additional information
-@REM regarding copyright ownership.  The ASF licenses this file
-@REM to you under the Apache License, Version 2.0 (the
-@REM "License"); you may not use this file except in compliance
-@REM with the License.  You may obtain a copy of the License at
-@REM
-@REM    http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing,
-@REM software distributed under the License is distributed on an
-@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-@REM KIND, either express or implied.  See the License for the
-@REM specific language governing permissions and limitations
-@REM under the License.
-@REM ----------------------------------------------------------------------------
-
-@REM ----------------------------------------------------------------------------
-@REM Maven Start Up Batch script
-@REM
-@REM Required ENV vars:
-@REM JAVA_HOME - location of a JDK home dir
-@REM
-@REM Optional ENV vars
-@REM M2_HOME - location of maven2's installed home dir
-@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
-@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
-@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
-@REM     e.g. to debug Maven itself, use
-@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-@REM ----------------------------------------------------------------------------
-
-@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
-@echo off
-@REM set title of command window
-title %0
-@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
-@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
-
-@REM set %HOME% to equivalent of $HOME
-if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
-
-@REM Execute a user defined script before this one
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
-@REM check for pre script, once with legacy .bat ending and once with .cmd ending
-if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %*
-if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %*
-:skipRcPre
-
-@setlocal
-
-set ERROR_CODE=0
-
-@REM To isolate internal variables from possible post scripts, we use another setlocal
-@setlocal
-
-@REM ==== START VALIDATION ====
-if not "%JAVA_HOME%" == "" goto OkJHome
-
-echo.
-echo Error: JAVA_HOME not found in your environment. >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-:OkJHome
-if exist "%JAVA_HOME%\bin\java.exe" goto init
-
-echo.
-echo Error: JAVA_HOME is set to an invalid directory. >&2
-echo JAVA_HOME = "%JAVA_HOME%" >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-@REM ==== END VALIDATION ====
-
-:init
-
-@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
-@REM Fallback to current working directory if not found.
-
-set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
-IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
-
-set EXEC_DIR=%CD%
-set WDIR=%EXEC_DIR%
-:findBaseDir
-IF EXIST "%WDIR%"\.mvn goto baseDirFound
-cd ..
-IF "%WDIR%"=="%CD%" goto baseDirNotFound
-set WDIR=%CD%
-goto findBaseDir
-
-:baseDirFound
-set MAVEN_PROJECTBASEDIR=%WDIR%
-cd "%EXEC_DIR%"
-goto endDetectBaseDir
-
-:baseDirNotFound
-set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
-cd "%EXEC_DIR%"
-
-:endDetectBaseDir
-
-IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
-
-@setlocal EnableExtensions EnableDelayedExpansion
-for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
-@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
-
-:endReadAdditionalConfig
-
-SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
-set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
-set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
-
-FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
-    IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
-)
-
-@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
-@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
-if exist %WRAPPER_JAR% (
-    if "%MVNW_VERBOSE%" == "true" (
-        echo Found %WRAPPER_JAR%
-    )
-) else (
-    if not "%MVNW_REPOURL%" == "" (
-        SET DOWNLOAD_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
-    )
-    if "%MVNW_VERBOSE%" == "true" (
-        echo Couldn't find %WRAPPER_JAR%, downloading it ...
-        echo Downloading from: %DOWNLOAD_URL%
-    )
-
-    powershell -Command "&{"^
-		"$webclient = new-object System.Net.WebClient;"^
-		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
-		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
-		"}"^
-		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
-		"}"
-    if "%MVNW_VERBOSE%" == "true" (
-        echo Finished downloading %WRAPPER_JAR%
-    )
-)
-@REM End of extension
-
-@REM Provide a "standardized" way to retrieve the CLI args that will
-@REM work with both Windows and non-Windows executions.
-set MAVEN_CMD_LINE_ARGS=%*
-
-%MAVEN_JAVA_EXE% ^
-  %JVM_CONFIG_MAVEN_PROPS% ^
-  %MAVEN_OPTS% ^
-  %MAVEN_DEBUG_OPTS% ^
-  -classpath %WRAPPER_JAR% ^
-  "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^
-  %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
-if ERRORLEVEL 1 goto error
-goto end
-
-:error
-set ERROR_CODE=1
-
-:end
-@endlocal & set ERROR_CODE=%ERROR_CODE%
-
-if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost
-@REM check for post script, once with legacy .bat ending and once with .cmd ending
-if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat"
-if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd"
-:skipRcPost
-
-@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
-if "%MAVEN_BATCH_PAUSE%"=="on" pause
-
-if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE%
-
-cmd /C exit /B %ERROR_CODE%
diff --git a/tools/ci/sonar_check.sh b/tools/ci/sonar_check.sh
index 3d93984ef..95673c389 100644
--- a/tools/ci/sonar_check.sh
+++ b/tools/ci/sonar_check.sh
@@ -18,7 +18,7 @@ if [ ! "$SONAR_TOKEN" ]; then
   echo "SONAR_TOKEN environment is null, skip check"
   exit 0
 fi
-./mvnw --batch-mode verify sonar:sonar \
+mvn --batch-mode verify sonar:sonar \
 -Dmaven.test.skip=true -Dsonar.host.url=https://sonarcloud.io \
 -Dsonar.organization=apache \
 -Dsonar.projectKey=apache_incubator-paimon \


[incubator-paimon] 08/20: [flink] Introduce paimon-flink-action jar to execute actions (#1169)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit cb8ea51fd7e2eb3e30d60575e6b951e065ace711
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu May 18 11:49:49 2023 +0800

    [flink] Introduce paimon-flink-action jar to execute actions (#1169)
---
 docs/content/concepts/file-operations.md           |  6 +-
 docs/content/engines/flink.md                      |  2 +
 docs/content/how-to/cdc-ingestion.md               | 12 ++--
 docs/content/how-to/writing-tables.md              | 40 ++++---------
 docs/content/maintenance/write-performance.md      |  9 +--
 paimon-e2e-tests/pom.xml                           | 17 ++++++
 .../apache/paimon/tests/FlinkActionsE2eTest.java   | 20 ++-----
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      |  8 +--
 .../tests/cdc/MySqlComputedColumnE2ETest.java      |  4 +-
 .../paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java  |  4 +-
 .../test/resources-filtered/docker-compose.yaml    |  8 +--
 paimon-flink/paimon-flink-action/README.md         | 26 ++++++++
 paimon-flink/paimon-flink-action/pom.xml           | 69 ++++++++++++++++++++++
 .../apache/paimon/flink/action/FlinkActions.java   |  8 ---
 .../org/apache/paimon/flink/action/Action.java     |  2 +-
 .../apache/paimon/flink/action/CompactAction.java  |  5 +-
 .../apache/paimon/flink/action/FlinkActions.java   | 15 ++---
 .../ChangelogWithKeyFileStoreTableITCase.java      |  8 +--
 paimon-flink/pom.xml                               |  1 +
 19 files changed, 160 insertions(+), 104 deletions(-)

diff --git a/docs/content/concepts/file-operations.md b/docs/content/concepts/file-operations.md
index 3ef9e4d80..e0387ebd6 100644
--- a/docs/content/concepts/file-operations.md
+++ b/docs/content/concepts/file-operations.md
@@ -246,8 +246,7 @@ dedicated compaction job through `flink run`:
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     compact \
     --warehouse <warehouse-path> \
     --database <database-name> \ 
@@ -260,8 +259,7 @@ an example would be (suppose you're already in Flink home)
 
 ```bash
 ./bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    ./lib/paimon-flink-1.17-0.5-SNAPSHOT.jar \
+    ./lib/paimon-flink-action-{{< version >}}.jar \
     compact \
     --path file:///tmp/paimon/default.db/T
 ```
diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 171a33546..4d19d4351 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -42,6 +42,7 @@ Download the jar file with corresponding version.
 | Flink 1.16 | [paimon-flink-1.16-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/{{< version >}}/paimon-flink-1.16-{{< version >}}.jar) |
 | Flink 1.15 | [paimon-flink-1.15-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.15/{{< version >}}/paimon-flink-1.15-{{< version >}}.jar) |
 | Flink 1.14 | [paimon-flink-1.14-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.14/{{< version >}}/paimon-flink-1.14-{{< version >}}.jar) |
+| Flink Action | [paimon-flink-action-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/{{< version >}}/paimon-flink-action-{{< version >}}.jar) |
 
 {{< /stable >}}
 
@@ -53,6 +54,7 @@ Download the jar file with corresponding version.
 | Flink 1.16 | [paimon-flink-1.16-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.16/{{< version >}}/) |
 | Flink 1.15 | [paimon-flink-1.15-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.15/{{< version >}}/) |
 | Flink 1.14 | [paimon-flink-1.14-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.14/{{< version >}}/) |
+| Flink Action | [paimon-flink-action-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/{{< version >}}/) |
 
 {{< /unstable >}}
 
diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 9f425aae1..e0f35a220 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -44,8 +44,7 @@ To use this feature through `flink run`, run the following shell command.
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     mysql-sync-table
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -79,8 +78,7 @@ Example
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     mysql-sync-table \
     --warehouse hdfs:///path/to/warehouse \
     --database test_db \
@@ -108,8 +106,7 @@ To use this feature through `flink run`, run the following shell command.
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     mysql-sync-database
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -146,8 +143,7 @@ Example
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     mysql-sync-database \
     --warehouse hdfs:///path/to/warehouse \
     --database test_db \
diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md
index 0b408ed5c..b9f504a48 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -206,9 +206,7 @@ Run the following command to submit a drop-partition job for the table.
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     drop-partition \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -224,9 +222,7 @@ For more information of drop-partition, see
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     drop-partition --help
 ```
 
@@ -246,9 +242,7 @@ Run the following command to submit a 'delete' job for the table.
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     delete \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -265,9 +259,7 @@ For more information of 'delete', see
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     delete --help
 ```
 
@@ -314,9 +306,7 @@ Run the following command to submit a 'merge-into' job for the table.
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     merge-into \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -341,9 +331,7 @@ You can pass sqls by '--source-sql <sql> [, --source-sql <sql> ...]' to config e
 -- Find all orders mentioned in the source table, then mark as important if the price is above 100 
 -- or delete if the price is under 10.
 ./flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     merge-into \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -359,9 +347,7 @@ You can pass sqls by '--source-sql <sql> [, --source-sql <sql> ...]' to config e
 -- For matched order rows, increase the price, and if there is no match, insert the order from the 
 -- source table:
 ./flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     merge-into \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -376,9 +362,7 @@ You can pass sqls by '--source-sql <sql> [, --source-sql <sql> ...]' to config e
 -- For not matched by source order rows (which are in the target table and does not match any row in the
 -- source table based on the merge-condition), decrease the price or if the mark is 'trivial', delete them:
 ./flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     merge-into \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -394,9 +378,7 @@ You can pass sqls by '--source-sql <sql> [, --source-sql <sql> ...]' to config e
 -- A --source-sql example: 
 -- Create a temporary view S in new catalog and use it as source table
 ./flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     merge-into \
     --warehouse <warehouse-path> \
     --database <database-name> \
@@ -468,9 +450,7 @@ For more information of 'merge-into', see
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    -Dclassloader.resolve-order=parent-first \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     merge-into --help
 ```
 {{< /tab >}}
diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 1dd82cf3b..3dafff492 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -161,8 +161,7 @@ Run the following command to submit a compaction job for the table.
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     compact \
     --warehouse <warehouse-path> \
     --database <database-name> \ 
@@ -184,8 +183,7 @@ Example
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     compact \
     --warehouse s3:///path/to/warehouse \
     --database test_db \
@@ -201,8 +199,7 @@ For more usage of the compact action, see
 
 ```bash
 <FLINK_HOME>/bin/flink run \
-    -c org.apache.paimon.flink.action.FlinkActions \
-    /path/to/paimon-flink-**-{{< version >}}.jar \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
     compact --help
 ```
 
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 1ccfb7c26..bf6b206d1 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -48,6 +48,13 @@ under the License.
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-action</artifactId>
+            <version>${project.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-hive-connector-common</artifactId>
@@ -135,6 +142,16 @@ under the License.
                             <outputDirectory>/tmp/paimon-e2e-tests-jars
                             </outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.paimon</groupId>
+                            <artifactId>paimon-flink-action</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>paimon-flink-action.jar</destFileName>
+                            <type>jar</type>
+                            <overWrite>true</overWrite>
+                            <outputDirectory>/tmp/paimon-e2e-tests-jars
+                            </outputDirectory>
+                        </artifactItem>
                         <artifactItem>
                             <groupId>org.apache.paimon</groupId>
                             <artifactId>paimon-hive-connector-common</artifactId>
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
index b592dd169..cd3e23e5f 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
@@ -102,13 +102,10 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                 jobManager.execInContainer(
                         "bin/flink",
                         "run",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
                         "-D",
                         "execution.checkpointing.interval=1s",
-                        "-Dclassloader.resolve-order=parent-first",
                         "--detached",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "compact",
                         "--warehouse",
                         warehousePath,
@@ -167,10 +164,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                         "run",
                         "-p",
                         "1",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
-                        "-Dclassloader.resolve-order=parent-first",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "drop-partition",
                         "--warehouse",
                         warehousePath,
@@ -227,10 +221,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                         "run",
                         "-p",
                         "1",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
-                        "-Dclassloader.resolve-order=parent-first",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "delete",
                         "--warehouse",
                         warehousePath,
@@ -295,10 +286,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                         "run",
                         "-p",
                         "1",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
-                        "-Dclassloader.resolve-order=parent-first",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "merge-into",
                         "--warehouse",
                         warehousePath,
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index d3a222c11..2dac7c6c0 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -108,12 +108,10 @@ public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
                         " ",
                         "bin/flink",
                         "run",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
                         "-D",
                         "execution.checkpointing.interval=1s",
                         "--detached",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "mysql-sync-table",
                         "--warehouse",
                         warehousePath,
@@ -226,12 +224,10 @@ public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
                         " ",
                         "bin/flink",
                         "run",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
                         "-D",
                         "execution.checkpointing.interval=1s",
                         "--detached",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "mysql-sync-database",
                         "--warehouse",
                         warehousePath,
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 60eda1c62..54b962a10 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -46,12 +46,10 @@ public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {
                         " ",
                         "bin/flink",
                         "run",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
                         "-D",
                         "execution.checkpointing.interval=1s",
                         "--detached",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "mysql-sync-table",
                         "--warehouse",
                         warehousePath,
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
index 3ed5595e9..8a2c169e9 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
@@ -73,12 +73,10 @@ public class MySqlIgnoreCaseE2EeTest extends MySqlCdcE2eTestBase {
                         " ",
                         "bin/flink",
                         "run",
-                        "-c",
-                        "org.apache.paimon.flink.action.FlinkActions",
                         "-D",
                         "execution.checkpointing.interval=1s",
                         "--detached",
-                        "lib/paimon-flink.jar",
+                        "lib/paimon-flink-action.jar",
                         "mysql-sync-database",
                         "--warehouse",
                         warehousePath,
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index a24999e47..42a217554 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -31,7 +31,7 @@ services:
       - /tmp/paimon-e2e-tests-jars:/jars
     entrypoint: >
       /bin/bash -c "
-      cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
+      cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
       /jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar /opt/flink/lib ;
       echo 'See FLINK-31659 for why we need the following two steps' ;
       mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
@@ -54,7 +54,7 @@ services:
       - /tmp/paimon-e2e-tests-jars:/jars
     entrypoint: >
       /bin/bash -c "
-      cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
+      cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
       /jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar /opt/flink/lib ;
       echo 'See FLINK-31659 for why we need the following two steps' ;
       mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
@@ -198,7 +198,7 @@ services:
     ports:
       - "8080:8080"
       - "7077:7077"
-    entrypoint: /bin/bash -c "cp /jars/paimon-flink.jar /jars/paimon-spark.jar /spark/jars/ && sh /master.sh"
+    entrypoint: /bin/bash -c "cp /jars/paimon-spark.jar /spark/jars/ && sh /master.sh"
     environment:
       - INIT_DAEMON_STEP=setup_spark
 
@@ -211,7 +211,7 @@ services:
       - /tmp/paimon-e2e-tests-jars:/jars
     ports:
       - "8081:8081"
-    entrypoint: /bin/bash -c "cp /jars/paimon-flink.jar /jars/paimon-spark.jar /spark/jars/ && sh /worker.sh"
+    entrypoint: /bin/bash -c "cp /jars/paimon-spark.jar /spark/jars/ && sh /worker.sh"
     environment:
       - "SPARK_MASTER=spark://spark-master:7077"
 
diff --git a/paimon-flink/paimon-flink-action/README.md b/paimon-flink/paimon-flink-action/README.md
new file mode 100644
index 000000000..a633b7de1
--- /dev/null
+++ b/paimon-flink/paimon-flink-action/README.md
@@ -0,0 +1,26 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Action
+
+This module contains a single class, FlinkActions.
+
+The reason for extracting it into a separate module is that when executing a Flink jar job, a jar must be specified.
+If `paimon-flink.jar` is specified, it may cause various classloader issues, because `paimon-flink.jar` is also present
+in flink/lib and in the user classloader, which leads to class conflicts.
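+
+With this module, an action can be submitted by pointing `flink run` directly at the action jar, for example
+(a minimal sketch based on the docs; action names and options are documented in the how-to pages):
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-<version>.jar \
+    compact \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name>
+```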
diff --git a/paimon-flink/paimon-flink-action/pom.xml b/paimon-flink/paimon-flink-action/pom.xml
new file mode 100644
index 000000000..3af1de88d
--- /dev/null
+++ b/paimon-flink/paimon-flink-action/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-flink</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>0.5-SNAPSHOT</version>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <artifactId>paimon-flink-action</artifactId>
+    <name>Paimon : Flink : Action</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.paimon.flink.action.FlinkActions</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java b/paimon-flink/paimon-flink-action/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
similarity index 81%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
copy to paimon-flink/paimon-flink-action/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
index 16c8296f7..c2e7bf64f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
+++ b/paimon-flink/paimon-flink-action/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
@@ -25,14 +25,6 @@ import static org.apache.paimon.flink.action.Action.Factory.printHelp;
 /** Table maintenance actions for Flink. */
 public class FlinkActions {
 
-    // ------------------------------------------------------------------------
-    //  Java API
-    // ------------------------------------------------------------------------
-
-    public static CompactAction compact(String warehouse, String database, String tableName) {
-        return new CompactAction(warehouse, database, tableName);
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 7cb6ca8d3..9cf96eab0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -146,7 +146,7 @@ public interface Action {
             }
         }
 
-        static void printHelp() {
+        public static void printHelp() {
             System.out.println("Usage: <action> [OPTIONS]");
             System.out.println();
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index b525992b6..ef934cf35 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -53,11 +53,12 @@ public class CompactAction extends ActionBase {
     private final CompactorSourceBuilder sourceBuilder;
     private final CompactorSinkBuilder sinkBuilder;
 
-    CompactAction(String warehouse, String database, String tableName) {
+    public CompactAction(String warehouse, String database, String tableName) {
         this(warehouse, database, tableName, new Options());
     }
 
-    CompactAction(String warehouse, String database, String tableName, Options catalogOptions) {
+    public CompactAction(
+            String warehouse, String database, String tableName, Options catalogOptions) {
         super(warehouse, database, tableName, catalogOptions);
         if (!(table instanceof FileStoreTable)) {
             throw new UnsupportedOperationException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
index 16c8296f7..6ed2d112b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FlinkActions.java
@@ -22,17 +22,14 @@ import java.util.Optional;
 
 import static org.apache.paimon.flink.action.Action.Factory.printHelp;
 
-/** Table maintenance actions for Flink. */
+/**
+ * Table maintenance actions for Flink.
+ *
+ * @deprecated Compatible with older versions of usage
+ */
+@Deprecated
 public class FlinkActions {
 
-    // ------------------------------------------------------------------------
-    //  Java API
-    // ------------------------------------------------------------------------
-
-    public static CompactAction compact(String warehouse, String database, String tableName) {
-        return new CompactAction(warehouse, database, tableName);
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
index 8f7e254c4..05fbede0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.flink.action.FlinkActions;
+import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.FailingFileIO;
@@ -144,7 +144,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
         StreamExecutionEnvironment env = createStreamExecutionEnvironment(2000);
         env.setParallelism(1);
         env.setRestartStrategy(RestartStrategies.noRestart());
-        FlinkActions.compact(path, "default", "T").build(env);
+        new CompactAction(path, "default", "T").build(env);
         JobClient client = env.executeAsync();
 
         // write records for a while
@@ -388,7 +388,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
             StreamExecutionEnvironment env =
                     createStreamExecutionEnvironment(random.nextInt(1900) + 100);
             env.setParallelism(2);
-            FlinkActions.compact(path, "default", "T").build(env);
+            new CompactAction(path, "default", "T").build(env);
             env.executeAsync();
         }
 
@@ -422,7 +422,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
             StreamExecutionEnvironment env =
                     createStreamExecutionEnvironment(random.nextInt(1900) + 100);
             env.setParallelism(2);
-            FlinkActions.compact(path, "default", "T").build(env);
+            new CompactAction(path, "default", "T").build(env);
             env.executeAsync();
         }
 
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index 0d9e0ccd0..b00d7fce2 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
         <module>paimon-flink-1.15</module>
         <module>paimon-flink-1.16</module>
         <module>paimon-flink-1.17</module>
+        <module>paimon-flink-action</module>
     </modules>
 
     <dependencies>


[incubator-paimon] 06/20: [doc] Add document page for operations on files (#1158)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 03a30685366627a79c4f057c4024add92b78ce63
Author: Dian Qi <qi...@163.com>
AuthorDate: Wed May 17 14:03:00 2023 +0800

    [doc] Add document page for operations on files (#1158)
---
 docs/content/concepts/file-operations.md   | 384 +++++++++++++++++++++++++++++
 docs/static/img/cdc-ingestion-commit.png   | Bin 0 -> 668838 bytes
 docs/static/img/cdc-ingestion-compact.png  | Bin 0 -> 1127761 bytes
 docs/static/img/cdc-ingestion-source.png   | Bin 0 -> 411986 bytes
 docs/static/img/cdc-ingestion-topology.png | Bin 0 -> 382727 bytes
 docs/static/img/cdc-ingestion-write.png    | Bin 0 -> 601989 bytes
 docs/static/img/file-operations-0.png      | Bin 0 -> 347962 bytes
 docs/static/img/file-operations-1.png      | Bin 0 -> 543256 bytes
 docs/static/img/file-operations-2.png      | Bin 0 -> 741038 bytes
 docs/static/img/file-operations-3.png      | Bin 0 -> 856310 bytes
 docs/static/img/file-operations-4.png      | Bin 0 -> 690946 bytes
 11 files changed, 384 insertions(+)

diff --git a/docs/content/concepts/file-operations.md b/docs/content/concepts/file-operations.md
new file mode 100644
index 000000000..3ef9e4d80
--- /dev/null
+++ b/docs/content/concepts/file-operations.md
@@ -0,0 +1,384 @@
+---
+title: "File Operations"
+weight: 4
+type: docs
+aliases:
+- /concepts/file-operations.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# File Operations
+
+This page clarifies the impact that various operations have on 
+the files of a table, and provides concrete examples and practical 
+tips for managing them effectively. Furthermore, through an in-depth 
+exploration of operations such as commit and compact, we aim to offer 
+insights into how files are created and updated.
+
+## Prerequisite
+
+Before delving further into this page, please ensure that you have read through the
+following sections:
+
+1. [Basic Concepts]({{< ref "concepts/basic-concepts" >}}), 
+2. [File Layouts]({{< ref "concepts/file-layouts" >}}) and 
+3. How to use Paimon in [Flink]({{< ref "engines/flink" >}}).
+
+## Create Catalog
+
+Start Flink SQL client via `./sql-client.sh` and execute the following 
+statements one by one to create a Paimon catalog.  
+```sql
+CREATE CATALOG paimon WITH (
+'type' = 'paimon',
+'warehouse' = 'file:///tmp/paimon'
+);
+
+USE CATALOG paimon;
+```
+
+This will only create a directory at the given path `file:///tmp/paimon`.
+
+## Create Table
+
+Executing the following create table statement will create a Paimon table with four columns (`id`, `a`, `b` and `dt`):
+
+```sql
+CREATE TABLE T (
+  id BIGINT,
+  a INT,
+  b STRING,
+  dt STRING COMMENT 'timestamp string in format yyyyMMdd',
+  PRIMARY KEY(id, dt) NOT ENFORCED
+) PARTITIONED BY (dt);
+```
+
+This will create the Paimon table `T` under the path `/tmp/paimon/default.db/T`, 
+with its schema stored in `/tmp/paimon/default.db/T/schema/schema-0`.
+
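+At this point the table directory typically contains only the `schema` directory, which you can verify
+with a plain directory listing (a minimal sketch; output is shown as comments and file names may differ in your run):
+
+```bash
+ls -R /tmp/paimon/default.db/T
+# /tmp/paimon/default.db/T:
+# schema
+#
+# /tmp/paimon/default.db/T/schema:
+# schema-0
+```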
+
+## Insert Records Into Table
+
+Run the following insert statement in Flink SQL:
+
+```sql
+INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');
+```
+
+After the Flink job finishes, the records are written into the Paimon table by a 
+successful `commit`. The records are visible to the user, as can be verified by 
+`SELECT * FROM T`, which returns a single row. 
+The commit creates a snapshot under the path `/tmp/paimon/default.db/T/snapshot/snapshot-1`. 
+The resulting file layout as of snapshot-1 is as follows:
+
+{{< img src="/img/file-operations-0.png">}}
+
+The snapshot-1 file contains metadata of the snapshot, such as the manifest lists and the schema id:
+```json
+{
+  "version" : 3,
+  "id" : 1,
+  "schemaId" : 0,
+  "baseManifestList" : "manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0",
+  "deltaManifestList" : "manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1",
+  "changelogManifestList" : null,
+  "commitUser" : "7d758485-981d-4b1a-a0c6-d34c3eb254bf",
+  "commitIdentifier" : 9223372036854775807,
+  "commitKind" : "APPEND",
+  "timeMillis" : 1684155393354,
+  "logOffsets" : { },
+  "totalRecordCount" : 1,
+  "deltaRecordCount" : 1,
+  "changelogRecordCount" : 0,
+  "watermark" : -9223372036854775808
+}
+```
+
+Recall that a manifest list contains all changes of a snapshot; `baseManifestList` is the base 
+file upon which the changes in `deltaManifestList` are applied. 
+The first commit results in 1 manifest file, and 2 manifest lists are 
+created (the file names might differ from those in your experiment):
+
+```bash
+./T/manifest:
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1	
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0
+manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0
+```
+`manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0` is the manifest 
+file (manifest-1-0 in the above graph), which stores the information about the data files in the snapshot.
+
+`manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0` is the 
+baseManifestList (manifest-list-1-base in the above graph), which is effectively empty.
+
+`manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1` is the 
+deltaManifestList (manifest-list-1-delta in the above graph), which 
+contains the list of manifest entries that record operations on data 
+files; in this case, it contains `manifest-B-0`.
+
+
+Now let's insert a batch of records across different partitions and 
+see what happens. In Flink SQL, execute the following statement:
+
+```sql
+INSERT INTO T VALUES 
+(2, 10002, 'varchar00002', '20230502'),
+(3, 10003, 'varchar00003', '20230503'),
+(4, 10004, 'varchar00004', '20230504'),
+(5, 10005, 'varchar00005', '20230505'),
+(6, 10006, 'varchar00006', '20230506'),
+(7, 10007, 'varchar00007', '20230507'),
+(8, 10008, 'varchar00008', '20230508'),
+(9, 10009, 'varchar00009', '20230509'),
+(10, 10010, 'varchar00010', '20230510');
+```
+
+The second `commit` takes place and executing `SELECT * FROM T` will return 
+10 rows. A new snapshot, namely `snapshot-2`, is created and gives us the 
+following physical file layout:
+```bash
+ % ls -atR . 
+./T:
+dt=20230501
+dt=20230502	
+dt=20230503	
+dt=20230504	
+dt=20230505	
+dt=20230506	
+dt=20230507	
+dt=20230508	
+dt=20230509	
+dt=20230510	
+snapshot
+schema
+manifest
+
+./T/snapshot:
+LATEST
+snapshot-2
+EARLIEST
+snapshot-1
+
+./T/manifest:
+manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1	 # delta manifest list for snapshot-2
+manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0  # base manifest list for snapshot-2	
+manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0	 # manifest file for snapshot-2
+
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1	 # delta manifest list for snapshot-1 
+manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0  # base manifest list for snapshot-1
+manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0  # manifest file for snapshot-1
+
+./T/dt=20230501/bucket-0:
+data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc
+
+...
+# each partition has the data written to bucket-0
+...
+
+./T/schema:
+schema-0
+```
+The new file layout as of snapshot-2 looks like
+{{< img src="/img/file-operations-1.png">}}
+
+## Delete Records From Table
+
+Now let's delete records that meet the condition `dt>=20230503`. 
+In Flink SQL, execute the following statement:
+
+```sql
+DELETE FROM T WHERE dt >= '20230503';
+```
+The third `commit` takes place and gives us `snapshot-3`. Now, if you list the files 
+under the table, you will find that no partition is dropped. Instead, a new data 
+file is created for each partition from `20230503` to `20230510`:
+
+```bash
+./T/dt=20230510/bucket-0:
+data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement 
+data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement
+```
+
+This makes sense since we inserted a record in the second commit (represented by 
+`+I[10, 10010, 'varchar00010', '20230510']`) and then deleted
+the record in the third commit. Executing `SELECT * FROM T` will return 2 rows, namely: 
+```
++I[1, 10001, 'varchar00001', '20230501']
++I[2, 10002, 'varchar00002', '20230502']
+```
+
+The new file layout as of snapshot-3 looks like
+{{< img src="/img/file-operations-2.png">}}
+
+Note that `manifest-3-0` contains 8 manifest entries of `ADD` operation type, 
+corresponding to 8 newly written data files. 
+
+
+
+## Compact Table
+
+As you may have noticed, the number of small files increases over successive
+snapshots, which may degrade read performance. Therefore, a full compaction
+is needed in order to reduce the number of small files.
+
+Let's trigger a full compaction now. Make sure the execution mode is set to `batch` 
+(add an entry `execution.runtime-mode: batch` in `flink-conf.yaml`) and run a 
+dedicated compaction job through `flink run`:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.paimon.flink.action.FlinkActions \
+    /path/to/paimon-flink-**-{{< version >}}.jar \
+    compact \
+    --warehouse <warehouse-path> \
+    --database <database-name> \ 
+    --table <table-name> \
+    [--partition <partition-name>] \
+    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
+```
+
+An example would be (assuming you are already in the Flink home directory):
+
+```bash
+./bin/flink run \
+    -c org.apache.paimon.flink.action.FlinkActions \
+    ./lib/paimon-flink-1.17-0.5-SNAPSHOT.jar \
+    compact \
+    --path file:///tmp/paimon/default.db/T
+```
+
+All current table files will be compacted, and a new snapshot, namely `snapshot-4`, is 
+created with the following information:
+
+```json
+{
+  "version" : 3,
+  "id" : 4,
+  "schemaId" : 0,
+  "baseManifestList" : "manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0",
+  "deltaManifestList" : "manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1",
+  "changelogManifestList" : null,
+  "commitUser" : "a3d951d5-aa0e-4071-a5d4-4c72a4233d48",
+  "commitIdentifier" : 9223372036854775807,
+  "commitKind" : "COMPACT",
+  "timeMillis" : 1684163217960,
+  "logOffsets" : { },
+  "totalRecordCount" : 38,
+  "deltaRecordCount" : 20,
+  "changelogRecordCount" : 0,
+  "watermark" : -9223372036854775808
+}
+```
+
+The new file layout as of snapshot-4 looks like
+{{< img src="/img/file-operations-3.png">}}
+
+Note that `manifest-4-0` contains 20 manifest entries (18 `DELETE` operations and 2 `ADD` operations):
+1. For partition `20230503` to `20230510`, two `DELETE` operations for two data files
+2. For partition `20230501` to `20230502`, one `DELETE` operation and one `ADD` operation 
+   for the same data file.
+
+
+## Alter Table
+
+Execute the following statement to configure full-compaction:
+```sql
+ALTER TABLE T SET ('full-compaction.delta-commits' = '1');
+```
+
+It will create a new schema for the Paimon table, namely `schema-1`, but no snapshot
+will actually use this schema until the next commit.
+
+## Expire Snapshots
+
+Recall that the marked data files are not truly deleted until the snapshot expires and 
+no consumer depends on the snapshot. For more information, see [Expiring Snapshots]({{< ref "maintenance/expiring-snapshots" >}}).
+
+During snapshot expiration, the range of snapshots to expire is determined first, and then the marked data files in these snapshots are 
+deleted. A data file is marked only when there is a manifest entry of kind `DELETE` pointing to that data file, so that it 
+will not be used by later snapshots and can be safely deleted.
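+
+How long snapshots are kept is controlled by table options. A minimal sketch (the values here are
+illustrative; see the Expiring Snapshots page for the full option list and defaults):
+
+```sql
+ALTER TABLE T SET (
+  'snapshot.time-retained' = '1 h',
+  'snapshot.num-retained.min' = '10',
+  'snapshot.num-retained.max' = '50'
+);
+```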
+
+
+Let's say all 4 snapshots in the above diagram are about to expire. The expiration process is as follows:
+
+1. It first deletes all marked data files, and records any changed buckets. 
+   
+2. It then deletes any changelog files and associated manifests. 
+   
+3. Finally, it deletes the snapshots themselves and writes the earliest hint file.
+
+If any directories are left empty after the deletion process, they will be deleted as well.
+
+
+Let's say another snapshot, `snapshot-5`, is created and snapshot expiration is triggered, so `snapshot-1` to `snapshot-4` are  
+to be deleted. For simplicity, we will only focus on files from the previous snapshots; the final layout after snapshot 
+expiration looks like this:
+
+{{< img src="/img/file-operations-4.png">}}
+
+As a result, partitions `20230503` to `20230510` are physically deleted.
+
+## Flink Stream Write
+
+Finally, we will examine Flink Stream Write using the example
+of CDC ingestion. This section addresses how change data is captured and 
+written into Paimon, as well as the mechanisms behind asynchronous compaction, 
+snapshot commit and snapshot expiration.
+
+To begin, let's take a closer look at the CDC ingestion workflow and 
+the unique roles played by each component involved.
+
+{{< img src="/img/cdc-ingestion-topology.png">}}
+
+1. `MySQL CDC Source` uniformly reads snapshot and incremental data, with `SnapshotReader` reading snapshot data 
+   and `BinlogReader` reading incremental data.
+2. `Paimon Sink` writes data into the Paimon table at the bucket level. The `CompactManager` within it will trigger compaction
+   asynchronously.
+3. `Committer Operator` is a singleton responsible for committing and expiring snapshots.
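+
+Such an ingestion job is submitted in the same way as the compaction job above; a sketch
+(connection settings are elided here, see the CDC ingestion page for the full option list):
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.paimon.flink.action.FlinkActions \
+    /path/to/paimon-flink-**-{{< version >}}.jar \
+    mysql-sync-table \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name> \
+    [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]]
+```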
+
+Next, we will go over the end-to-end data flow. 
+
+
+{{< img src="/img/cdc-ingestion-source.png">}}
+
+`MySQL CDC Source` reads snapshot and incremental data and emits them downstream after normalization.
+
+{{< img src="/img/cdc-ingestion-write.png">}}
+
+
+`Paimon Sink` first buffers new records in a heap-based LSM tree, and flushes them to disk when 
+the memory buffer is full. Note that each data file written is a sorted run. At this point, no manifest file or snapshot
+is created. Right before a Flink checkpoint takes place, `Paimon Sink` will flush all buffered records and send a committable message 
+downstream, which is read and committed by `Committer Operator` during the checkpoint.
+
+{{< img src="/img/cdc-ingestion-commit.png">}}
+
+During checkpoint, `Committer Operator` will create a new snapshot and associate it with manifest lists so that the snapshot  
+contains information about all data files in the table.
+
+{{< img src="/img/cdc-ingestion-compact.png">}}
+
+At a later point, asynchronous compaction might take place, and the committable produced by `CompactManager` contains information 
+about the previous files and the merged files so that `Committer Operator` can construct the corresponding manifest entries. In this case 
+`Committer Operator` might produce two snapshots during a Flink checkpoint, one for the data written (a snapshot of kind `Append`) and the 
+other for the compaction (a snapshot of kind `Compact`). If no data file is written during the checkpoint interval, only a snapshot of kind `Compact` 
+will be created. `Committer Operator` also checks for snapshot expiration and performs
+physical deletion of marked data files.
\ No newline at end of file
diff --git a/docs/static/img/cdc-ingestion-commit.png b/docs/static/img/cdc-ingestion-commit.png
new file mode 100644
index 000000000..c4ebb9589
Binary files /dev/null and b/docs/static/img/cdc-ingestion-commit.png differ
diff --git a/docs/static/img/cdc-ingestion-compact.png b/docs/static/img/cdc-ingestion-compact.png
new file mode 100644
index 000000000..25dcd5fff
Binary files /dev/null and b/docs/static/img/cdc-ingestion-compact.png differ
diff --git a/docs/static/img/cdc-ingestion-source.png b/docs/static/img/cdc-ingestion-source.png
new file mode 100644
index 000000000..437361eac
Binary files /dev/null and b/docs/static/img/cdc-ingestion-source.png differ
diff --git a/docs/static/img/cdc-ingestion-topology.png b/docs/static/img/cdc-ingestion-topology.png
new file mode 100644
index 000000000..0014b80e7
Binary files /dev/null and b/docs/static/img/cdc-ingestion-topology.png differ
diff --git a/docs/static/img/cdc-ingestion-write.png b/docs/static/img/cdc-ingestion-write.png
new file mode 100644
index 000000000..ac1e5f093
Binary files /dev/null and b/docs/static/img/cdc-ingestion-write.png differ
diff --git a/docs/static/img/file-operations-0.png b/docs/static/img/file-operations-0.png
new file mode 100644
index 000000000..112e7a1de
Binary files /dev/null and b/docs/static/img/file-operations-0.png differ
diff --git a/docs/static/img/file-operations-1.png b/docs/static/img/file-operations-1.png
new file mode 100644
index 000000000..170eed873
Binary files /dev/null and b/docs/static/img/file-operations-1.png differ
diff --git a/docs/static/img/file-operations-2.png b/docs/static/img/file-operations-2.png
new file mode 100644
index 000000000..ecf7ced28
Binary files /dev/null and b/docs/static/img/file-operations-2.png differ
diff --git a/docs/static/img/file-operations-3.png b/docs/static/img/file-operations-3.png
new file mode 100644
index 000000000..25ad93b88
Binary files /dev/null and b/docs/static/img/file-operations-3.png differ
diff --git a/docs/static/img/file-operations-4.png b/docs/static/img/file-operations-4.png
new file mode 100644
index 000000000..808bb599a
Binary files /dev/null and b/docs/static/img/file-operations-4.png differ


[incubator-paimon] 02/20: [flink] support system function truncate for computed column in CDC (#1148)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 64a5f51ed83100cb20a1d979ee541b423a6b98ec
Author: legendtkl <ta...@gmail.com>
AuthorDate: Mon May 15 14:50:26 2023 +0800

    [flink] support system function truncate for computed column in CDC (#1148)
---
 .../paimon/flink/action/cdc/mysql/Expression.java  | 102 ++++++++++++++-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  13 +-
 .../action/cdc/mysql/TruncateComputerTest.java     | 140 +++++++++++++++++++++
 3 files changed, 249 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java
index 45a974dbd..7d24685ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java
@@ -25,6 +25,8 @@ import org.apache.paimon.utils.DateTimeUtils;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.List;
@@ -34,7 +36,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Produce a computation result for computed column. */
 public interface Expression extends Serializable {
 
-    List<String> SUPPORTED_EXPRESSION = Arrays.asList("year", "substring");
+    List<String> SUPPORTED_EXPRESSION = Arrays.asList("year", "substring", "truncate");
 
     /** Return name of referenced field. */
     String fieldReference();
@@ -52,6 +54,8 @@ public interface Expression extends Serializable {
                 return year(fieldReference);
             case "substring":
                 return substring(fieldReference, literals);
+            case "truncate":
+                return truncate(fieldReference, fieldType, literals);
                 // TODO: support more expression
             default:
                 throw new UnsupportedOperationException(
@@ -95,6 +99,15 @@ public interface Expression extends Serializable {
         return new Substring(fieldReference, beginInclusive, endExclusive);
     }
 
+    static Expression truncate(String fieldReference, DataType fieldType, String... literals) {
+        checkArgument(
+                literals.length == 1,
+                String.format(
+                        "'truncate' expression supports one argument, but found '%s'.",
+                        literals.length));
+        return new TruncateComputer(fieldReference, fieldType, literals[0]);
+    }
+
     /** Compute year from a time input. */
     final class YearComputer implements Expression {
 
@@ -165,4 +178,91 @@ public interface Expression extends Serializable {
             }
         }
     }
+
+    /** Truncate numeric/decimal/string value. */
+    final class TruncateComputer implements Expression {
+        private static final long serialVersionUID = 1L;
+
+        private final String fieldReference;
+
+        private DataType fieldType;
+
+        private int width;
+
+        TruncateComputer(String fieldReference, DataType fieldType, String literal) {
+            this.fieldReference = fieldReference;
+            this.fieldType = fieldType;
+            try {
+                this.width = Integer.parseInt(literal);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Invalid width value for truncate function: %s, expected integer.",
+                                literal));
+            }
+        }
+
+        @Override
+        public String fieldReference() {
+            return fieldReference;
+        }
+
+        @Override
+        public DataType outputType() {
+            return fieldType;
+        }
+
+        @Override
+        public String eval(String input) {
+            switch (fieldType.getTypeRoot()) {
+                case TINYINT:
+                case SMALLINT:
+                    return String.valueOf(truncateShort(width, Short.valueOf(input)));
+                case INTEGER:
+                    return String.valueOf(truncateInt(width, Integer.valueOf(input)));
+                case BIGINT:
+                    return String.valueOf(truncateLong(width, Long.valueOf(input)));
+                case DECIMAL:
+                    return truncateDecimal(BigInteger.valueOf(width), new BigDecimal(input))
+                            .toString();
+                case VARCHAR:
+                case CHAR:
+                    checkArgument(
+                            width <= input.length(),
+                            "Invalid width value for truncate function: %s, expected less than or equal to %s.",
+                            width,
+                            input.length());
+                    return input.substring(0, width);
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Unsupported field type for truncate function: %s.",
+                                    fieldType.getTypeRoot().toString()));
+            }
+        }
+
+        private short truncateShort(int width, short value) {
+            return (short) (value - (((value % width) + width) % width));
+        }
+
+        private int truncateInt(int width, int value) {
+            return value - (((value % width) + width) % width);
+        }
+
+        private long truncateLong(int width, long value) {
+            return value - (((value % width) + width) % width);
+        }
+
+        private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) {
+            BigDecimal remainder =
+                    new BigDecimal(
+                            value.unscaledValue()
+                                    .remainder(unscaledWidth)
+                                    .add(unscaledWidth)
+                                    .remainder(unscaledWidth),
+                            value.scale());
+
+            return value.subtract(remainder);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 4f135df39..ee3e7a8fd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -788,7 +788,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         "_year_datetime=year(_datetime)",
                         "_year_timestamp=year(_timestamp)",
                         "_substring_date1=substring(_date,2)",
-                        "_substring_date2=substring(_timestamp,5,10)");
+                        "_substring_date2=substring(_timestamp,5,10)",
+                        "_truncate_date=truncate(pk,2)");
 
         MySqlSyncTableAction action =
                 new MySqlSyncTableAction(
@@ -829,7 +830,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 DataTypes.INT(),
                                 DataTypes.INT(),
                                 DataTypes.STRING(),
-                                DataTypes.STRING()
+                                DataTypes.STRING(),
+                                DataTypes.INT()
                             },
                             new String[] {
                                 "pk",
@@ -840,12 +842,13 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 "_year_datetime",
                                 "_year_timestamp",
                                 "_substring_date1",
-                                "_substring_date2"
+                                "_substring_date2",
+                                "_truncate_date"
                             });
             List<String> expected =
                     Arrays.asList(
-                            "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 23-03-23, 09-15]",
-                            "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL]");
+                            "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 23-03-23, 09-15, 0]",
+                            "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
             waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java
new file mode 100644
index 000000000..c36fb8687
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link Expression.TruncateComputer}. */
+public class TruncateComputerTest {
+
+    private static Object[][] prepareData() {
+        return new Object[][] {
+            {"computedColumnField", "0", new TinyIntType(true), "10", "0"},
+            {"computedColumnField", "1", new TinyIntType(true), "10", "0"},
+            {"computedColumnField", "5", new TinyIntType(true), "10", "0"},
+            {"computedColumnField", "9", new TinyIntType(true), "10", "0"},
+            {"computedColumnField", "10", new TinyIntType(true), "10", "10"},
+            {"computedColumnField", "11", new TinyIntType(true), "10", "10"},
+            {"computedColumnField", "15", new TinyIntType(true), "10", "10"},
+            {"computedColumnField", "-1", new TinyIntType(true), "10", "-10"},
+            {"computedColumnField", "-5", new TinyIntType(true), "10", "-10"},
+            {"computedColumnField", "-9", new TinyIntType(true), "10", "-10"},
+            {"computedColumnField", "-10", new TinyIntType(true), "10", "-10"},
+            {"computedColumnField", "-11", new TinyIntType(true), "10", "-20"},
+            {"computedColumnField", "0", new SmallIntType(true), "10", "0"},
+            {"computedColumnField", "1", new SmallIntType(true), "10", "0"},
+            {"computedColumnField", "5", new SmallIntType(true), "10", "0"},
+            {"computedColumnField", "9", new SmallIntType(true), "10", "0"},
+            {"computedColumnField", "10", new SmallIntType(true), "10", "10"},
+            {"computedColumnField", "11", new SmallIntType(true), "10", "10"},
+            {"computedColumnField", "15", new SmallIntType(true), "10", "10"},
+            {"computedColumnField", "-1", new SmallIntType(true), "10", "-10"},
+            {"computedColumnField", "-5", new SmallIntType(true), "10", "-10"},
+            {"computedColumnField", "-9", new SmallIntType(true), "10", "-10"},
+            {"computedColumnField", "-10", new SmallIntType(true), "10", "-10"},
+            {"computedColumnField", "-11", new SmallIntType(true), "10", "-20"},
+            {"computedColumnField", "0", new IntType(true), "10", "0"},
+            {"computedColumnField", "1", new IntType(true), "10", "0"},
+            {"computedColumnField", "5", new IntType(true), "10", "0"},
+            {"computedColumnField", "9", new IntType(true), "10", "0"},
+            {"computedColumnField", "10", new IntType(true), "10", "10"},
+            {"computedColumnField", "11", new IntType(true), "10", "10"},
+            {"computedColumnField", "15", new IntType(true), "10", "10"},
+            {"computedColumnField", "-1", new IntType(true), "10", "-10"},
+            {"computedColumnField", "-5", new IntType(true), "10", "-10"},
+            {"computedColumnField", "-9", new IntType(true), "10", "-10"},
+            {"computedColumnField", "-10", new IntType(true), "10", "-10"},
+            {"computedColumnField", "-11", new IntType(true), "10", "-20"},
+            {"computedColumnField", "0", new BigIntType(true), "10", "0"},
+            {"computedColumnField", "1", new BigIntType(true), "10", "0"},
+            {"computedColumnField", "5", new BigIntType(true), "10", "0"},
+            {"computedColumnField", "9", new BigIntType(true), "10", "0"},
+            {"computedColumnField", "10", new BigIntType(true), "10", "10"},
+            {"computedColumnField", "11", new BigIntType(true), "10", "10"},
+            {"computedColumnField", "15", new BigIntType(true), "10", "10"},
+            {"computedColumnField", "-1", new BigIntType(true), "10", "-10"},
+            {"computedColumnField", "-5", new BigIntType(true), "10", "-10"},
+            {"computedColumnField", "-9", new BigIntType(true), "10", "-10"},
+            {"computedColumnField", "-10", new BigIntType(true), "10", "-10"},
+            {"computedColumnField", "-11", new BigIntType(true), "10", "-20"},
+            {"computedColumnField", "12.34", new DecimalType(9, 2), "10", "12.30"},
+            {"computedColumnField", "12.30", new DecimalType(9, 2), "10", "12.30"},
+            {"computedColumnField", "12.29", new DecimalType(9, 2), "10", "12.20"},
+            {"computedColumnField", "0.05", new DecimalType(9, 2), "10", "0.00"},
+            {"computedColumnField", "-0.05", new DecimalType(9, 2), "10", "-0.10"},
+            {"computedColumnField", "abcde", new VarCharType(true, 5), "3", "abc"},
+            {"computedColumnField", "abcdefg", new VarCharType(true, 7), "3", "abc"},
+            {"computedColumnField", "abcdefg", new VarCharType(true, 7), "5", "abcde"},
+            {"computedColumnField", "abcdefg", new VarCharType(true, 7), "7", "abcdefg"},
+            {"computedColumnField", "abcde", new CharType(true, 5), "3", "abc"},
+            {"computedColumnField", "abcdefg", new CharType(true, 7), "3", "abc"},
+            {"computedColumnField", "abcdefg", new CharType(true, 7), "5", "abcde"},
+            {"computedColumnField", "abcdefg", new CharType(true, 7), "7", "abcdefg"},
+        };
+    }
+
+    @Test
+    public void testTruncate() {
+        Object[][] testData = prepareData();
+        for (int i = 0; i < testData.length; i++) {
+            String field = (String) testData[i][0];
+            String value = (String) testData[i][1];
+            DataType dataType = (DataType) testData[i][2];
+            String literal = (String) testData[i][3];
+            String expected = (String) testData[i][4];
+
+            Expression.TruncateComputer truncateComputer =
+                    new Expression.TruncateComputer(field, dataType, literal);
+            assertThat(truncateComputer.eval(value)).isEqualTo(expected);
+        }
+    }
+
+    @Test
+    public void testTruncateWithException() {
+        String fieldReference = "computedColumnField";
+        DataType dataType = new CharType(true, 5);
+        Expression.TruncateComputer truncateComputer =
+                new Expression.TruncateComputer(fieldReference, dataType, "7");
+
+        assertThatThrownBy(() -> truncateComputer.eval("abcde"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Invalid width value for truncate function: 7, expected less than or equal to 5.");
+
+        DataType notSupportedDataType = new BooleanType();
+        Expression.TruncateComputer notSupportTruncateComputer =
+                new Expression.TruncateComputer(fieldReference, notSupportedDataType, "7");
+        assertThatThrownBy(() -> notSupportTruncateComputer.eval("true"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported field type for truncate function: BOOLEAN");
+    }
+}


[incubator-paimon] 04/20: [doc][flink] add user doc for CDC computed column function `truncate`. (#1157)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit b842039556d4c55b1e667647914e35ce0380ad56
Author: legendtkl <ta...@gmail.com>
AuthorDate: Tue May 16 10:24:24 2023 +0800

    [doc][flink] add user doc for CDC computed column function `truncate`. (#1157)
---
 docs/content/how-to/cdc-ingestion.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 40627e2b9..a905d144d 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -61,6 +61,10 @@ To use this feature through `flink run`, run the following shell command.
  * year(date-column): Extract the year from a DATE, DATETIME or TIMESTAMP. Output is an INT value representing the year.
   * substring(column,beginInclusive): Get column.substring(beginInclusive). Output is a STRING.
   * substring(column,beginInclusive,endExclusive): Get column.substring(beginInclusive,endExclusive). Output is a STRING.
+  * truncate(column,width): Truncate column to the given width. The output type is the same as the column type.
+    * If the column is a STRING, truncate(column,width) truncates the string to `width` characters, namely `value.substring(0, width)`.
+    * If the column is an INT or LONG, truncate(column,width) truncates the number with the algorithm `v - (((v % W) + W) % W)`. The seemingly redundant `((v % W) + W) % W` keeps the subtracted remainder non-negative, so truncation always rounds toward negative infinity.
+    * If the column is a DECIMAL, truncate(column,width) truncates the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`. (A runnable sketch of these three rules follows this diff.)
 * `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password`, `database-name` and `table-name` are required configurations, others are optional. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options) for a complete list of configurations.
 * `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations.
 * `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations. 
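
The three truncate rules in the documentation diff above are easier to check against the test data in `TruncateComputerTest` (shown earlier in this digest) with a small, self-contained sketch. The following Java class is illustrative only, not the actual `Expression.TruncateComputer` implementation; the class and method names are made up for this example, and the expected outputs in the comments come from the test cases.

    import java.math.BigDecimal;
    import java.math.BigInteger;

    /** Illustrative sketch of the documented truncate(column, width) rules; not Paimon code. */
    public class TruncateSketch {

        // STRING/CHAR/VARCHAR: keep the first `width` characters, i.e. value.substring(0, width).
        static String truncateString(String value, int width) {
            return value.substring(0, Math.min(width, value.length()));
        }

        // TINYINT/SMALLINT/INT/BIGINT: v - (((v % W) + W) % W).
        // The extra "+ W) % W" keeps the subtracted remainder non-negative, so truncation
        // always rounds toward negative infinity, e.g. truncate(-11, 10) = -20.
        static long truncateIntegral(long v, long width) {
            return v - (((v % width) + width) % width);
        }

        // DECIMAL: v - (v % scaled_W) with scaled_W = decimal(W, scale(v)), applied here to the
        // unscaled value. BigInteger.mod is always non-negative, which matches the test
        // expectation truncate(-0.05, 10) = -0.10.
        static BigDecimal truncateDecimal(BigDecimal v, int width) {
            BigInteger unscaled = v.unscaledValue();
            BigInteger truncated = unscaled.subtract(unscaled.mod(BigInteger.valueOf(width)));
            return new BigDecimal(truncated, v.scale());
        }

        public static void main(String[] args) {
            System.out.println(truncateString("abcdefg", 3));                 // abc
            System.out.println(truncateIntegral(15, 10));                     // 10
            System.out.println(truncateIntegral(-11, 10));                    // -20
            System.out.println(truncateDecimal(new BigDecimal("12.34"), 10)); // 12.30
            System.out.println(truncateDecimal(new BigDecimal("-0.05"), 10)); // -0.10
        }
    }

Note that the real computer also validates the requested width against the declared field type (see testTruncateWithException above, where a width of 7 on a CHAR(5) is rejected); the sketch omits that check.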


[incubator-paimon] 01/20: [doc] Updated use case bullet point (#1150)

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 66c2a5fa8eb5d31e710b5a97c6e69c48d07a31f4
Author: mans2singh <ma...@users.noreply.github.com>
AuthorDate: Sun May 14 21:45:35 2023 -0400

    [doc] Updated use case bullet point (#1150)
---
 docs/content/maintenance/rescale-bucket.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/content/maintenance/rescale-bucket.md b/docs/content/maintenance/rescale-bucket.md
index 89e510333..a2d74e2dc 100644
--- a/docs/content/maintenance/rescale-bucket.md
+++ b/docs/content/maintenance/rescale-bucket.md
@@ -148,7 +148,7 @@ and the job's latency keeps increasing. To improve the data freshness, users can
   FROM verified_orders
   WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');
   ```
-- After overwrite job finished, switch back to streaming mode. And now, the parallelism can be increased alongside with bucket number to restore the streaming job from the savepoint 
+- After the overwrite job has finished, switch back to streaming mode. Now the parallelism can be increased along with the bucket number to restore the streaming job from the savepoint 
 ( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) )
   ```sql
   SET 'execution.runtime-mode' = 'streaming';


[incubator-paimon] 15/20: [test] Increase timeout in MySqlSyncTableActionITCase.testAllTypes

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit d81f2ec3ea239c8e2137a33e0f3aa31c58c50fc1
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon May 22 15:29:25 2023 +0800

    [test] Increase timeout in MySqlSyncTableActionITCase.testAllTypes
---
 .../paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index ee3e7a8fd..1247ac2aa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -342,7 +342,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
     }
 
     @Test
-    @Timeout(30)
+    @Timeout(90)
     public void testAllTypes() throws Exception {
         // the first round checks for table creation
         // the second round checks for running the action on an existing table