Posted to commits@iceberg.apache.org by op...@apache.org on 2022/02/28 06:46:39 UTC
[iceberg] branch master updated: Flink 1.14: Add EqualityFieldKeySelector. (#2898)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3efc838 Flink 1.14: Add EqualityFieldKeySelector. (#2898)
3efc838 is described below
commit 3efc8383138523235a679f3ade5d2d22a55be8bc
Author: Reo <Re...@users.noreply.github.com>
AuthorDate: Mon Feb 28 14:46:25 2022 +0800
Flink 1.14: Add EqualityFieldKeySelector. (#2898)
---
.../flink/sink/EqualityFieldKeySelector.java | 91 ++++++++++++++++++
.../org/apache/iceberg/flink/sink/FlinkSink.java | 105 +++++++++++++++------
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 91 +++++++++++++-----
3 files changed, 236 insertions(+), 51 deletions(-)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
new file mode 100644
index 0000000..5668956
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * A {@link KeySelector} that shuffles records by equality fields, ensuring that records with the same equality
+ * field values are emitted to the same writer in order.
+ */
+class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+
+ private final Schema schema;
+ private final RowType flinkSchema;
+ private final Schema deleteSchema;
+
+ private transient RowDataWrapper rowDataWrapper;
+ private transient StructProjection structProjection;
+ private transient StructLikeWrapper structLikeWrapper;
+
+ EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+ this.schema = schema;
+ this.flinkSchema = flinkSchema;
+ this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+ }
+
+ /**
+ * Construct the {@link RowDataWrapper} lazily here because some of its members are not serializable. This way
+ * we avoid having to force their serialization.
+ */
+ protected RowDataWrapper lazyRowDataWrapper() {
+ if (rowDataWrapper == null) {
+ rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+ return rowDataWrapper;
+ }
+
+ /**
+ * Construct the {@link StructProjection} lazily because it is not serializable.
+ */
+ protected StructProjection lazyStructProjection() {
+ if (structProjection == null) {
+ structProjection = StructProjection.create(schema, deleteSchema);
+ }
+ return structProjection;
+ }
+
+ /**
+ * Construct the {@link StructLikeWrapper} lazily because it is not serializable.
+ */
+ protected StructLikeWrapper lazyStructLikeWrapper() {
+ if (structLikeWrapper == null) {
+ structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+ }
+ return structLikeWrapper;
+ }
+
+ @Override
+ public Integer getKey(RowData row) {
+ RowDataWrapper wrappedRowData = lazyRowDataWrapper().wrap(row);
+ StructProjection projectedRowData = lazyStructProjection().wrap(wrappedRowData);
+ StructLikeWrapper wrapper = lazyStructLikeWrapper().set(projectedRowData);
+ return wrapper.hashCode();
+ }
+}
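
For context, a minimal usage sketch of the new selector (illustrative names, not part of this commit): keying a RowData stream by the equality fields routes every change for the same key to the same writer subtask. Since EqualityFieldKeySelector is package-private, code like this would live in org.apache.iceberg.flink.sink.

    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    // Illustrative sketch: shuffle a changelog stream by the "id" equality field so that
    // inserts and deletes for the same id arrive at a single writer, in order.
    static DataStream<RowData> keyByEqualityFields(DataStream<RowData> input, Schema schema, RowType rowType) {
      List<Integer> equalityFieldIds = Lists.newArrayList(schema.findField("id").fieldId());
      return input.keyBy(new EqualityFieldKeySelector(schema, rowType, equalityFieldIds));
    }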
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index f4b8d7e..d15a5f2 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -24,6 +24,7 @@ import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,9 +51,11 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
@@ -146,13 +149,14 @@ public class FlinkSink {
MapFunction<T, RowData> mapper,
TypeInformation<RowData> outputType) {
this.inputCreator = newUidPrefix -> {
+ // Input stream order is crucial for some situations (e.g. the CDC case). Therefore, we set the parallelism
+ // of the map operator to match its input, so the map operator chains with its input instead of being rebalanced by default.
+ SingleOutputStreamOperator<RowData> inputStream = input.map(mapper, outputType)
+ .setParallelism(input.getParallelism());
if (newUidPrefix != null) {
- return input.map(mapper, outputType)
- .name(operatorName(newUidPrefix))
- .uid(newUidPrefix + "-mapper");
- } else {
- return input.map(mapper, outputType);
+ inputStream.name(operatorName(newUidPrefix)).uid(newUidPrefix + "-mapper");
}
+ return inputStream;
};
return this;
}
@@ -295,15 +299,19 @@ public class FlinkSink {
}
}
+ // Find out the equality field id list based on the user-provided equality field column names.
+ List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
+
// Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
- // Distribute the records from input data stream based on the write.distribution-mode.
+ // Distribute the records from input data stream based on the write.distribution-mode and equality fields.
DataStream<RowData> distributeStream = distributeDataStream(
- rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+ rowDataInput, table.properties(), equalityFieldIds, table.spec(), table.schema(), flinkRowType);
// Add parallel writers that append rows to files
- SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+ SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType,
+ equalityFieldIds);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -338,6 +346,28 @@ public class FlinkSink {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
+ @VisibleForTesting
+ List<Integer> checkAndGetEqualityFieldIds() {
+ List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+ if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+ Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+ for (String column : equalityFieldColumns) {
+ org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+ Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+ column, table.schema());
+ equalityFieldSet.add(field.fieldId());
+ }
+
+ if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+ LOG.warn("The configured equality field column IDs {} are not matched with the schema identifier field IDs" +
+ " {}, use job specified equality field columns as the equality fields by default.",
+ equalityFieldSet, table.schema().identifierFieldIds());
+ }
+ equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+ }
+ return equalityFieldIds;
+ }
+
@SuppressWarnings("unchecked")
private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<T> resultStream = committerStream
@@ -362,17 +392,8 @@ public class FlinkSink {
return committerStream;
}
- private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
- // Find out the equality field id list based on the user-provided equality field column names.
- List<Integer> equalityFieldIds = Lists.newArrayList();
- if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
- for (String column : equalityFieldColumns) {
- org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
- Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
- column, table.schema());
- equalityFieldIds.add(field.fieldId());
- }
- }
+ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
+ List<Integer> equalityFieldIds) {
// Fallback to use upsert mode parsed from table properties if don't specify in job level.
boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
@@ -407,6 +428,7 @@ public class FlinkSink {
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
Map<String, String> properties,
+ List<Integer> equalityFieldIds,
PartitionSpec partitionSpec,
Schema iSchema,
RowType flinkRowType) {
@@ -422,24 +444,53 @@ public class FlinkSink {
writeMode = distributionMode;
}
+ LOG.info("Write distribution mode is '{}'", writeMode.modeName());
switch (writeMode) {
case NONE:
- return input;
+ if (equalityFieldIds.isEmpty()) {
+ return input;
+ } else {
+ LOG.info("Distribute rows by equality fields, because there are equality fields set");
+ return input.keyBy(new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+ }
case HASH:
- if (partitionSpec.isUnpartitioned()) {
- return input;
+ if (equalityFieldIds.isEmpty()) {
+ if (partitionSpec.isUnpartitioned()) {
+ LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+ "and table is unpartitioned");
+ return input;
+ } else {
+ return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ }
} else {
- return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ if (partitionSpec.isUnpartitioned()) {
+ LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+ "and table is unpartitioned");
+ return input.keyBy(new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+ } else {
+ for (PartitionField partitionField : partitionSpec.fields()) {
+ Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+ "In 'hash' distribution mode with equality fields set, partition field '%s' " +
+ "should be included in equality fields: '%s'", partitionField, equalityFieldColumns);
+ }
+ return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ }
}
case RANGE:
- LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
- WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
- return input;
+ if (equalityFieldIds.isEmpty()) {
+ LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+ "and {}=range is not supported yet in flink", WRITE_DISTRIBUTION_MODE);
+ return input;
+ } else {
+ LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+ "and{}=range is not supported yet in flink", WRITE_DISTRIBUTION_MODE);
+ return input.keyBy(new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+ }
default:
- throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+ throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
}
}
}
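
The user-visible effect of this change: once equality field columns are set (or the schema defines identifier fields), the sink shuffles rows by them even under the 'none' and 'range' distribution modes, instead of leaving the stream unshuffled. A minimal sketch of a builder call that exercises the new path, assuming rowDataStream and tableLoader are defined elsewhere:

    // With equality field columns set and write.distribution-mode left at 'none',
    // rows are now keyed by EqualityFieldKeySelector rather than passed through as-is.
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .equalityFieldColumns(ImmutableList.of("id"))
        .writeParallelism(4)
        .append();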
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 90b1a6d..23169d1 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -49,6 +49,8 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
@@ -84,36 +86,39 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
private final FileFormat format;
private final int parallelism;
private final boolean partitioned;
+ private final String writeDistributionMode;
private StreamExecutionEnvironment env;
private TestTableLoader tableLoader;
- @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+ @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode = {3}")
public static Object[][] parameters() {
return new Object[][] {
- new Object[] {"avro", 1, true},
- new Object[] {"avro", 1, false},
- new Object[] {"avro", 2, true},
- new Object[] {"avro", 2, false},
- new Object[] {"orc", 1, true},
- new Object[] {"orc", 1, false},
- new Object[] {"orc", 2, true},
- new Object[] {"orc", 2, false},
- new Object[] {"parquet", 1, true},
- new Object[] {"parquet", 1, false},
- new Object[] {"parquet", 2, true},
- new Object[] {"parquet", 2, false}
+ new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+
+ new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+ new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+
+ new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+ new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+ new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+ new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
};
}
- public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+ public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned, String writeDistributionMode) {
super(FORMAT_V2);
this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
this.parallelism = parallelism;
this.partitioned = partitioned;
+ this.writeDistributionMode = writeDistributionMode;
}
- @Override
@Before
public void setupTable() throws IOException {
this.tableDir = temp.newFolder();
@@ -128,6 +133,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
table.updateProperties()
.set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode)
.commit();
env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
@@ -155,10 +161,6 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
- // Shuffle by the equality key, so that different operations of the same key could be wrote in order when
- // executing tasks in parallel.
- dataStream = dataStream.keyBy(keySelector);
-
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.tableLoader(tableLoader)
.tableSchema(SimpleDataUtil.FLINK_SCHEMA)
@@ -197,6 +199,37 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
}
@Test
+ public void testCheckAndGetEqualityFieldIds() {
+ table.updateSchema()
+ .allowIncompatibleChanges()
+ .addRequiredColumn("type", Types.StringType.get())
+ .setIdentifierFields("type")
+ .commit();
+
+ DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+ FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
+
+ // Use schema identifier field IDs as equality field id list by default
+ Assert.assertEquals(
+ table.schema().identifierFieldIds(),
+ Sets.newHashSet(builder.checkAndGetEqualityFieldIds())
+ );
+
+ // Use user-provided equality field column as equality field id list
+ builder.equalityFieldColumns(Lists.newArrayList("id"));
+ Assert.assertEquals(
+ Sets.newHashSet(table.schema().findField("id").fieldId()),
+ Sets.newHashSet(builder.checkAndGetEqualityFieldIds())
+ );
+
+ builder.equalityFieldColumns(Lists.newArrayList("type"));
+ Assert.assertEquals(
+ Sets.newHashSet(table.schema().findField("type").fieldId()),
+ Sets.newHashSet(builder.checkAndGetEqualityFieldIds())
+ );
+ }
+
+ @Test
public void testChangeLogOnIdKey() throws Exception {
List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
ImmutableList.of(
@@ -227,8 +260,18 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
);
- testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
- elementsPerCheckpoint, expectedRecords);
+ if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
+ AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
+ IllegalStateException.class, "should be included in equality fields",
+ () -> {
+ testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+ elementsPerCheckpoint, expectedRecords);
+ return null;
+ });
+ } else {
+ testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+ elementsPerCheckpoint, expectedRecords);
+ }
}
@Test
@@ -343,7 +386,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
AssertHelpers.assertThrows("Should be error because upsert mode and overwrite mode enable at the same time.",
IllegalStateException.class, "OVERWRITE mode shouldn't be enable",
- () -> builder.equalityFieldColumns(ImmutableList.of("id")).overwrite(true).append()
+ () -> builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append()
);
AssertHelpers.assertThrows("Should be error because equality field columns are empty.",
@@ -381,8 +424,8 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
IllegalStateException.class, "should be included in equality fields",
() -> {
- testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true,
+ elementsPerCheckpoint, expectedRecords);
return null;
});
}
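
As the adjusted tests assert, the stricter 'hash' mode check means every partition source field must be covered by the equality fields. A sketch of a configuration that now fails fast at job-graph construction, assuming for illustration a table partitioned by "data":

    // Equality fields {"id"} do not cover the partition source field "data", so
    // distributeDataStream throws IllegalStateException
    // ("... should be included in equality fields ...") before any data flows.
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .equalityFieldColumns(ImmutableList.of("id"))
        .append();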