Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/22 04:33:49 UTC

[GitHub] [iceberg] openinx opened a new pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

openinx opened a new pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974


   Add unit tests to prove that a Flink DataStream job can write CDC events correctly.
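
   A minimal sketch of the pipeline these tests exercise (assumptions: a hypothetical Hadoop-table location, a format-v2 table with columns (id, data), and the `SimpleDataUtil` test schema used by the new test class quoted below):

```java
// Sketch only: write a changelog (CDC) DataStream into an Iceberg v2 table with FlinkSink.
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class ChangeLogSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
        .enableCheckpointing(100L);

    // Hypothetical location; the target must be an Iceberg format-v2 table.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg/cdc_table");

    // Each Row carries a RowKind (+I, -U, +U, -D) describing the CDC operation.
    DataStream<Row> changeLog = env.fromCollection(
        ImmutableList.of(
            Row.ofKind(RowKind.INSERT, 1, "aaa"),
            Row.ofKind(RowKind.UPDATE_BEFORE, 1, "aaa"),
            Row.ofKind(RowKind.UPDATE_AFTER, 1, "bbb"),
            Row.ofKind(RowKind.DELETE, 2, "ccc")),
        new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()));

    FlinkSink.forRow(changeLog, SimpleDataUtil.FLINK_SCHEMA)
        .tableLoader(tableLoader)
        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
        .equalityFieldColumns(ImmutableList.of("id"))  // equality key used to apply deletes/updates
        .build();

    env.execute("Write CDC events into Iceberg");
  }
}
```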


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547415472



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.
+    dataStream = dataStream.keyBy(keySelector);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
+
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots(table);
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
+
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals("Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])), actualRowSet(snapshotId, "*"));
+    }
+  }
+
+  private Row row(String rowKind, int id, String data) {
+    Map<String, RowKind> mapping = ImmutableMap.of(
+        "+I", RowKind.INSERT,
+        "-D", RowKind.DELETE,
+        "-U", RowKind.UPDATE_BEFORE,
+        "+U", RowKind.UPDATE_AFTER);
+
+    RowKind kind = mapping.get(rowKind);
+    if (kind == null) {
+      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
+    }
+
+    return Row.ofKind(kind, id, data);
+  }
+
+  private Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    List<String> equalityFieldIds = ImmutableList.of("id");

Review comment:
       Should this be `equalityFieldNames`?






[GitHub] [iceberg] openinx merged pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974


   




[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547412394



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -184,7 +198,18 @@ public Builder writeParallelism(int newWriteParallelism) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = Lists.newArrayList();
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldIds.add(field.fieldId());
+        }
+      }

Review comment:
       Why not do this conversion in `equalityFieldColumns` and keep the column ids in the builder instead of the source column names?
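
   For illustration, a sketch of what that alternative could look like (hypothetical; the PR keeps column names in the builder and resolves IDs in `build()`, as openinx explains later in the thread):

```java
// Hypothetical variant: resolve Iceberg field IDs eagerly inside the builder setter.
// Assumes the Table is already available when the setter is called; in the PR the lookup
// happens later, inside build(), after the table is loaded through the TableLoader.
public Builder equalityFieldColumns(List<String> columns) {
  this.equalityFieldIds = Lists.newArrayList();
  for (String column : columns) {
    Types.NestedField field = table.schema().findField(column);
    Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
        column, table.schema());
    this.equalityFieldIds.add(field.fieldId());
  }
  return this;
}
```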






[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547621405



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.
+    dataStream = dataStream.keyBy(keySelector);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
+
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots(table);
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
+
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals("Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])), actualRowSet(snapshotId, "*"));
+    }
+  }
+
+  private Row row(String rowKind, int id, String data) {
+    Map<String, RowKind> mapping = ImmutableMap.of(
+        "+I", RowKind.INSERT,
+        "-D", RowKind.DELETE,
+        "-U", RowKind.UPDATE_BEFORE,
+        "+U", RowKind.UPDATE_AFTER);
+
+    RowKind kind = mapping.get(rowKind);
+    if (kind == null) {
+      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
+    }
+
+    return Row.ofKind(kind, id, data);
+  }
+
+  private Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    List<String> equalityFieldIds = ImmutableList.of("id");
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),

Review comment:
       The current way is correct because it will maintain `rowKind` in a separate field (rather than in the shared `fields` array); see [here](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/Row.java#L72).
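
   A small illustration of that point (behavior of Flink's `Row` as described in the linked source):

```java
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RowKindDemo {
  public static void main(String[] args) {
    Row row = Row.ofKind(RowKind.DELETE, 1, "aaa");
    System.out.println(row.getKind());   // DELETE -> stored next to the fields, not inside them
    System.out.println(row.getArity());  // 2      -> only (id, data); the kind is not a column
    System.out.println(row.getField(0)); // 1      -> the id, which is why keyBy(row -> row.getField(0)) works
  }
}
```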






[GitHub] [iceberg] openinx commented on pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#issuecomment-749974019


   All checks passed. I've merged this patch so that I can create the next PR for the Flink Table CDC end-to-end unit tests. Thanks @rdblue for reviewing.




[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547417591



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.
+    dataStream = dataStream.keyBy(keySelector);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
+
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots(table);
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
+
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals("Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])), actualRowSet(snapshotId, "*"));
+    }
+  }
+
+  private Row row(String rowKind, int id, String data) {
+    Map<String, RowKind> mapping = ImmutableMap.of(
+        "+I", RowKind.INSERT,
+        "-D", RowKind.DELETE,
+        "-U", RowKind.UPDATE_BEFORE,
+        "+U", RowKind.UPDATE_AFTER);
+
+    RowKind kind = mapping.get(rowKind);
+    if (kind == null) {
+      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
+    }
+
+    return Row.ofKind(kind, id, data);
+  }
+
+  private Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    List<String> equalityFieldIds = ImmutableList.of("id");
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),

Review comment:
       Minor: This makes it look like the row has an operation as its first column, but that doesn't align with the key selector below that uses `row.getField(0)` to get the ID. I think it would make tests easier to read if `row` passed the row kind at the end. That way the fields align.
   
   I'm not sure if it is worth changing all of the rows. Up to you.
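
   A sketch of the suggested helper (hypothetical; the field order (id, data) then matches the table schema, with the row kind last):

```java
// Hypothetical alternative to the test helper above: keep (id, data) in schema order and pass the
// row kind as the last argument, so keyBy(row -> row.getField(0)) visibly selects the id column.
private Row row(int id, String data, String rowKind) {
  Map<String, RowKind> mapping = ImmutableMap.of(
      "+I", RowKind.INSERT,
      "-D", RowKind.DELETE,
      "-U", RowKind.UPDATE_BEFORE,
      "+U", RowKind.UPDATE_AFTER);
  RowKind kind = mapping.get(rowKind);
  if (kind == null) {
    throw new IllegalArgumentException("Unknown row kind: " + rowKind);
  }
  return Row.ofKind(kind, id, data);
}

// Call sites would then read row(1, "aaa", "+I") instead of row("+I", 1, "aaa").
```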






[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r549889701



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
       For the bloom filter idea, @wangmiao1981 has been working on a proposal for secondary indexes. I think that could be used for the check you're suggesting here.
   
   > people could choose to use UNIQUENESS ENFORCED or UNIQUENESS NOT-ENFORCED, in this way they could trade off between strong semantic and performance.
   
   Are you saying that if uniqueness is enforced, each insert becomes an upsert, but if uniqueness is not enforced, the sink assumes that whatever is emitting records will correctly delete before inserting? That sounds reasonable to me.
   
   > Finally the size of delete files will be almost same as the size of data files. The process of merging on read will be quite inefficient because there are too many useless DELETE to JOIN.
   
   I think that even if uniqueness is not enforced, tables will quickly require compaction to rewrite the equality deletes. I think we should spend some time making sure that we have good ways to maintain tables and compact equality deletes into position deletes, and position deletes into data files.
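
   To make the two stream shapes in this discussion concrete (illustrative rows only; imports as in the test class quoted above):

```java
// CDC / retract-style input: the upstream emits an explicit UPDATE_BEFORE (or DELETE) for each key,
// so the sink only has to turn -U/-D rows into equality deletes.
List<Row> cdcEvents = ImmutableList.of(
    Row.ofKind(RowKind.INSERT, 1, "aaa"),
    Row.ofKind(RowKind.UPDATE_BEFORE, 1, "aaa"),
    Row.ofKind(RowKind.UPDATE_AFTER, 1, "bbb"));

// Upsert-style input: only +I/+U rows arrive; with uniqueness enforced the sink itself would have
// to emit an equality delete for the key before writing each new row.
List<Row> upsertEvents = ImmutableList.of(
    Row.ofKind(RowKind.INSERT, 1, "aaa"),
    Row.ofKind(RowKind.UPDATE_AFTER, 1, "bbb"));
```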






[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547617523



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.
+    dataStream = dataStream.keyBy(keySelector);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
+
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots(table);
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
+
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals("Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])), actualRowSet(snapshotId, "*"));
+    }
+  }
+
+  private Row row(String rowKind, int id, String data) {
+    Map<String, RowKind> mapping = ImmutableMap.of(
+        "+I", RowKind.INSERT,
+        "-D", RowKind.DELETE,
+        "-U", RowKind.UPDATE_BEFORE,
+        "+U", RowKind.UPDATE_AFTER);
+
+    RowKind kind = mapping.get(rowKind);
+    if (kind == null) {
+      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
+    }
+
+    return Row.ofKind(kind, id, data);
+  }
+
+  private Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    List<String> equalityFieldIds = ImmutableList.of("id");

Review comment:
       Yes.






[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547617326



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.

Review comment:
       Thanks for pointing it out; I will address it in the next update.






[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547617115



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -184,7 +198,18 @@ public Builder writeParallelism(int newWriteParallelism) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = Lists.newArrayList();
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldIds.add(field.fieldId());
+        }
+      }

Review comment:
       Because `FlinkSink` is an API that will be exposed to Flink DataStream users, the concept of equality field IDs is harder for those users to understand. Equality field column names are more user-friendly.
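
   To make the contrast concrete (the ID-based setter is hypothetical; only the column-name API exists in this PR):

```java
// What DataStream users write with this PR: refer to the key by column name.
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .equalityFieldColumns(ImmutableList.of("id"))
    .build();

// A hypothetical ID-based setter would force users to look up Iceberg field IDs first:
// .equalityFieldIds(ImmutableList.of(1))  // 1 == the field id of column "id" in this table's schema
```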






[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547413103



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.

Review comment:
       Nit: I think you mean "executing tasks in parallel" rather than "parallelism".






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r550279316



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
       I’d vote for not ensuring uniqueness, as it is really hard at scale. If we are to ensure this at write time, we have to join the incoming data with the target table, which makes it really expensive. Doing this at read time would require sorting the data not only by the sort key but also by the sequence number.






[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547411736



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
       Do you think that we should consider adding primary key columns to the spec?






[GitHub] [iceberg] rdblue commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547413580



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2 extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  private StreamExecutionEnvironment env;
+  private TestTableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSinkV2(String format, int parallelism, boolean partitioned) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    if (!partitioned) {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
+    } else {
+      table = create(SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
+    }
+
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .commit();
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100L)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private void testChangeLogs(List<String> equalityFieldColumns,
+                              KeySelector<Row, Row> keySelector,
+                              List<List<Row>> elementsPerCheckpoint,
+                              List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
+
+    // Shuffle by the equality key, so that different operations from the same key could be wrote in order when
+    // executing tasks in parallelism.
+    dataStream = dataStream.keyBy(keySelector);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
+
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots(table);
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
+
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals("Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])), actualRowSet(snapshotId, "*"));
+    }
+  }
+
+  private Row row(String rowKind, int id, String data) {
+    Map<String, RowKind> mapping = ImmutableMap.of(
+        "+I", RowKind.INSERT,
+        "-D", RowKind.DELETE,
+        "-U", RowKind.UPDATE_BEFORE,
+        "+U", RowKind.UPDATE_AFTER);

Review comment:
       Could this be a private static map instead of defining it each time a row is created?
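   As a sketch of what this suggestion could look like (illustrative only, not code from the PR), the mapping could be hoisted into a `private static` constant so it is built once instead of on every call:
   
   ```java
   // Illustrative sketch of the suggestion: build the RowKind mapping once.
   private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
       "+I", RowKind.INSERT,
       "-D", RowKind.DELETE,
       "-U", RowKind.UPDATE_BEFORE,
       "+U", RowKind.UPDATE_AFTER);
   
   private Row row(String rowKind, int id, String data) {
     RowKind kind = ROW_KIND_MAP.get(rowKind);
     if (kind == null) {
       throw new IllegalArgumentException("Unknown row kind: " + rowKind);
     }
     Row row = Row.of(id, data);
     row.setKind(kind); // assumption: Flink's Row exposes setKind(RowKind) (Flink 1.11+)
     return row;
   }
   ```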






[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r549921095



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
       > Are you saying that if uniqueness is enforced, each insert becomes an upsert. But if uniqueness is not enforced, then the sink would assume that whatever is emitting records will correctly delete before inserting?
   
   Yes.  If someone is exporting a relational database's change log events to an apache iceberg table and can guarantee exactly-once semantics (for example, the [flink-cdc-connector](https://github.com/ververica/flink-cdc-connectors) could guarantee that), then uniqueness always holds when we just write the INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events to iceberg.  In other cases, for example a flink aggregate job that refreshes a metrics count value, the same key will be written several times without deleting first, so we should regard every INSERT as an UPSERT (see the sketch at the end of this comment).
   
   > even if uniqueness is not enforced, tables will quickly require compaction to rewrite the equality deletes.
   
   That is planned for the second phase, which includes: 
   
   1. Use a bloom filter to reduce the number of useless deletes;
   2. Minor compaction to convert part of the equality deletes to pos-deletes;
   3. Major compaction to eliminate all the deletes;
   4. Make the whole read & write path more stable. For example, a cache policy to reduce duplicated delete file loading when merging on read within the same task; spilling to disk if the `insertedRowMap` exceeds the task's memory threshold, etc. I will evaluate the read, write & compaction paths on a large dataset to make this a stable solution for production.
   
   It would be good to have a document that collects all of those items for review.
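   As a rough sketch of the "regard every INSERT as an UPSERT" idea above (purely illustrative, not code from this PR), an upstream operator could emit a DELETE for the key before re-emitting the row as an INSERT, so the equality delete drops any previously committed row with the same key:
   
   ```java
   import org.apache.flink.api.common.functions.FlatMapFunction;
   import org.apache.flink.types.Row;
   import org.apache.flink.types.RowKind;
   import org.apache.flink.util.Collector;
   
   // Hypothetical helper (the class name is an assumption): turns every INSERT into a DELETE + INSERT pair.
   public class UpsertEmitter implements FlatMapFunction<Row, Row> {
     @Override
     public void flatMap(Row value, Collector<Row> out) {
       if (value.getKind() == RowKind.INSERT) {
         Row delete = Row.copy(value);
         delete.setKind(RowKind.DELETE); // written by the sink as an equality delete on the key columns
         out.collect(delete);
       }
       out.collect(value);
     }
   }
   ```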






[GitHub] [iceberg] openinx commented on a change in pull request #1974: Flink: Add ChangeLog DataStream end-to-end unit tests.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547616494



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
       In the next PR https://github.com/openinx/incubator-iceberg/commit/a863c66eb3d72dd975ea64c75ed2ac35984c17fe, the Flink SQL table's primary key will act as the equality field columns.  The semantics of iceberg equality columns are almost the same as a primary key; one difference I can think of is that the uniqueness of the key is not enforced. In this [discussion](https://github.com/apache/iceberg/pull/1663#discussion_r528278694), we don't guarantee uniqueness when writing a key that was also written in a previously committed txn, which means that if: 
   
   ```java
   Txn-1:  INSERT key1,  txn commit; 
   Txn-2:  INSERT key1,  txn commit;
   ```
   
   Then the table will have two records with the same key. 
   
   If people really need iceberg to maintain the key's uniqueness, then they will need to transform every `INSERT` into an `UPSERT`, which means `DELETE` first and then `INSERT` the new values.
   
   That introduces another issue: each `INSERT` will be regarded as an `UPSERT`, so it writes a `DELETE` and an `INSERT`.  Eventually the size of the delete files will be almost the same as the size of the data files, and merging on read will be quite inefficient because there are too many useless `DELETE`s to JOIN.
   
   The direct way is to use a bloom filter to reduce the useless `DELETE`s: we would generate a bloom filter binary for each committed data file.  When bootstrapping the flink/spark job, we would prefetch all the bloom filter binaries from the parquet/avro data files' metadata. Before writing an equality delete, we would check the bloom filter; if it indicates that none of the committed data files contain the given key, we could skip appending that equality delete (a rough sketch is at the end of this comment). That would remove a lot of useless `DELETE`s from the delete files. Of course, a bloom filter has a 'false positive' issue, but that probability is less than 1%, which means we may append a small number of deletes whose keys don't exist in the current table.  In my view, that should be OK.
   
   In summary, I think it's reasonable to regard those equality fields as the primary key of an iceberg table. People could choose `UNIQUENESS ENFORCED` or `UNIQUENESS NOT-ENFORCED`, and in this way trade off between strong semantics and performance.
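   A rough sketch of the bloom filter check mentioned above (purely illustrative; the helper class, funnel and parameters are assumptions, not iceberg or flink APIs):
   
   ```java
   import java.util.List;
   import com.google.common.hash.BloomFilter;
   import com.google.common.hash.Funnels;
   
   // Hypothetical helper: skip an equality delete when no committed data file may contain the key.
   public class EqualityDeleteSkipper {
     private final List<BloomFilter<Integer>> committedFileFilters;
   
     public EqualityDeleteSkipper(List<BloomFilter<Integer>> committedFileFilters) {
       this.committedFileFilters = committedFileFilters;
     }
   
     public boolean shouldWriteDelete(int key) {
       // mightContain may return false positives (~1% with the parameters below) but never
       // false negatives, so skipping the delete when every filter says "no" is safe.
       return committedFileFilters.stream().anyMatch(filter -> filter.mightContain(key));
     }
   
     // Building a per-file filter when a data file is committed (illustrative parameters).
     public static BloomFilter<Integer> filterForDataFile(List<Integer> keysInFile) {
       BloomFilter<Integer> filter =
           BloomFilter.create(Funnels.integerFunnel(), Math.max(keysInFile.size(), 1), 0.01);
       keysInFile.forEach(filter::put);
       return filter;
     }
   }
   ```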



