You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/12 14:24:44 UTC
[iceberg] branch 0.13.x updated: Flink 1.12: Log a warning message when upsert is enabled (#4754)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new 70369fcdc9 Flink 1.12: Log a warning message when upsert is enabled (#4754)
70369fcdc9 is described below
commit 70369fcdc994d34108ef929d89044ada70157867
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Thu May 12 07:24:35 2022 -0700
Flink 1.12: Log a warning message when upsert is enabled (#4754)
---
.../java/org/apache/iceberg/flink/sink/FlinkSink.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 6714ae8357..b6329c8d00 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -366,6 +366,24 @@ public class FlinkSink {
boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+ // `upsert` mode should not be used in Flink 1.12 due to correctness issues.
+ // As part of a patch release, apache-iceberg-flink-runtime_1.12:0.13.2,
+ // it has been decided the best course of action would be to log a warning
+ // asking people to upgrade, as Flink 1.12 has been deprecated in upstream Apache Flink
+ // for some time as well as will be removed in the next major Iceberg release, Iceberg 0.14.0.
+ //
+ // But we allow the configuration given that it's a patch release and the change would be otherwise
+ // too breaking for a patch release.
+ //
+ // See https://github.com/apache/iceberg/pull/4364 for more information.
+ if (upsertMode) {
+ LOG.error(
+ "This table sink is running in upsert mode. Upsert mode should not be used with Flink 1.12 because " +
+ "it will write incorrect delete file metadata, which could prevent deletes from being correctly" +
+ "applied. Upgrading to Flink 1.13+ is recommended. " +
+ "To safely use Flink 1.12, set manifest metrics to counts only.");
+ }
+
// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
Preconditions.checkState(!overwrite,