You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/03/28 02:13:15 UTC
[iceberg] branch master updated: Flink: Fix UPSERT delete file metadata (#4364)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 340a0c5 Flink: Fix UPSERT delete file metadata (#4364)
340a0c5 is described below
commit 340a0c5f30a7fb1e33dde4c1f082fd4cb9f7472f
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Sun Mar 27 19:12:55 2022 -0700
Flink: Fix UPSERT delete file metadata (#4364)
Co-authored-by: liliwei <hi...@gmail.com>
---
build.gradle | 2 +
.../java/org/apache/iceberg/io/BaseTaskWriter.java | 7 +-
.../iceberg/io/TestTaskEqualityDeltaWriter.java | 6 +
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 5 +
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 5 +
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 13 +-
.../flink/sink/RowDataTaskWriterFactory.java | 9 +-
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 287 +++++++++++++++++++++
8 files changed, 331 insertions(+), 3 deletions(-)
diff --git a/build.gradle b/build.gradle
index 639e82a..63d257f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -140,6 +140,8 @@ subprojects {
}
})
+ maxHeapSize = "1500m"
+
testLogging {
events "failed"
exceptionFormat "full"
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 46f997e..0d927fb 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
*/
protected abstract StructLike asStructLike(T data);
+ /**
+ * Wrap the passed in key of a row as a {@link StructLike}
+ */
+ protected abstract StructLike asStructLikeKey(T key);
+
public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
@@ -167,7 +172,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
- if (!internalPosDelete(asStructLike(key))) {
+ if (!internalPosDelete(asStructLikeKey(key))) {
eqDeleteWriter.write(key);
}
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index f373b3a..82c1944 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -461,6 +461,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
deltaWriter.delete(row);
}
+ // The caller of this function is responsible for passing in a record with only the key fields
public void deleteKey(Record key) throws IOException {
deltaWriter.deleteKey(key);
}
@@ -479,6 +480,11 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
protected StructLike asStructLike(Record row) {
return row;
}
+
+ @Override
+ protected StructLike asStructLikeKey(Record data) {
+ return data;
+ }
}
}
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129..9b5d01c 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ throw new UnsupportedOperationException("Not implemented for Flink 1.12 during PR review");
+ }
}
}
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129..fad9486 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ throw new UnsupportedOperationException("Not implemented for Flink 1.13 during PR review");
+ }
}
}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129..16262b2 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
+ private final RowDataWrapper keyWrapper;
+ private final RowDataProjection keyProjection;
private final boolean upsert;
BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+ this.keyProjection = RowDataProjection.create(schema, deleteSchema);
this.upsert = upsert;
}
@@ -74,7 +80,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
- writer.delete(row);
+ writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -103,5 +109,10 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ return keyWrapper.wrap(data);
+ }
}
}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 2849100..5144305 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;
public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+ } else if (upsert) {
+ // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted row
+ // may differ from the deleted row other than the primary key fields, and the delete file must contain values
+ // that are correct for the deleted row. Therefore, only write the equality delete fields.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
} else {
- // TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 0000000..6ec35e2
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private final boolean isStreamingJob;
+ private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+ private TableEnvironment tEnv;
+
+ public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
+ super(catalogName, baseNamespace);
+ this.isStreamingJob = isStreamingJob;
+ tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+ tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+ tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+ }
+
+ @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+ public static Iterable<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
+ for (Boolean isStreaming : new Boolean[] {true, false}) {
+ // Only test with one catalog as this is a file operation concern.
+ // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop catalog.
+ String catalogName = "testhadoop";
+ Namespace baseNamespace = Namespace.of("default");
+ parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+ }
+ }
+ return parameters;
+ }
+
+ @Override
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+ .newInstance();
+ if (isStreamingJob) {
+ settingsBuilder.inStreamingMode();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ env.enableCheckpointing(400);
+ env.setMaxParallelism(2);
+ env.setParallelism(2);
+ tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+ } else {
+ settingsBuilder.inBatchMode();
+ tEnv = TableEnvironment.create(settingsBuilder.build());
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ @Override
+ @Before
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ }
+
+ @Override
+ @After
+ public void clean() {
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+ }
+
+ @Test
+ public void testUpsertAndQuery() {
+ String tableName = "test_upsert_query";
+ LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+ LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+ sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " +
+ "PARTITIONED BY (province) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ try {
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'a', TO_DATE('2022-03-01'))," +
+ "(2, 'b', TO_DATE('2022-03-01'))," +
+ "(1, 'b', TO_DATE('2022-03-01'))",
+ tableName);
+
+ sql("INSERT INTO %s VALUES " +
+ "(4, 'a', TO_DATE('2022-03-02'))," +
+ "(5, 'b', TO_DATE('2022-03-02'))," +
+ "(1, 'b', TO_DATE('2022-03-02'))",
+ tableName);
+
+ List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+ rowsOn20220301);
+
+ List<Row> rowsOn20220302 = Lists.newArrayList(
+ Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+ rowsOn20220302);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @Test
+ public void testPrimaryKeyEqualToPartitionKey() {
+ // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
+ String tableName = "upsert_on_data_key";
+ try {
+ sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " +
+ "PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'aaa')," +
+ "(2, 'aaa')," +
+ "(3, 'bbb')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+ sql("INSERT INTO %s VALUES " +
+ "(4, 'aaa')," +
+ "(5, 'bbb')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+ sql("INSERT INTO %s VALUES " +
+ "(6, 'aaa')," +
+ "(7, 'bbb')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @Test
+ public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+ String tableName = "upsert_on_pk_at_schema_start";
+ LocalDate dt = LocalDate.of(2022, 3, 1);
+ try {
+ sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+ "PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " +
+ "('aaa', TO_DATE('2022-03-01'), 1)," +
+ "('aaa', TO_DATE('2022-03-01'), 2)," +
+ "('bbb', TO_DATE('2022-03-01'), 3)",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+ sql("INSERT INTO %s VALUES " +
+ "('aaa', TO_DATE('2022-03-01'), 4)," +
+ "('bbb', TO_DATE('2022-03-01'), 5)",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+ sql("INSERT INTO %s VALUES " +
+ "('aaa', TO_DATE('2022-03-01'), 6)," +
+ "('bbb', TO_DATE('2022-03-01'), 7)",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @Test
+ public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+ // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key fields
+ // are located at the end of the flink schema.
+ String tableName = "upsert_on_pk_at_schema_end";
+ LocalDate dt = LocalDate.of(2022, 3, 1);
+ try {
+ sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+ "PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'aaa', TO_DATE('2022-03-01'))," +
+ "(2, 'aaa', TO_DATE('2022-03-01'))," +
+ "(3, 'bbb', TO_DATE('2022-03-01'))",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+
+ sql("INSERT INTO %s VALUES " +
+ "(4, 'aaa', TO_DATE('2022-03-01'))," +
+ "(5, 'bbb', TO_DATE('2022-03-01'))",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+
+ sql("INSERT INTO %s VALUES " +
+ "(6, 'aaa', TO_DATE('2022-03-01'))," +
+ "(7, 'bbb', TO_DATE('2022-03-01'))",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+}