You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2022/02/28 06:46:39 UTC

[iceberg] branch master updated: Flink 1.14: Add EqualityFieldKeySelector. (#2898)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3efc838  Flink 1.14:  Add EqualityFieldKeySelector. (#2898)
3efc838 is described below

commit 3efc8383138523235a679f3ade5d2d22a55be8bc
Author: Reo <Re...@users.noreply.github.com>
AuthorDate: Mon Feb 28 14:46:25 2022 +0800

    Flink 1.14:  Add EqualityFieldKeySelector. (#2898)
---
 .../flink/sink/EqualityFieldKeySelector.java       |  91 ++++++++++++++++++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   | 105 +++++++++++++++------
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java |  91 +++++++++++++-----
 3 files changed, 236 insertions(+), 51 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
new file mode 100644
index 0000000..5668956
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record will be emitted to
+ * same writer in order.
+ */
+class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final RowType flinkSchema;
+  private final Schema deleteSchema;
+
+  private transient RowDataWrapper rowDataWrapper;
+  private transient StructProjection structProjection;
+  private transient StructLikeWrapper structLikeWrapper;
+
+  EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+    this.schema = schema;
+    this.flinkSchema = flinkSchema;
+    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+  }
+
+  /**
+   * Construct the {@link RowDataWrapper} lazily here because few members in it are not serializable. In this way, we
+   * don't have to serialize them with forcing.
+   */
+  protected RowDataWrapper lazyRowDataWrapper() {
+    if (rowDataWrapper == null) {
+      rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    }
+    return rowDataWrapper;
+  }
+
+  /**
+   * Construct the {@link StructProjection} lazily because it is not serializable.
+   */
+  protected StructProjection lazyStructProjection() {
+    if (structProjection == null) {
+      structProjection = StructProjection.create(schema, deleteSchema);
+    }
+    return structProjection;
+  }
+
+  /**
+   * Construct the {@link StructLikeWrapper} lazily because it is not serializable.
+   */
+  protected StructLikeWrapper lazyStructLikeWrapper() {
+    if (structLikeWrapper == null) {
+      structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+    }
+    return structLikeWrapper;
+  }
+
+  @Override
+  public Integer getKey(RowData row) {
+    RowDataWrapper wrappedRowData = lazyRowDataWrapper().wrap(row);
+    StructProjection projectedRowData = lazyStructProjection().wrap(wrappedRowData);
+    StructLikeWrapper wrapper = lazyStructLikeWrapper().set(projectedRowData);
+    return wrapper.hashCode();
+  }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index f4b8d7e..d15a5f2 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -24,6 +24,7 @@ import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,9 +51,11 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
@@ -146,13 +149,14 @@ public class FlinkSink {
                                             MapFunction<T, RowData> mapper,
                                             TypeInformation<RowData> outputType) {
       this.inputCreator = newUidPrefix -> {
+        // Input stream order is crucial for some situation(e.g. in cdc case). Therefore, we need to set the parallelism
+        // of map operator same as its input to keep map operator chaining its input, and avoid rebalanced by default.
+        SingleOutputStreamOperator<RowData> inputStream = input.map(mapper, outputType)
+            .setParallelism(input.getParallelism());
         if (newUidPrefix != null) {
-          return input.map(mapper, outputType)
-              .name(operatorName(newUidPrefix))
-              .uid(newUidPrefix + "-mapper");
-        } else {
-          return input.map(mapper, outputType);
+          inputStream.name(operatorName(newUidPrefix)).uid(newUidPrefix + "-mapper");
         }
+        return inputStream;
       };
       return this;
     }
@@ -295,15 +299,19 @@ public class FlinkSink {
         }
       }
 
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
+
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
-      // Distribute the records from input data stream based on the write.distribution-mode.
+      // Distribute the records from input data stream based on the write.distribution-mode and equality fields.
       DataStream<RowData> distributeStream = distributeDataStream(
-          rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+          rowDataInput, table.properties(), equalityFieldIds, table.spec(), table.schema(), flinkRowType);
 
       // Add parallel writers that append rows to files
-      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType,
+          equalityFieldIds);
 
       // Add single-parallelism committer that commits files
       // after successful checkpoint or end of input
@@ -338,6 +346,28 @@ public class FlinkSink {
       return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
     }
 
+    @VisibleForTesting
+    List<Integer> checkAndGetEqualityFieldIds() {
+      List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldSet.add(field.fieldId());
+        }
+
+        if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+          LOG.warn("The configured equality field column IDs {} are not matched with the schema identifier field IDs" +
+                  " {}, use job specified equality field columns as the equality fields by default.",
+              equalityFieldSet, table.schema().identifierFieldIds());
+        }
+        equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+      }
+      return equalityFieldIds;
+    }
+
     @SuppressWarnings("unchecked")
     private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
       DataStreamSink<T> resultStream = committerStream
@@ -362,17 +392,8 @@ public class FlinkSink {
       return committerStream;
     }
 
-    private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
+    private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
+                                                                 List<Integer> equalityFieldIds) {
 
       // Fallback to use upsert mode parsed from table properties if don't specify in job level.
       boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
@@ -407,6 +428,7 @@ public class FlinkSink {
 
     private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                                                      Map<String, String> properties,
+                                                     List<Integer> equalityFieldIds,
                                                      PartitionSpec partitionSpec,
                                                      Schema iSchema,
                                                      RowType flinkRowType) {
@@ -422,24 +444,53 @@ public class FlinkSink {
         writeMode = distributionMode;
       }
 
+      LOG.info("Write distribution mode is '{}'", writeMode.modeName());
       switch (writeMode) {
         case NONE:
-          return input;
+          if (equalityFieldIds.isEmpty()) {
+            return input;
+          } else {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set");
+            return input.keyBy(new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+          }
 
         case HASH:
-          if (partitionSpec.isUnpartitioned()) {
-            return input;
+          if (equalityFieldIds.isEmpty()) {
+            if (partitionSpec.isUnpartitioned()) {
+              LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                  "and table is unpartitioned");
+              return input;
+            } else {
+              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            }
           } else {
-            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            if (partitionSpec.isUnpartitioned()) {
+              LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                  "and table is unpartitioned");
+              return input.keyBy(new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+            } else {
+              for (PartitionField partitionField : partitionSpec.fields()) {
+                Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                    "In 'hash' distribution mode with equality fields set, partition field '%s' " +
+                        "should be included in equality fields: '%s'", partitionField, equalityFieldColumns);
+              }
+              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            }
           }
 
         case RANGE:
-          LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-              WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-          return input;
+          if (equalityFieldIds.isEmpty()) {
+            LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                "and {}=range is not supported yet in flink", WRITE_DISTRIBUTION_MODE);
+            return input;
+          } else {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                "and{}=range is not supported yet in flink", WRITE_DISTRIBUTION_MODE);
+            return input.keyBy(new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+          }
 
         default:
-          throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+          throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
       }
     }
   }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 90b1a6d..23169d1 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -49,6 +49,8 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Before;
@@ -84,36 +86,39 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
   private final FileFormat format;
   private final int parallelism;
   private final boolean partitioned;
+  private final String writeDistributionMode;
 
   private StreamExecutionEnvironment env;
   private TestTableLoader tableLoader;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
     return new Object[][] {
-        new Object[] {"avro", 1, true},
-        new Object[] {"avro", 1, false},
-        new Object[] {"avro", 2, true},
-        new Object[] {"avro", 2, false},
-        new Object[] {"orc", 1, true},
-        new Object[] {"orc", 1, false},
-        new Object[] {"orc", 2, true},
-        new Object[] {"orc", 2, false},
-        new Object[] {"parquet", 1, true},
-        new Object[] {"parquet", 1, false},
-        new Object[] {"parquet", 2, true},
-        new Object[] {"parquet", 2, false}
+        new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+
+        new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+
+        new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+        new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+        new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
+        new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
     };
   }
 
-  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned, String writeDistributionMode) {
     super(FORMAT_V2);
     this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
     this.parallelism = parallelism;
     this.partitioned = partitioned;
+    this.writeDistributionMode = writeDistributionMode;
   }
 
-  @Override
   @Before
   public void setupTable() throws IOException {
     this.tableDir = temp.newFolder();
@@ -128,6 +133,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
 
     table.updateProperties()
         .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode)
         .commit();
 
     env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
@@ -155,10 +161,6 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
                               List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
     DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
 
-    // Shuffle by the equality key, so that different operations of the same key could be wrote in order when
-    // executing tasks in parallel.
-    dataStream = dataStream.keyBy(keySelector);
-
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .tableLoader(tableLoader)
         .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
@@ -197,6 +199,37 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
   }
 
   @Test
+  public void testCheckAndGetEqualityFieldIds() {
+    table.updateSchema()
+        .allowIncompatibleChanges()
+        .addRequiredColumn("type", Types.StringType.get())
+        .setIdentifierFields("type")
+        .commit();
+
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+    FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
+
+    // Use schema identifier field IDs as equality field id list by default
+    Assert.assertEquals(
+        table.schema().identifierFieldIds(),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds())
+    );
+
+    // Use user-provided equality field column as equality field id list
+    builder.equalityFieldColumns(Lists.newArrayList("id"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("id").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds())
+    );
+
+    builder.equalityFieldColumns(Lists.newArrayList("type"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("type").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds())
+    );
+  }
+
+  @Test
   public void testChangeLogOnIdKey() throws Exception {
     List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
         ImmutableList.of(
@@ -227,8 +260,18 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
         ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
     );
 
-    testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
-        elementsPerCheckpoint, expectedRecords);
+    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
+      AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
+          IllegalStateException.class, "should be included in equality fields",
+          () -> {
+            testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+                elementsPerCheckpoint, expectedRecords);
+            return null;
+          });
+    } else {
+      testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+          elementsPerCheckpoint, expectedRecords);
+    }
   }
 
   @Test
@@ -343,7 +386,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
 
     AssertHelpers.assertThrows("Should be error because upsert mode and overwrite mode enable at the same time.",
         IllegalStateException.class, "OVERWRITE mode shouldn't be enable",
-        () -> builder.equalityFieldColumns(ImmutableList.of("id")).overwrite(true).append()
+        () -> builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append()
     );
 
     AssertHelpers.assertThrows("Should be error because equality field columns are empty.",
@@ -381,8 +424,8 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
       AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
           IllegalStateException.class, "should be included in equality fields",
           () -> {
-            testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint,
-                expectedRecords);
+            testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true,
+                elementsPerCheckpoint, expectedRecords);
             return null;
           });
     }