Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/12 14:24:44 UTC

[iceberg] branch 0.13.x updated: Flink 1.12: Log a warning message when upsert is enabled (#4754)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new 70369fcdc9 Flink 1.12: Log a warning message when upsert is enabled (#4754)
70369fcdc9 is described below

commit 70369fcdc994d34108ef929d89044ada70157867
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Thu May 12 07:24:35 2022 -0700

    Flink 1.12: Log a warning message when upsert is enabled (#4754)
---
 .../java/org/apache/iceberg/flink/sink/FlinkSink.java  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
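
Note on the workaround named in the warning message in the diff below ("set manifest metrics to counts only"): the following is a minimal, self-contained sketch of how that condition could be checked against a table's properties. It is not part of this commit. The class and method names (UpsertMetricsSketch, upsertMode, countsOnly, isRisky) are hypothetical, a plain Map stands in for table.properties(), and the property keys "write.upsert.enabled" and "write.metadata.metrics.default" (default "truncate(16)") are assumed to match Iceberg's TableProperties. Per-column metrics overrides are ignored here. The commit itself logs whenever upsert mode is on, regardless of the metrics mode.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertMetricsSketch {
  // Assumed to match the keys in Iceberg's TableProperties; verify against your Iceberg version.
  static final String UPSERT_ENABLED = "write.upsert.enabled";
  static final String DEFAULT_METRICS_MODE = "write.metadata.metrics.default";

  // Mirrors the resolution in the diff: the builder flag wins, else fall back to the table property.
  static boolean upsertMode(boolean upsertFlag, Map<String, String> props) {
    return upsertFlag || Boolean.parseBoolean(props.getOrDefault(UPSERT_ENABLED, "false"));
  }

  // True when the table-wide default metrics mode is "counts".
  // Simplification: column-level overrides are not inspected.
  static boolean countsOnly(Map<String, String> props) {
    return "counts".equalsIgnoreCase(props.getOrDefault(DEFAULT_METRICS_MODE, "truncate(16)"));
  }

  // Hypothetical refinement: upsert is only flagged when metrics are not counts-only.
  static boolean isRisky(boolean upsertFlag, Map<String, String> props) {
    return upsertMode(upsertFlag, props) && !countsOnly(props);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(UPSERT_ENABLED, "true");
    System.out.println("default metrics mode, risky: " + isRisky(false, props));

    // The workaround suggested by the warning message.
    props.put(DEFAULT_METRICS_MODE, "counts");
    System.out.println("counts-only metrics, risky: " + isRisky(false, props));
  }
}
```

On a real table, the equivalent step would be to set the default metrics mode property to "counts" before running the Flink 1.12 sink in upsert mode; upgrading to Flink 1.13+ remains the recommended fix.
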

diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 6714ae8357..b6329c8d00 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -366,6 +366,24 @@ public class FlinkSink {
       boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
           UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
 
+      // `upsert` mode should not be used with Flink 1.12 due to correctness issues.
+      // For the patch release apache-iceberg-flink-runtime_1.12:0.13.2, we decided
+      // the best course of action is to log a warning asking users to upgrade.
+      // Flink 1.12 has been deprecated in upstream Apache Flink for some time, and
+      // support for it will be removed in the next major Iceberg release, 0.14.0.
+      //
+      // We still allow the configuration, because rejecting it would be too breaking
+      // a change for a patch release.
+      //
+      // See https://github.com/apache/iceberg/pull/4364 for more information.
+      if (upsertMode) {
+        LOG.error(
+            "This table sink is running in upsert mode. Upsert mode should not be used with Flink 1.12 because " +
+            "it will write incorrect delete file metadata, which could prevent deletes from being " +
+            "correctly applied. Upgrading to Flink 1.13+ is recommended. " +
+            "To safely use Flink 1.12, set manifest metrics to counts only.");
+      }
+
       // Validate the equality fields and partition fields if we enable the upsert mode.
       if (upsertMode) {
         Preconditions.checkState(!overwrite,