You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/03/28 02:13:15 UTC

[iceberg] branch master updated: Flink: Fix UPSERT delete file metadata (#4364)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 340a0c5  Flink: Fix UPSERT delete file metadata (#4364)
340a0c5 is described below

commit 340a0c5f30a7fb1e33dde4c1f082fd4cb9f7472f
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Sun Mar 27 19:12:55 2022 -0700

    Flink: Fix UPSERT delete file metadata (#4364)
    
    Co-authored-by: liliwei <hi...@gmail.com>
---
 build.gradle                                       |   2 +
 .../java/org/apache/iceberg/io/BaseTaskWriter.java |   7 +-
 .../iceberg/io/TestTaskEqualityDeltaWriter.java    |   6 +
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |   5 +
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |   5 +
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  13 +-
 .../flink/sink/RowDataTaskWriterFactory.java       |   9 +-
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  | 287 +++++++++++++++++++++
 8 files changed, 331 insertions(+), 3 deletions(-)

diff --git a/build.gradle b/build.gradle
index 639e82a..63d257f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -140,6 +140,8 @@ subprojects {
       }
     })
 
+    maxHeapSize = "1500m"
+
     testLogging {
       events "failed"
       exceptionFormat "full"
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 46f997e..0d927fb 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
      */
     protected abstract StructLike asStructLike(T data);
 
+    /**
+     * Wrap the passed-in key of a row as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLikeKey(T key);
+
     public void write(T row) throws IOException {
       PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
 
@@ -167,7 +172,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
      * @param key is the projected data whose columns are the same as the equality fields.
      */
     public void deleteKey(T key) throws IOException {
-      if (!internalPosDelete(asStructLike(key))) {
+      if (!internalPosDelete(asStructLikeKey(key))) {
         eqDeleteWriter.write(key);
       }
     }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index f373b3a..82c1944 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -461,6 +461,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
       deltaWriter.delete(row);
     }
 
+    // The caller of this function is responsible for passing in a record with only the key fields
     public void deleteKey(Record key) throws IOException {
       deltaWriter.deleteKey(key);
     }
@@ -479,6 +480,11 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
       protected StructLike asStructLike(Record row) {
         return row;
       }
+
+      @Override
+      protected StructLike asStructLikeKey(Record data) {
+        return data;
+      }
     }
   }
 
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129..9b5d01c 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");
+    }
   }
 }
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129..fad9486 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      throw new UnsupportedOperationException("Not implemented for Flink 1.13 during PR review");
+    }
   }
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129..16262b2 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final RowDataWrapper keyWrapper;
+  private final RowDataProjection keyProjection;
   private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    this.keyWrapper =  new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
     this.upsert = upsert;
   }
 
@@ -74,7 +80,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
       case INSERT:
       case UPDATE_AFTER:
         if (upsert) {
-          writer.delete(row);
+          writer.deleteKey(keyProjection.wrap(row));
         }
         writer.write(row);
         break;
@@ -103,5 +109,10 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      return keyWrapper.wrap(data);
+    }
   }
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 2849100..5144305 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+    } else if (upsert) {
+      // In upsert mode, only the new row is emitted, using the INSERT row kind. Any column of the inserted row other
+      // than the primary key fields may therefore differ from the deleted row, but the delete file must contain values
+      // that are correct for the deleted row. For that reason, only write the equality delete fields.
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+          ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
     } else {
-      // TODO provide the ability to customize the equality-delete row schema.
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
           ArrayUtil.toIntArray(equalityFieldIds), schema, null);
     }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 0000000..6ec35e2
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final boolean isStreamingJob;
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+  private TableEnvironment tEnv;
+
+  public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
+    super(catalogName, baseNamespace);
+    this.isStreamingJob = isStreamingJob;
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
+      for (Boolean isStreaming : new Boolean[] {true, false}) {
+        // Only test with one catalog, as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name to start with "testhadoop" when using a Hadoop catalog.
+        String catalogName = "testhadoop";
+        Namespace baseNamespace = Namespace.of("default");
+        parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+      }
+    }
+    return parameters;
+  }
+
+  @Override
+  protected synchronized TableEnvironment getTableEnv() {
+    // The whole method is synchronized so that the null check and the lazy initialization happen under one lock;
+    // checking tEnv outside the lock would let two threads both observe null and build separate environments.
+    if (tEnv == null) {
+      EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+          .newInstance();
+      if (isStreamingJob) {
+        settingsBuilder.inStreamingMode();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment
+            .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+        env.enableCheckpointing(400);
+        env.setMaxParallelism(2);
+        env.setParallelism(2);
+        tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+      } else {
+        settingsBuilder.inBatchMode();
+        tEnv = TableEnvironment.create(settingsBuilder.build());
+      }
+    }
+    return tEnv;
+  }
+
+  @Override
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testUpsertAndQuery() {
+    String tableName = "test_upsert_query";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+    sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " +
+            "PARTITIONED BY (province) WITH %s",
+        tableName, toWithClause(tableUpsertProps));
+
+    try {
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'a', TO_DATE('2022-03-01'))," +
+          "(2, 'b', TO_DATE('2022-03-01'))," +
+          "(1, 'b', TO_DATE('2022-03-01'))",
+          tableName);
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'a', TO_DATE('2022-03-02'))," +
+          "(5, 'b', TO_DATE('2022-03-02'))," +
+          "(1, 'b', TO_DATE('2022-03-02'))",
+          tableName);
+
+      List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+          rowsOn20220301);
+
+      List<Row> rowsOn20220302 = Lists.newArrayList(
+          Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+          rowsOn20220302);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyEqualToPartitionKey() {
+    // This is an SQL-based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
+    String tableName = "upsert_on_data_key";
+    try {
+      sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa')," +
+          "(2, 'aaa')," +
+          "(3, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa')," +
+          "(5, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa')," +
+          "(7, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+    String tableName = "upsert_on_pk_at_schema_start";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 1)," +
+          "('aaa', TO_DATE('2022-03-01'), 2)," +
+          "('bbb', TO_DATE('2022-03-01'), 3)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 4)," +
+          "('bbb', TO_DATE('2022-03-01'), 5)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 6)," +
+          "('bbb', TO_DATE('2022-03-01'), 7)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+    // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key fields
+    // are located at the end of the Flink schema.
+    String tableName = "upsert_on_pk_at_schema_end";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa', TO_DATE('2022-03-01'))," +
+          "(2, 'aaa', TO_DATE('2022-03-01'))," +
+          "(3, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa', TO_DATE('2022-03-01'))," +
+          "(5, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa', TO_DATE('2022-03-01'))," +
+          "(7, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+}