Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/28 23:43:51 UTC
[iceberg] branch master updated: Flink: Backport PR #6660 to Flink 1.14 and 1.15 (#6949)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d1679dfac5 Flink: Backport PR #6660 to Flink 1.14 and 1.15 (#6949)
d1679dfac5 is described below
commit d1679dfac55d9cc12d8f869a85109054a41ebd31
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Tue Feb 28 15:43:44 2023 -0800
Flink: Backport PR #6660 to Flink 1.14 and 1.15 (#6949)
---
.../org/apache/iceberg/flink/FlinkWriteConf.java | 8 +
.../apache/iceberg/flink/FlinkWriteOptions.java | 5 +
.../org/apache/iceberg/flink/sink/FlinkSink.java | 11 +-
.../iceberg/flink/sink/IcebergFilesCommitter.java | 22 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 46 +-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 35 +-
.../flink/sink/TestFlinkIcebergSinkBase.java | 64 +++
.../flink/sink/TestFlinkIcebergSinkBranch.java | 137 ++++++
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 310 +------------
...SinkV2.java => TestFlinkIcebergSinkV2Base.java} | 511 +++++++--------------
.../flink/sink/TestFlinkIcebergSinkV2Branch.java | 134 ++++++
.../flink/sink/TestIcebergFilesCommitter.java | 87 ++--
.../org/apache/iceberg/flink/FlinkWriteConf.java | 8 +
.../apache/iceberg/flink/FlinkWriteOptions.java | 5 +
.../org/apache/iceberg/flink/sink/FlinkSink.java | 8 +-
.../iceberg/flink/sink/IcebergFilesCommitter.java | 22 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 46 +-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 35 +-
.../flink/sink/TestFlinkIcebergSinkBase.java | 64 +++
.../flink/sink/TestFlinkIcebergSinkBranch.java | 137 ++++++
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 310 +------------
.../flink/sink/TestFlinkIcebergSinkV2Base.java} | 511 +++++++--------------
.../flink/sink/TestFlinkIcebergSinkV2Branch.java | 134 ++++++
.../flink/sink/TestIcebergFilesCommitter.java | 87 ++--
24 files changed, 1284 insertions(+), 1453 deletions(-)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 448b2aa2d8..4b5c7e4a0d 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -172,4 +172,12 @@ public class FlinkWriteConf {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}
+
+ public String branch() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.BRANCH.key())
+ .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
+ .parse();
+ }
}
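For context, the new getter follows the same resolution pattern as the other options in FlinkWriteConf: a per-sink write option wins, and the option's declared default (the main branch) applies otherwise. A minimal sketch of that precedence, assuming a hypothetical per-sink options map ("audit" is a placeholder branch name):

    import java.util.Map;
    import org.apache.iceberg.flink.FlinkWriteOptions;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    // Sketch only: mirrors the option-then-default lookup the confParser chain above performs.
    Map<String, String> writeOptions =
        ImmutableMap.of(FlinkWriteOptions.BRANCH.key(), "audit"); // "audit" is hypothetical
    String branch =
        writeOptions.getOrDefault(
            FlinkWriteOptions.BRANCH.key(), FlinkWriteOptions.BRANCH.defaultValue()); // "main" when unset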
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index f3cc52972b..86cb2fb0eb 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.SnapshotRef;
/** Flink sink write options */
public class FlinkWriteOptions {
@@ -56,4 +57,8 @@ public class FlinkWriteOptions {
// Overrides the table's write.distribution-mode
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+
+ // Branch to write to
+ public static final ConfigOption<String> BRANCH =
+ ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
}
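Because the option defaults to SnapshotRef.MAIN_BRANCH, jobs that never set "branch" keep committing to main. A hedged sketch of the option's contract through a plain Flink Configuration (again, "audit" is a made-up branch name):

    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.flink.FlinkWriteOptions;

    Configuration conf = new Configuration();
    String target = conf.get(FlinkWriteOptions.BRANCH);      // "main" (SnapshotRef.MAIN_BRANCH) when unset
    conf.setString(FlinkWriteOptions.BRANCH.key(), "audit"); // route subsequent writes to a branch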
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index ead0b757e5..445b6a6ff9 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -132,10 +132,7 @@ public class FlinkSink {
private TableLoader tableLoader;
private Table table;
private TableSchema tableSchema;
- private boolean overwrite = false;
- private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
- private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -319,6 +316,11 @@ public class FlinkSink {
return this;
}
+ public Builder toBranch(String branch) {
+ writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+ return this;
+ }
+
private <T> DataStreamSink<T> chainIcebergOperators() {
Preconditions.checkArgument(
inputCreator != null,
@@ -425,7 +427,8 @@ public class FlinkSink {
tableLoader,
flinkWriteConf.overwriteMode(),
snapshotProperties,
- flinkWriteConf.workerPoolSize());
+ flinkWriteConf.workerPoolSize(),
+ flinkWriteConf.branch());
SingleOutputStreamOperator<Void> committerStream =
writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
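With the builder method above feeding the branch into the committer, a job can target a branch end to end. A minimal usage sketch, assuming an existing RowData stream and table loader ("audit" is a placeholder branch name):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    void appendToBranch(DataStream<RowData> input, TableLoader tableLoader) {
      FlinkSink.forRowData(input)
          .tableLoader(tableLoader)
          .toBranch("audit") // omit to keep the default of writing to "main"
          .append();
    }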
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index d8a7bc5cf2..22b4dc9d21 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// The completed files cache for current checkpoint. Once the snapshot barrier received, it will
// be flushed to the 'dataFilesPerCheckpoint'.
private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+ private final String branch;
// It will have an unique identifier for one job.
private transient String flinkJobId;
@@ -125,11 +126,13 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
TableLoader tableLoader,
boolean replacePartitions,
Map<String, String> snapshotProperties,
- Integer workerPoolSize) {
+ Integer workerPoolSize,
+ String branch) {
this.tableLoader = tableLoader;
this.replacePartitions = replacePartitions;
this.snapshotProperties = snapshotProperties;
this.workerPoolSize = workerPoolSize;
+ this.branch = branch;
}
@Override
@@ -179,7 +182,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// it's safe to assign the max committed checkpoint id from restored flink job to the current
// flink job.
this.maxCommittedCheckpointId =
- getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);
+ getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch);
NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -230,7 +233,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
- LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
@@ -286,8 +288,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
- } else {
- LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
}
}
@@ -386,10 +386,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
String operatorId,
long checkpointId) {
LOG.info(
- "Committing {} for checkpoint {} to table {} with summary: {}",
+ "Committing {} for checkpoint {} to table {} branch {} with summary: {}",
description,
checkpointId,
table.name(),
+ branch,
summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
@@ -397,14 +398,16 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
+ operation.toBranch(branch);
long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
- "Committed {} to table: {}, checkpointId {} in {} ms",
+ "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
description,
table.name(),
+ branch,
checkpointId,
durationMs);
committerMetrics.commitDuration(durationMs);
@@ -474,8 +477,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}
- static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
- Snapshot snapshot = table.currentSnapshot();
+ static long getMaxCommittedCheckpointId(
+ Table table, String flinkJobId, String operatorId, String branch) {
+ Snapshot snapshot = table.snapshot(branch);
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
while (snapshot != null) {
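The hunk above switches the starting point of the checkpoint-recovery walk from currentSnapshot() to the head of the configured branch; the loop body itself is unchanged and cut off by the diff context. A hedged sketch of how that walk plausibly continues (the summary key strings are assumptions inferred from the constants' names):

    // Walk parent pointers from the branch head until a snapshot committed by
    // this Flink job is found, then read its recorded max checkpoint id.
    Snapshot snapshot = table.snapshot(branch); // null if nothing was ever committed to the branch
    long lastCommittedCheckpointId = -1L;       // INITIAL_CHECKPOINT_ID in the real class
    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      if (flinkJobId.equals(summary.get("flink.job-id"))) {               // FLINK_JOB_ID (assumed value)
        String value = summary.get("flink.max-committed-checkpoint-id");  // MAX_COMMITTED_CHECKPOINT_ID (assumed value)
        if (value != null) {
          lastCommittedCheckpointId = Long.parseLong(value);
          break;
        }
      }
      Long parentId = snapshot.parentId();
      snapshot = parentId == null ? null : table.snapshot(parentId);
    }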
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index e296763508..345d88a48a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
@@ -206,12 +207,18 @@ public class SimpleDataUtil {
return records;
}
- public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
- assertTableRecords(tablePath, convertToRecords(expected));
+ public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
+ throws IOException {
+ assertTableRecords(tablePath, convertToRecords(expected), branch);
}
public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
- assertTableRecords(table, convertToRecords(expected));
+ assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRows(Table table, List<RowData> expected, String branch)
+ throws IOException {
+ assertTableRecords(table, convertToRecords(expected), branch);
}
/** Get all rows for a table */
@@ -267,13 +274,25 @@ public class SimpleDataUtil {
}
public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+ assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRecords(Table table, List<Record> expected, String branch)
+ throws IOException {
table.refresh();
+ Snapshot snapshot = latestSnapshot(table, branch);
+
+ if (snapshot == null) {
+ Assert.assertEquals(expected, ImmutableList.of());
+ return;
+ }
Types.StructType type = table.schema().asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
- try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
StructLikeSet actualSet = StructLikeSet.create(type);
for (Record record : iterable) {
@@ -284,10 +303,27 @@ public class SimpleDataUtil {
}
}
+ // Returns the latest snapshot of the given branch in the table
+ public static Snapshot latestSnapshot(Table table, String branch) {
+ // For the main branch, currentSnapshot() is used to validate that the API behavior has
+ // not changed since that was the API used for validation prior to addition of branches.
+ if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+ return table.currentSnapshot();
+ }
+
+ return table.snapshot(branch);
+ }
+
public static void assertTableRecords(String tablePath, List<Record> expected)
throws IOException {
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
- assertTableRecords(new HadoopTables().load(tablePath), expected);
+ assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRecords(String tablePath, List<Record> expected, String branch)
+ throws IOException {
+ Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
+ assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
}
public static StructLikeSet expectedRowSet(Table table, Record... records) {
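The new overloads make branch assertions a one-liner in the tests below, while the existing signatures keep their main-branch behavior. A short usage sketch (the "audit" branch and the table/expected variables are assumptions):

    // Branch-aware check: reads the head of the given branch instead of currentSnapshot().
    SimpleDataUtil.assertTableRecords(table, expected, "audit");
    // Unchanged call sites still validate the main branch.
    SimpleDataUtil.assertTableRecords(table, expected);
    // For main, latestSnapshot() intentionally goes through currentSnapshot().
    Snapshot head = SimpleDataUtil.latestSnapshot(table, SnapshotRef.MAIN_BRANCH);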
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index c2af30d342..23beb19a72 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -22,14 +22,10 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
@@ -45,7 +41,6 @@ import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -60,7 +55,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public class TestFlinkIcebergSink {
+public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,13 +67,6 @@ public class TestFlinkIcebergSink {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
- private static final TypeInformation<Row> ROW_TYPE_INFO =
- new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
- private static final DataFormatConverters.RowConverter CONVERTER =
- new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
-
- private Table table;
- private StreamExecutionEnvironment env;
private TableLoader tableLoader;
private final FileFormat format;
@@ -132,14 +120,6 @@ public class TestFlinkIcebergSink {
tableLoader = catalogResource.tableLoader();
}
- private List<RowData> convertToRowData(List<Row> rows) {
- return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
- }
-
- private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
- return new BoundedTestSource<>(rows.toArray(new Row[0]));
- }
-
@Test
public void testWriteRowData() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
@@ -160,19 +140,6 @@ public class TestFlinkIcebergSink {
SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}
- private List<Row> createRows(String prefix) {
- return Lists.newArrayList(
- Row.of(1, prefix + "aaa"),
- Row.of(1, prefix + "bbb"),
- Row.of(1, prefix + "ccc"),
- Row.of(2, prefix + "aaa"),
- Row.of(2, prefix + "bbb"),
- Row.of(2, prefix + "ccc"),
- Row.of(3, prefix + "aaa"),
- Row.of(3, prefix + "bbb"),
- Row.of(3, prefix + "ccc"));
- }
-
private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
throws Exception {
List<Row> rows = createRows("");
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
new file mode 100644
index 0000000000..b38aa6b50c
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class TestFlinkIcebergSinkBase {
+
+ protected Table table;
+ protected StreamExecutionEnvironment env;
+ protected static final TypeInformation<Row> ROW_TYPE_INFO =
+ new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+ protected static final DataFormatConverters.RowConverter CONVERTER =
+ new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+ protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+ return new BoundedTestSource<>(rows.toArray(new Row[0]));
+ }
+
+ protected List<Row> createRows(String prefix) {
+ return Lists.newArrayList(
+ Row.of(1, prefix + "aaa"),
+ Row.of(1, prefix + "bbb"),
+ Row.of(1, prefix + "ccc"),
+ Row.of(2, prefix + "aaa"),
+ Row.of(2, prefix + "bbb"),
+ Row.of(2, prefix + "ccc"),
+ Row.of(3, prefix + "aaa"),
+ Row.of(3, prefix + "bbb"),
+ Row.of(3, prefix + "ccc"));
+ }
+
+ protected List<RowData> convertToRowData(List<Row> rows) {
+ return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+ }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
new file mode 100644
index 0000000000..16b4542b00
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final HadoopCatalogResource catalogResource =
+ new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ private final String branch;
+ private TableLoader tableLoader;
+
+ @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+ public static Object[] parameters() {
+ return new Object[] {"main", "testBranch"};
+ }
+
+ public TestFlinkIcebergSinkBranch(String branch) {
+ this.branch = branch;
+ }
+
+ @Before
+ public void before() throws IOException {
+ table =
+ catalogResource
+ .catalog()
+ .createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ FileFormat.AVRO.name(),
+ TableProperties.FORMAT_VERSION,
+ "1"));
+
+ env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .enableCheckpointing(100);
+
+ tableLoader = catalogResource.tableLoader();
+ }
+
+ @Test
+ public void testWriteRowWithTableSchema() throws Exception {
+ testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+ verifyOtherBranchUnmodified();
+ }
+
+ private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+ throws Exception {
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .tableSchema(tableSchema)
+ .toBranch(branch)
+ .distributionMode(distributionMode)
+ .append();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream.");
+
+ SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+ SimpleDataUtil.assertTableRows(
+ table,
+ ImmutableList.of(),
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+ verifyOtherBranchUnmodified();
+ }
+
+ private void verifyOtherBranchUnmodified() {
+ String otherBranch =
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
+ if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+ Assert.assertNull(table.currentSnapshot());
+ }
+
+ Assert.assertTrue(table.snapshot(otherBranch) == null);
+ }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index ccc3c0f23d..af3cc19635 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,38 +18,25 @@
*/
package org.apache.iceberg.flink.sink;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
-import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -60,7 +47,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,29 +59,6 @@ public class TestFlinkIcebergSinkV2 {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
- private static final int FORMAT_V2 = 2;
- private static final TypeInformation<Row> ROW_TYPE_INFO =
- new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
-
- private static final Map<String, RowKind> ROW_KIND_MAP =
- ImmutableMap.of(
- "+I", RowKind.INSERT,
- "-D", RowKind.DELETE,
- "-U", RowKind.UPDATE_BEFORE,
- "+U", RowKind.UPDATE_AFTER);
-
- private static final int ROW_ID_POS = 0;
- private static final int ROW_DATA_POS = 1;
-
- private final FileFormat format;
- private final int parallelism;
- private final boolean partitioned;
- private final String writeDistributionMode;
-
- private Table table;
- private StreamExecutionEnvironment env;
- private TableLoader tableLoader;
-
@Parameterized.Parameters(
name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
public static Object[][] parameters() {
@@ -155,67 +119,6 @@ public class TestFlinkIcebergSinkV2 {
tableLoader = catalogResource.tableLoader();
}
- private List<Snapshot> findValidSnapshots() {
- List<Snapshot> validSnapshots = Lists.newArrayList();
- for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests(table.io()).stream()
- .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
- validSnapshots.add(snapshot);
- }
- }
- return validSnapshots;
- }
-
- private void testChangeLogs(
- List<String> equalityFieldColumns,
- KeySelector<Row, Object> keySelector,
- boolean insertAsUpsert,
- List<List<Row>> elementsPerCheckpoint,
- List<List<Record>> expectedRecordsPerCheckpoint)
- throws Exception {
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .tableLoader(tableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .equalityFieldColumns(equalityFieldColumns)
- .upsert(insertAsUpsert)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg Change-Log DataStream.");
-
- table.refresh();
- List<Snapshot> snapshots = findValidSnapshots();
- int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
- Assert.assertEquals(
- "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
- for (int i = 0; i < expectedSnapshotNum; i++) {
- long snapshotId = snapshots.get(i).snapshotId();
- List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
- Assert.assertEquals(
- "Should have the expected records for the checkpoint#" + i,
- expectedRowSet(expectedRecords.toArray(new Record[0])),
- actualRowSet(snapshotId, "*"));
- }
- }
-
- private Row row(String rowKind, int id, String data) {
- RowKind kind = ROW_KIND_MAP.get(rowKind);
- if (kind == null) {
- throw new IllegalArgumentException("Unknown row kind: " + rowKind);
- }
-
- return Row.ofKind(kind, id, data);
- }
-
- private Record record(int id, String data) {
- return SimpleDataUtil.createRecord(id, data);
- }
-
@Test
public void testCheckAndGetEqualityFieldIds() {
table
@@ -249,79 +152,12 @@ public class TestFlinkIcebergSinkV2 {
@Test
public void testChangeLogOnIdKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(
- row("+I", 1, "aaa"),
- row("-D", 1, "aaa"),
- row("+I", 1, "bbb"),
- row("+I", 2, "aaa"),
- row("-D", 2, "aaa"),
- row("+I", 2, "bbb")),
- ImmutableList.of(
- row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
- ImmutableList.of(
- row("-D", 1, "bbb"),
- row("+I", 1, "ccc"),
- row("-D", 1, "ccc"),
- row("+I", 1, "ddd")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
- ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
- ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
-
- if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- return null;
- });
- } else {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testChangeLogOnDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(
- row("+I", 1, "aaa"),
- row("-D", 1, "aaa"),
- row("+I", 2, "bbb"),
- row("+I", 1, "bbb"),
- row("+I", 2, "aaa")),
- ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
- ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
- ImmutableList.of(
- record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));
-
- testChangeLogs(
- ImmutableList.of("data"),
- row -> row.getField(ROW_DATA_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);
}
@Test
@@ -344,59 +180,12 @@ public class TestFlinkIcebergSinkV2 {
@Test
public void testChangeLogOnIdDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(
- row("+I", 1, "aaa"),
- row("-D", 1, "aaa"),
- row("+I", 2, "bbb"),
- row("+I", 1, "bbb"),
- row("+I", 2, "aaa")),
- ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
- ImmutableList.of(
- record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
- ImmutableList.of(
- record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
- testChangeLogs(
- ImmutableList.of("data", "id"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testChangeLogOnSameKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- // Checkpoint #1
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #2
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
- // Checkpoint #3
- ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #4
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH);
}
@Test
@@ -426,97 +215,16 @@ public class TestFlinkIcebergSinkV2 {
@Test
public void testUpsertOnIdKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
- ImmutableList.of(row("+I", 1, "ccc")),
- ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb")),
- ImmutableList.of(record(1, "ccc")),
- ImmutableList.of(record(1, "eee")));
-
- if (!partitioned) {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- } else {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- return null;
- });
- }
+ testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testUpsertOnDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
- ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
- ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
- ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
- ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
-
- testChangeLogs(
- ImmutableList.of("data"),
- row -> row.getField(ROW_DATA_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
+ testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testUpsertOnIdDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
- ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
- ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
- ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
-
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- }
-
- private StructLikeSet expectedRowSet(Record... records) {
- return SimpleDataUtil.expectedRowSet(table, records);
- }
-
- private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
- table.refresh();
- StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
- try (CloseableIterable<Record> reader =
- IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) {
- reader.forEach(set::add);
- }
- return set;
+ testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
similarity index 60%
copy from flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
copy to flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index ccc3c0f23d..15380408e4 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -26,184 +26,50 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2Base {
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ protected static final int FORMAT_V2 = 2;
+ protected static final TypeInformation<Row> ROW_TYPE_INFO =
+ new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+ protected static final int ROW_ID_POS = 0;
+ protected static final int ROW_DATA_POS = 1;
- private static final int FORMAT_V2 = 2;
- private static final TypeInformation<Row> ROW_TYPE_INFO =
- new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+ protected int parallelism = 1;
+ protected TableLoader tableLoader;
+ protected Table table;
+ protected StreamExecutionEnvironment env;
+ protected FileFormat format;
+ protected boolean partitioned;
+ protected String writeDistributionMode;
- private static final Map<String, RowKind> ROW_KIND_MAP =
+ protected static final Map<String, RowKind> ROW_KIND_MAP =
ImmutableMap.of(
"+I", RowKind.INSERT,
"-D", RowKind.DELETE,
"-U", RowKind.UPDATE_BEFORE,
"+U", RowKind.UPDATE_AFTER);
- private static final int ROW_ID_POS = 0;
- private static final int ROW_DATA_POS = 1;
-
- private final FileFormat format;
- private final int parallelism;
- private final boolean partitioned;
- private final String writeDistributionMode;
-
- private Table table;
- private StreamExecutionEnvironment env;
- private TableLoader tableLoader;
-
- @Parameterized.Parameters(
- name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
- public static Object[][] parameters() {
- return new Object[][] {
- new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
- };
- }
-
- public TestFlinkIcebergSinkV2(
- String format, int parallelism, boolean partitioned, String writeDistributionMode) {
- this.format = FileFormat.fromString(format);
- this.parallelism = parallelism;
- this.partitioned = partitioned;
- this.writeDistributionMode = writeDistributionMode;
- }
-
- @Before
- public void setupTable() {
- table =
- catalogResource
- .catalog()
- .createTable(
- TestFixtures.TABLE_IDENTIFIER,
- SimpleDataUtil.SCHEMA,
- partitioned
- ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
- : PartitionSpec.unpartitioned(),
- ImmutableMap.of(
- TableProperties.DEFAULT_FILE_FORMAT,
- format.name(),
- TableProperties.FORMAT_VERSION,
- String.valueOf(FORMAT_V2)));
-
- table
- .updateProperties()
- .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode)
- .commit();
-
- env =
- StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
- .enableCheckpointing(100L)
- .setParallelism(parallelism)
- .setMaxParallelism(parallelism);
-
- tableLoader = catalogResource.tableLoader();
- }
-
- private List<Snapshot> findValidSnapshots() {
- List<Snapshot> validSnapshots = Lists.newArrayList();
- for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests(table.io()).stream()
- .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
- validSnapshots.add(snapshot);
- }
- }
- return validSnapshots;
- }
-
- private void testChangeLogs(
- List<String> equalityFieldColumns,
- KeySelector<Row, Object> keySelector,
- boolean insertAsUpsert,
- List<List<Row>> elementsPerCheckpoint,
- List<List<Record>> expectedRecordsPerCheckpoint)
- throws Exception {
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .tableLoader(tableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .equalityFieldColumns(equalityFieldColumns)
- .upsert(insertAsUpsert)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg Change-Log DataStream.");
-
- table.refresh();
- List<Snapshot> snapshots = findValidSnapshots();
- int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
- Assert.assertEquals(
- "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
- for (int i = 0; i < expectedSnapshotNum; i++) {
- long snapshotId = snapshots.get(i).snapshotId();
- List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
- Assert.assertEquals(
- "Should have the expected records for the checkpoint#" + i,
- expectedRowSet(expectedRecords.toArray(new Record[0])),
- actualRowSet(snapshotId, "*"));
- }
- }
-
- private Row row(String rowKind, int id, String data) {
+ protected Row row(String rowKind, int id, String data) {
RowKind kind = ROW_KIND_MAP.get(rowKind);
if (kind == null) {
throw new IllegalArgumentException("Unknown row kind: " + rowKind);
@@ -212,92 +78,85 @@ public class TestFlinkIcebergSinkV2 {
return Row.ofKind(kind, id, data);
}
- private Record record(int id, String data) {
- return SimpleDataUtil.createRecord(id, data);
- }
-
- @Test
- public void testCheckAndGetEqualityFieldIds() {
- table
- .updateSchema()
- .allowIncompatibleChanges()
- .addRequiredColumn("type", Types.StringType.get())
- .setIdentifierFields("type")
- .commit();
-
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
-
- // Use schema identifier field IDs as equality field id list by default
- Assert.assertEquals(
- table.schema().identifierFieldIds(),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
-
- // Use user-provided equality field column as equality field id list
- builder.equalityFieldColumns(Lists.newArrayList("id"));
- Assert.assertEquals(
- Sets.newHashSet(table.schema().findField("id").fieldId()),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ protected void testUpsertOnIdDataKey(String branch) throws Exception {
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
+ ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
+ ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
- builder.equalityFieldColumns(Lists.newArrayList("type"));
- Assert.assertEquals(
- Sets.newHashSet(table.schema().findField("type").fieldId()),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ List<List<Record>> expectedRecords =
+ ImmutableList.of(
+ ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+ ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+ ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+ testChangeLogs(
+ ImmutableList.of("id", "data"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ true,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
}
- @Test
- public void testChangeLogOnIdKey() throws Exception {
+ protected void testChangeLogOnIdDataKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("-D", 1, "aaa"),
+ row("+I", 2, "bbb"),
row("+I", 1, "bbb"),
- row("+I", 2, "aaa"),
- row("-D", 2, "aaa"),
- row("+I", 2, "bbb")),
+ row("+I", 2, "aaa")),
+ ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
+ ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
+
+ List<List<Record>> expectedRecords =
+ ImmutableList.of(
+ ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
ImmutableList.of(
- row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
+ record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
ImmutableList.of(
- row("-D", 1, "bbb"),
- row("+I", 1, "ccc"),
- row("-D", 1, "ccc"),
- row("+I", 1, "ddd")));
+ record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
+
+ testChangeLogs(
+ ImmutableList.of("data", "id"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
+ }
+
+ protected void testChangeLogOnSameKey(String branch) throws Exception {
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ // Checkpoint #1
+ ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+ // Checkpoint #2
+ ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
+ // Checkpoint #3
+ ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+ // Checkpoint #4
+ ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
List<List<Record>> expectedRecords =
ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
- ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
- ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
+ ImmutableList.of(record(1, "aaa")),
+ ImmutableList.of(record(1, "aaa")),
+ ImmutableList.of(record(1, "aaa")),
+ ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
- if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- return null;
- });
- } else {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ testChangeLogs(
+ ImmutableList.of("id", "data"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
}
- @Test
- public void testChangeLogOnDataKey() throws Exception {
+ protected void testChangeLogOnDataKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(
@@ -321,111 +180,83 @@ public class TestFlinkIcebergSinkV2 {
row -> row.getField(ROW_DATA_POS),
false,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
}
- @Test
- public void testUpsertOnlyDeletesOnDataKey() throws Exception {
+ protected void testUpsertOnDataKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb")));
+ ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
+ ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
+ ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
List<List<Record>> expectedRecords =
- ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of());
+ ImmutableList.of(
+ ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+ ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+ ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
testChangeLogs(
ImmutableList.of("data"),
row -> row.getField(ROW_DATA_POS),
true,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
}
- @Test
- public void testChangeLogOnIdDataKey() throws Exception {
+ protected void testChangeLogOnIdKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("-D", 1, "aaa"),
- row("+I", 2, "bbb"),
row("+I", 1, "bbb"),
- row("+I", 2, "aaa")),
- ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+ row("+I", 2, "aaa"),
+ row("-D", 2, "aaa"),
+ row("+I", 2, "bbb")),
ImmutableList.of(
- record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+ row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
ImmutableList.of(
- record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
- testChangeLogs(
- ImmutableList.of("data", "id"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
-
- @Test
- public void testChangeLogOnSameKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- // Checkpoint #1
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #2
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
- // Checkpoint #3
- ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #4
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
+ row("-D", 1, "bbb"),
+ row("+I", 1, "ccc"),
+ row("-D", 1, "ccc"),
+ row("+I", 1, "ddd")));
List<List<Record>> expectedRecords =
ImmutableList.of(
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+ ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+ ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
- @Test
- public void testUpsertModeCheck() throws Exception {
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .tableLoader(tableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .upsert(true);
-
- AssertHelpers.assertThrows(
- "Should be error because upsert mode and overwrite mode enable at the same time.",
- IllegalStateException.class,
- "OVERWRITE mode shouldn't be enable",
- () ->
- builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append());
-
- AssertHelpers.assertThrows(
- "Should be error because equality field columns are empty.",
- IllegalStateException.class,
- "Equality field columns shouldn't be empty",
- () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+ if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
+ AssertHelpers.assertThrows(
+ "Should be error because equality field columns don't include all partition keys",
+ IllegalStateException.class,
+ "should be included in equality fields",
+ () -> {
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
+ return null;
+ });
+ } else {
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
+ }
}
- @Test
- public void testUpsertOnIdKey() throws Exception {
+ protected void testUpsertOnIdKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
@@ -444,7 +275,8 @@ public class TestFlinkIcebergSinkV2 {
row -> row.getField(ROW_ID_POS),
true,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
} else {
AssertHelpers.assertThrows(
"Should be error because equality field columns don't include all partition keys",
@@ -456,54 +288,65 @@ public class TestFlinkIcebergSinkV2 {
row -> row.getField(ROW_ID_POS),
true,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
return null;
});
}
}
- @Test
- public void testUpsertOnDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
- ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
- ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
+ protected void testChangeLogs(
+ List<String> equalityFieldColumns,
+ KeySelector<Row, Object> keySelector,
+ boolean insertAsUpsert,
+ List<List<Row>> elementsPerCheckpoint,
+ List<List<Record>> expectedRecordsPerCheckpoint,
+ String branch)
+ throws Exception {
+ DataStream<Row> dataStream =
+ env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
- ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
- ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .tableLoader(tableLoader)
+ .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+ .writeParallelism(parallelism)
+ .equalityFieldColumns(equalityFieldColumns)
+ .upsert(insertAsUpsert)
+ .toBranch(branch)
+ .append();
- testChangeLogs(
- ImmutableList.of("data"),
- row -> row.getField(ROW_DATA_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ // Execute the program.
+ env.execute("Test Iceberg Change-Log DataStream.");
- @Test
- public void testUpsertOnIdDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
- ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
+ table.refresh();
+ List<Snapshot> snapshots = findValidSnapshots();
+ int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+ Assert.assertEquals(
+ "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
- ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
- ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+ for (int i = 0; i < expectedSnapshotNum; i++) {
+ long snapshotId = snapshots.get(i).snapshotId();
+ List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+ Assert.assertEquals(
+ "Should have the expected records for the checkpoint#" + i,
+ expectedRowSet(expectedRecords.toArray(new Record[0])),
+ actualRowSet(snapshotId, "*"));
+ }
+ }
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- true,
- elementsPerCheckpoint,
- expectedRecords);
+ protected Record record(int id, String data) {
+ return SimpleDataUtil.createRecord(id, data);
+ }
+
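+ // Keep only snapshots that wrote at least one new manifest, i.e. the commits that
+ // actually landed files for a checkpoint.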
+ private List<Snapshot> findValidSnapshots() {
+ List<Snapshot> validSnapshots = Lists.newArrayList();
+ for (Snapshot snapshot : table.snapshots()) {
+ if (snapshot.allManifests(table.io()).stream()
+ .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+ validSnapshots.add(snapshot);
+ }
+ }
+ return validSnapshots;
}
private StructLikeSet expectedRowSet(Record... records) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
new file mode 100644
index 0000000000..fed3338482
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final HadoopCatalogResource catalogResource =
+ new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ private final String branch;
+
+ @Parameterized.Parameters(name = "branch = {0}")
+ public static Object[] parameters() {
+ return new Object[] {"main", "testBranch"};
+ }
+
+ public TestFlinkIcebergSinkV2Branch(String branch) {
+ this.branch = branch;
+ }
+
+ @Before
+ public void before() throws IOException {
+ table =
+ catalogResource
+ .catalog()
+ .createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ FileFormat.AVRO.name(),
+ TableProperties.FORMAT_VERSION,
+ "2"));
+
+ env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .enableCheckpointing(100);
+
+ tableLoader = catalogResource.tableLoader();
+ }
+
+ @Test
+ public void testChangeLogOnIdKey() throws Exception {
+ testChangeLogOnIdKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testChangeLogOnDataKey() throws Exception {
+ testChangeLogOnDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testChangeLogOnIdDataKey() throws Exception {
+ testChangeLogOnIdDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testUpsertOnIdKey() throws Exception {
+ testUpsertOnIdKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testUpsertOnDataKey() throws Exception {
+ testUpsertOnDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testUpsertOnIdDataKey() throws Exception {
+ testUpsertOnIdDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ private void verifyOtherBranchUnmodified() {
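+ // Writes in these tests target only `branch`; the opposite branch must stay empty,
+ // i.e. have no snapshot at all.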
+ String otherBranch =
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
+ if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+ Assert.assertNull(table.currentSnapshot());
+ }
+
+ Assert.assertNull(table.snapshot(otherBranch));
+ }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 66baaeb0e9..a4f29d47f4 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private File flinkManifestFolder;
private final FileFormat format;
+ private final String branch;
- @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}")
+ @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}")
public static Object[][] parameters() {
return new Object[][] {
- new Object[] {"avro", 1},
- new Object[] {"avro", 2},
- new Object[] {"parquet", 1},
- new Object[] {"parquet", 2},
- new Object[] {"orc", 1},
- new Object[] {"orc", 2}
+ new Object[] {"avro", 1, "main"},
+ new Object[] {"avro", 2, "test-branch"},
+ new Object[] {"parquet", 1, "main"},
+ new Object[] {"parquet", 2, "test-branch"},
+ new Object[] {"orc", 1, "main"},
+ new Object[] {"orc", 2, "test-branch"}
};
}
- public TestIcebergFilesCommitter(String format, int formatVersion) {
+ public TestIcebergFilesCommitter(String format, int formatVersion, String branch) {
super(formatVersion);
this.format = FileFormat.fromString(format);
+ this.branch = branch;
}
@Override
@@ -125,7 +127,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.open();
operatorId = harness.getOperator().getOperatorID();
- SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
+ SimpleDataUtil.assertTableRows(table, Lists.newArrayList(), branch);
assertSnapshotSize(0);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
@@ -204,12 +206,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(i);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
assertSnapshotSize(i);
assertMaxCommittedCheckpointId(jobID, operatorId, i);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
- table.currentSnapshot().summary().get("flink.test"));
+ SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
}
}
}
@@ -255,13 +257,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
assertFlinkManifests(1);
// 4. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
@@ -308,13 +310,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
// 4. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
@@ -348,7 +350,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row), branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
@@ -360,7 +362,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.initializeState(snapshot);
harness.open();
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
@@ -375,7 +377,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
@@ -406,7 +408,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.processElement(of(dataFile), ++timestamp);
snapshot = harness.snapshot(++checkpointId, ++timestamp);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of());
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
assertFlinkManifests(1);
}
@@ -421,7 +423,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// transaction.
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
harness.snapshot(++checkpointId, ++timestamp);
@@ -431,7 +433,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
@@ -459,7 +461,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(3);
RowData row = SimpleDataUtil.createRowData(3, "foo");
@@ -473,7 +475,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
}
@@ -509,7 +511,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(i);
assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
}
@@ -540,7 +542,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
}
@@ -577,7 +579,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId + 1);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(i + 1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
}
@@ -628,7 +630,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
assertFlinkManifests(1);
// Only the first row is committed at this point
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
@@ -651,7 +653,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// transaction.
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -675,7 +677,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness2.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -702,12 +704,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
((BoundedOneInput) harness.getOneInputOperator()).endInput();
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
- table.currentSnapshot().summary().get("flink.test"));
+ SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
}
}
@@ -748,7 +750,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
@@ -794,7 +796,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
@@ -816,7 +818,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 6. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
@@ -867,7 +869,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// Notify the 2nd snapshot to complete.
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
Assert.assertEquals(
@@ -951,7 +953,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
table.refresh();
long actualId =
IcebergFilesCommitter.getMaxCommittedCheckpointId(
- table, jobID.toString(), operatorID.toHexString());
+ table, jobID.toString(), operatorID.toHexString(), branch);
Assert.assertEquals(expectedId, actualId);
}
@@ -962,7 +964,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
throws Exception {
- TestOperatorFactory factory = TestOperatorFactory.of(table.location());
+ TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
}
@@ -982,13 +984,15 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private static class TestOperatorFactory extends AbstractStreamOperatorFactory<Void>
implements OneInputStreamOperatorFactory<WriteResult, Void> {
private final String tablePath;
+ private final String branch;
- private TestOperatorFactory(String tablePath) {
+ private TestOperatorFactory(String tablePath, String branch) {
this.tablePath = tablePath;
+ this.branch = branch;
}
- private static TestOperatorFactory of(String tablePath) {
- return new TestOperatorFactory(tablePath);
+ private static TestOperatorFactory of(String tablePath, String branch) {
+ return new TestOperatorFactory(tablePath, branch);
}
@Override
@@ -1000,7 +1004,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
new TestTableLoader(tablePath),
false,
Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
- ThreadPools.WORKER_THREAD_POOL_SIZE);
+ ThreadPools.WORKER_THREAD_POOL_SIZE,
+ branch);
committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
return (T) committer;
}
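The branch-aware assertions above ultimately read the committer's snapshot summary. A minimal sketch of that lookup (the literal property keys are assumptions based on the committer's constants, not taken from this diff):

    // Sketch: reading the committer's bookkeeping from the head of a branch;
    // `table` and `branch` are the test fixtures above.
    Snapshot head = SimpleDataUtil.latestSnapshot(table, branch);
    String committedJobId = head.summary().get("flink.job-id");
    long committedCheckpointId =
        Long.parseLong(head.summary().get("flink.max-committed-checkpoint-id"));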
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 448b2aa2d8..4b5c7e4a0d 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -172,4 +172,12 @@ public class FlinkWriteConf {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}
+
+ public String branch() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.BRANCH.key())
+ .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
+ .parse();
+ }
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index f3cc52972b..86cb2fb0eb 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.SnapshotRef;
/** Flink sink write options */
public class FlinkWriteOptions {
@@ -56,4 +57,8 @@ public class FlinkWriteOptions {
// Overrides the table's write.distribution-mode
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+
+ // Branch to write to
+ public static final ConfigOption<String> BRANCH =
+ ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
}
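A minimal sketch of how the new option resolves (the branch name is hypothetical); when unset, it falls back to SnapshotRef.MAIN_BRANCH, i.e. "main":

    // Sketch: resolving the write branch from a Flink Configuration.
    Configuration conf = new Configuration();
    conf.setString(FlinkWriteOptions.BRANCH.key(), "audit");   // hypothetical branch name
    String branch =
        conf.getOptional(FlinkWriteOptions.BRANCH)
            .orElse(FlinkWriteOptions.BRANCH.defaultValue());  // "main" when unset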
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 81706e5824..445b6a6ff9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -316,6 +316,11 @@ public class FlinkSink {
return this;
}
+ public Builder toBranch(String branch) {
+ writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+ return this;
+ }
+
private <T> DataStreamSink<T> chainIcebergOperators() {
Preconditions.checkArgument(
inputCreator != null,
@@ -422,7 +427,8 @@ public class FlinkSink {
tableLoader,
flinkWriteConf.overwriteMode(),
snapshotProperties,
- flinkWriteConf.workerPoolSize());
+ flinkWriteConf.workerPoolSize(),
+ flinkWriteConf.branch());
SingleOutputStreamOperator<Void> committerStream =
writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
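Together with the committer change below, the builder can now target a branch in one call. A minimal usage sketch (table location and branch name are hypothetical):

    FlinkSink.forRowData(rowDataStream)
        .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl"))
        .toBranch("audit")   // recorded under FlinkWriteOptions.BRANCH
        .append();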
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index d8a7bc5cf2..22b4dc9d21 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// The completed files cache for current checkpoint. Once the snapshot barrier received, it will
// be flushed to the 'dataFilesPerCheckpoint'.
private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+ private final String branch;
// It will have an unique identifier for one job.
private transient String flinkJobId;
@@ -125,11 +126,13 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
TableLoader tableLoader,
boolean replacePartitions,
Map<String, String> snapshotProperties,
- Integer workerPoolSize) {
+ Integer workerPoolSize,
+ String branch) {
this.tableLoader = tableLoader;
this.replacePartitions = replacePartitions;
this.snapshotProperties = snapshotProperties;
this.workerPoolSize = workerPoolSize;
+ this.branch = branch;
}
@Override
@@ -179,7 +182,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// it's safe to assign the max committed checkpoint id from restored flink job to the current
// flink job.
this.maxCommittedCheckpointId =
- getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);
+ getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch);
NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -230,7 +233,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
- LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
@@ -286,8 +288,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
- } else {
- LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
}
}
@@ -386,10 +386,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
String operatorId,
long checkpointId) {
LOG.info(
- "Committing {} for checkpoint {} to table {} with summary: {}",
+ "Committing {} for checkpoint {} to table {} branch {} with summary: {}",
description,
checkpointId,
table.name(),
+ branch,
summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
@@ -397,14 +398,16 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
+ operation.toBranch(branch);
long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
- "Committed {} to table: {}, checkpointId {} in {} ms",
+ "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
description,
table.name(),
+ branch,
checkpointId,
durationMs);
committerMetrics.commitDuration(durationMs);
@@ -474,8 +477,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}
- static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
- Snapshot snapshot = table.currentSnapshot();
+ static long getMaxCommittedCheckpointId(
+ Table table, String flinkJobId, String operatorId, String branch) {
+ Snapshot snapshot = table.snapshot(branch);
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
while (snapshot != null) {
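The hunk ends at the loop header; the walk itself is unchanged by this commit. Reconstructed as a sketch for context, it follows parent snapshots from the branch head until it finds one stamped by the same Flink job and operator:

    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
      String snapshotOperatorId = summary.get(OPERATOR_ID);
      if (flinkJobId.equals(snapshotFlinkJobId)
          && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
        if (value != null) {
          lastCommittedCheckpointId = Long.parseLong(value);
          break;
        }
      }
      Long parentSnapshotId = snapshot.parentId();
      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
    }
    return lastCommittedCheckpointId;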
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index e296763508..345d88a48a 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
@@ -206,12 +207,18 @@ public class SimpleDataUtil {
return records;
}
- public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
- assertTableRecords(tablePath, convertToRecords(expected));
+ public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
+ throws IOException {
+ assertTableRecords(tablePath, convertToRecords(expected), branch);
}
public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
- assertTableRecords(table, convertToRecords(expected));
+ assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRows(Table table, List<RowData> expected, String branch)
+ throws IOException {
+ assertTableRecords(table, convertToRecords(expected), branch);
}
/** Get all rows for a table */
@@ -267,13 +274,25 @@ public class SimpleDataUtil {
}
public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+ assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRecords(Table table, List<Record> expected, String branch)
+ throws IOException {
table.refresh();
+ Snapshot snapshot = latestSnapshot(table, branch);
+
+ if (snapshot == null) {
+ Assert.assertEquals(expected, ImmutableList.of());
+ return;
+ }
Types.StructType type = table.schema().asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
- try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+ try (CloseableIterable<Record> iterable =
+ IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
StructLikeSet actualSet = StructLikeSet.create(type);
for (Record record : iterable) {
@@ -284,10 +303,27 @@ public class SimpleDataUtil {
}
}
+ // Returns the latest snapshot of the given branch in the table
+ public static Snapshot latestSnapshot(Table table, String branch) {
+ // For the main branch, currentSnapshot() is used so that tests keep exercising the
+ // API that was used for validation before branch support was added.
+ if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+ return table.currentSnapshot();
+ }
+
+ return table.snapshot(branch);
+ }
+
public static void assertTableRecords(String tablePath, List<Record> expected)
throws IOException {
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
- assertTableRecords(new HadoopTables().load(tablePath), expected);
+ assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH);
+ }
+
+ public static void assertTableRecords(String tablePath, List<Record> expected, String branch)
+ throws IOException {
+ Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
+ assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
}
public static StructLikeSet expectedRowSet(Table table, Record... records) {
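Typical call sites after this change (the branch name is hypothetical, `rowData` a placeholder); the no-branch overloads keep their old main-branch behavior:

    // Branch-scoped assertion: reads at latestSnapshot(table, "audit").
    SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowData), "audit");
    // Unchanged overload: still asserts against the main branch via currentSnapshot().
    SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowData));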
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index c2af30d342..23beb19a72 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -22,14 +22,10 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
@@ -45,7 +41,6 @@ import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -60,7 +55,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public class TestFlinkIcebergSink {
+public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,13 +67,6 @@ public class TestFlinkIcebergSink {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
- private static final TypeInformation<Row> ROW_TYPE_INFO =
- new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
- private static final DataFormatConverters.RowConverter CONVERTER =
- new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
-
- private Table table;
- private StreamExecutionEnvironment env;
private TableLoader tableLoader;
private final FileFormat format;
@@ -132,14 +120,6 @@ public class TestFlinkIcebergSink {
tableLoader = catalogResource.tableLoader();
}
- private List<RowData> convertToRowData(List<Row> rows) {
- return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
- }
-
- private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
- return new BoundedTestSource<>(rows.toArray(new Row[0]));
- }
-
@Test
public void testWriteRowData() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
@@ -160,19 +140,6 @@ public class TestFlinkIcebergSink {
SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}
- private List<Row> createRows(String prefix) {
- return Lists.newArrayList(
- Row.of(1, prefix + "aaa"),
- Row.of(1, prefix + "bbb"),
- Row.of(1, prefix + "ccc"),
- Row.of(2, prefix + "aaa"),
- Row.of(2, prefix + "bbb"),
- Row.of(2, prefix + "ccc"),
- Row.of(3, prefix + "aaa"),
- Row.of(3, prefix + "bbb"),
- Row.of(3, prefix + "ccc"));
- }
-
private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
throws Exception {
List<Row> rows = createRows("");
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
new file mode 100644
index 0000000000..b38aa6b50c
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class TestFlinkIcebergSinkBase {
+
+ protected Table table;
+ protected StreamExecutionEnvironment env;
+ protected static final TypeInformation<Row> ROW_TYPE_INFO =
+ new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+ protected static final DataFormatConverters.RowConverter CONVERTER =
+ new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+ protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+ return new BoundedTestSource<>(rows.toArray(new Row[0]));
+ }
+
+ protected List<Row> createRows(String prefix) {
+ return Lists.newArrayList(
+ Row.of(1, prefix + "aaa"),
+ Row.of(1, prefix + "bbb"),
+ Row.of(1, prefix + "ccc"),
+ Row.of(2, prefix + "aaa"),
+ Row.of(2, prefix + "bbb"),
+ Row.of(2, prefix + "ccc"),
+ Row.of(3, prefix + "aaa"),
+ Row.of(3, prefix + "bbb"),
+ Row.of(3, prefix + "ccc"));
+ }
+
+ protected List<RowData> convertToRowData(List<Row> rows) {
+ return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
new file mode 100644
index 0000000000..16b4542b00
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final HadoopCatalogResource catalogResource =
+ new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ private final String branch;
+ private TableLoader tableLoader;
+
+ @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+ public static Object[] parameters() {
+ return new Object[] {"main", "testBranch"};
+ }
+
+ public TestFlinkIcebergSinkBranch(String branch) {
+ this.branch = branch;
+ }
+
+ @Before
+ public void before() throws IOException {
+ table =
+ catalogResource
+ .catalog()
+ .createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ FileFormat.AVRO.name(),
+ TableProperties.FORMAT_VERSION,
+ "1"));
+
+ env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .enableCheckpointing(100);
+
+ tableLoader = catalogResource.tableLoader();
+ }
+
+ @Test
+ public void testWriteRowWithTableSchema() throws Exception {
+ testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+ verifyOtherBranchUnmodified();
+ }
+
+ private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+ throws Exception {
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .tableSchema(tableSchema)
+ .toBranch(branch)
+ .distributionMode(distributionMode)
+ .append();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream.");
+
+ SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+ SimpleDataUtil.assertTableRows(
+ table,
+ ImmutableList.of(),
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+ verifyOtherBranchUnmodified();
+ }
+
+ private void verifyOtherBranchUnmodified() {
+ String otherBranch =
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
+ if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+ Assert.assertNull(table.currentSnapshot());
+ }
+
+ Assert.assertNull(table.snapshot(otherBranch));
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 5d8a56ebbf..422bd97cd7 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,38 +18,25 @@
*/
package org.apache.iceberg.flink.sink;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
-import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -60,7 +47,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,29 +59,6 @@ public class TestFlinkIcebergSinkV2 {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
- private static final int FORMAT_V2 = 2;
- private static final TypeInformation<Row> ROW_TYPE_INFO =
- new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
-
- private static final Map<String, RowKind> ROW_KIND_MAP =
- ImmutableMap.of(
- "+I", RowKind.INSERT,
- "-D", RowKind.DELETE,
- "-U", RowKind.UPDATE_BEFORE,
- "+U", RowKind.UPDATE_AFTER);
-
- private static final int ROW_ID_POS = 0;
- private static final int ROW_DATA_POS = 1;
-
- private final FileFormat format;
- private final int parallelism;
- private final boolean partitioned;
- private final String writeDistributionMode;
-
- private Table table;
- private StreamExecutionEnvironment env;
- private TableLoader tableLoader;
-
@Parameterized.Parameters(
name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
public static Object[][] parameters() {
@@ -155,67 +119,6 @@ public class TestFlinkIcebergSinkV2 {
tableLoader = catalogResource.tableLoader();
}
- private List<Snapshot> findValidSnapshots() {
- List<Snapshot> validSnapshots = Lists.newArrayList();
- for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests(table.io()).stream()
- .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
- validSnapshots.add(snapshot);
- }
- }
- return validSnapshots;
- }
-
- private void testChangeLogs(
- List<String> equalityFieldColumns,
- KeySelector<Row, Object> keySelector,
- boolean insertAsUpsert,
- List<List<Row>> elementsPerCheckpoint,
- List<List<Record>> expectedRecordsPerCheckpoint)
- throws Exception {
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .tableLoader(tableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .equalityFieldColumns(equalityFieldColumns)
- .upsert(insertAsUpsert)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg Change-Log DataStream.");
-
- table.refresh();
- List<Snapshot> snapshots = findValidSnapshots();
- int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
- Assert.assertEquals(
- "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
- for (int i = 0; i < expectedSnapshotNum; i++) {
- long snapshotId = snapshots.get(i).snapshotId();
- List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
- Assert.assertEquals(
- "Should have the expected records for the checkpoint#" + i,
- expectedRowSet(expectedRecords.toArray(new Record[0])),
- actualRowSet(snapshotId, "*"));
- }
- }
-
- private Row row(String rowKind, int id, String data) {
- RowKind kind = ROW_KIND_MAP.get(rowKind);
- if (kind == null) {
- throw new IllegalArgumentException("Unknown row kind: " + rowKind);
- }
-
- return Row.ofKind(kind, id, data);
- }
-
- private Record record(int id, String data) {
- return SimpleDataUtil.createRecord(id, data);
- }
-
@Test
public void testCheckAndGetEqualityFieldIds() {
table
@@ -249,108 +152,17 @@ public class TestFlinkIcebergSinkV2 {
@Test
public void testChangeLogOnIdKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(
- row("+I", 1, "aaa"),
- row("-D", 1, "aaa"),
- row("+I", 1, "bbb"),
- row("+I", 2, "aaa"),
- row("-D", 2, "aaa"),
- row("+I", 2, "bbb")),
- ImmutableList.of(
- row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
- ImmutableList.of(
- row("-D", 1, "bbb"),
- row("+I", 1, "ccc"),
- row("-D", 1, "ccc"),
- row("+I", 1, "ddd")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
- ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
- ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
-
- if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- return null;
- });
- } else {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testChangeLogOnDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(
- row("+I", 1, "aaa"),
- row("-D", 1, "aaa"),
- row("+I", 2, "bbb"),
- row("+I", 1, "bbb"),
- row("+I", 2, "aaa")),
- ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
- ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
- ImmutableList.of(
- record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));
-
- testChangeLogs(
- ImmutableList.of("data"),
- row -> row.getField(ROW_DATA_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testChangeLogOnIdDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(
- row("+I", 1, "aaa"),
- row("-D", 1, "aaa"),
- row("+I", 2, "bbb"),
- row("+I", 1, "bbb"),
- row("+I", 2, "aaa")),
- ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
- ImmutableList.of(
- record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
- ImmutableList.of(
- record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
- testChangeLogs(
- ImmutableList.of("data", "id"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
@Test
@@ -373,30 +185,7 @@ public class TestFlinkIcebergSinkV2 {
@Test
public void testChangeLogOnSameKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- // Checkpoint #1
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #2
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
- // Checkpoint #3
- ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #4
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
+ testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH);
}
@Test
@@ -426,97 +215,16 @@ public class TestFlinkIcebergSinkV2 {
@Test
public void testUpsertOnIdKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
- ImmutableList.of(row("+I", 1, "ccc")),
- ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb")),
- ImmutableList.of(record(1, "ccc")),
- ImmutableList.of(record(1, "eee")));
-
- if (!partitioned) {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- } else {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- return null;
- });
- }
+ testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testUpsertOnDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
- ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
- ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
- ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
- ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
-
- testChangeLogs(
- ImmutableList.of("data"),
- row -> row.getField(ROW_DATA_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
+ testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH);
}
@Test
public void testUpsertOnIdDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
- ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
- ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
- ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
-
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- }
-
- private StructLikeSet expectedRowSet(Record... records) {
- return SimpleDataUtil.expectedRowSet(table, records);
- }
-
- private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
- table.refresh();
- StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
- try (CloseableIterable<Record> reader =
- IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) {
- reader.forEach(set::add);
- }
- return set;
+ testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
}
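
The delegating tests above pin this suite to SnapshotRef.MAIN_BRANCH; the branch itself is selected through the new toBranch(...) call on the FlinkSink builder, visible in testChangeLogs in the base class below. A minimal sketch of that wiring outside the test harness, assuming a local Hadoop-backed table and reusing the SimpleDataUtil test schema; the warehouse path and the "audit" branch name are placeholders, not part of this commit:

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class BranchSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(100L);

    // Placeholder location; any TableLoader pointing at a v2 table works here.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/t");

    TypeInformation<Row> rowTypeInfo =
        new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
    DataStream<Row> rows =
        env.fromCollection(Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb")), rowTypeInfo);

    FlinkSink.forRow(rows, SimpleDataUtil.FLINK_SCHEMA)
        .tableLoader(tableLoader)
        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
        .toBranch("audit") // new in this change: commit snapshots to a named ref instead of main
        .append();

    env.execute("branch sink sketch");
  }
}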
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
similarity index 60%
copy from flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
copy to flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index ccc3c0f23d..15380408e4 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -26,184 +26,50 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2Base {
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ protected static final int FORMAT_V2 = 2;
+ protected static final TypeInformation<Row> ROW_TYPE_INFO =
+ new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
- @Rule
- public final HadoopCatalogResource catalogResource =
- new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+ protected static final int ROW_ID_POS = 0;
+ protected static final int ROW_DATA_POS = 1;
- private static final int FORMAT_V2 = 2;
- private static final TypeInformation<Row> ROW_TYPE_INFO =
- new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+ protected int parallelism = 1;
+ protected TableLoader tableLoader;
+ protected Table table;
+ protected StreamExecutionEnvironment env;
+ protected FileFormat format;
+ protected boolean partitioned;
+ protected String writeDistributionMode;
- private static final Map<String, RowKind> ROW_KIND_MAP =
+ protected static final Map<String, RowKind> ROW_KIND_MAP =
ImmutableMap.of(
"+I", RowKind.INSERT,
"-D", RowKind.DELETE,
"-U", RowKind.UPDATE_BEFORE,
"+U", RowKind.UPDATE_AFTER);
- private static final int ROW_ID_POS = 0;
- private static final int ROW_DATA_POS = 1;
-
- private final FileFormat format;
- private final int parallelism;
- private final boolean partitioned;
- private final String writeDistributionMode;
-
- private Table table;
- private StreamExecutionEnvironment env;
- private TableLoader tableLoader;
-
- @Parameterized.Parameters(
- name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
- public static Object[][] parameters() {
- return new Object[][] {
- new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
- new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
- new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
- new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
- };
- }
-
- public TestFlinkIcebergSinkV2(
- String format, int parallelism, boolean partitioned, String writeDistributionMode) {
- this.format = FileFormat.fromString(format);
- this.parallelism = parallelism;
- this.partitioned = partitioned;
- this.writeDistributionMode = writeDistributionMode;
- }
-
- @Before
- public void setupTable() {
- table =
- catalogResource
- .catalog()
- .createTable(
- TestFixtures.TABLE_IDENTIFIER,
- SimpleDataUtil.SCHEMA,
- partitioned
- ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
- : PartitionSpec.unpartitioned(),
- ImmutableMap.of(
- TableProperties.DEFAULT_FILE_FORMAT,
- format.name(),
- TableProperties.FORMAT_VERSION,
- String.valueOf(FORMAT_V2)));
-
- table
- .updateProperties()
- .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode)
- .commit();
-
- env =
- StreamExecutionEnvironment.getExecutionEnvironment(
- MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
- .enableCheckpointing(100L)
- .setParallelism(parallelism)
- .setMaxParallelism(parallelism);
-
- tableLoader = catalogResource.tableLoader();
- }
-
- private List<Snapshot> findValidSnapshots() {
- List<Snapshot> validSnapshots = Lists.newArrayList();
- for (Snapshot snapshot : table.snapshots()) {
- if (snapshot.allManifests(table.io()).stream()
- .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
- validSnapshots.add(snapshot);
- }
- }
- return validSnapshots;
- }
-
- private void testChangeLogs(
- List<String> equalityFieldColumns,
- KeySelector<Row, Object> keySelector,
- boolean insertAsUpsert,
- List<List<Row>> elementsPerCheckpoint,
- List<List<Record>> expectedRecordsPerCheckpoint)
- throws Exception {
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .tableLoader(tableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .equalityFieldColumns(equalityFieldColumns)
- .upsert(insertAsUpsert)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg Change-Log DataStream.");
-
- table.refresh();
- List<Snapshot> snapshots = findValidSnapshots();
- int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
- Assert.assertEquals(
- "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
- for (int i = 0; i < expectedSnapshotNum; i++) {
- long snapshotId = snapshots.get(i).snapshotId();
- List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
- Assert.assertEquals(
- "Should have the expected records for the checkpoint#" + i,
- expectedRowSet(expectedRecords.toArray(new Record[0])),
- actualRowSet(snapshotId, "*"));
- }
- }
-
- private Row row(String rowKind, int id, String data) {
+ protected Row row(String rowKind, int id, String data) {
RowKind kind = ROW_KIND_MAP.get(rowKind);
if (kind == null) {
throw new IllegalArgumentException("Unknown row kind: " + rowKind);
@@ -212,92 +78,85 @@ public class TestFlinkIcebergSinkV2 {
return Row.ofKind(kind, id, data);
}
- private Record record(int id, String data) {
- return SimpleDataUtil.createRecord(id, data);
- }
-
- @Test
- public void testCheckAndGetEqualityFieldIds() {
- table
- .updateSchema()
- .allowIncompatibleChanges()
- .addRequiredColumn("type", Types.StringType.get())
- .setIdentifierFields("type")
- .commit();
-
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
-
- // Use schema identifier field IDs as equality field id list by default
- Assert.assertEquals(
- table.schema().identifierFieldIds(),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
-
- // Use user-provided equality field column as equality field id list
- builder.equalityFieldColumns(Lists.newArrayList("id"));
- Assert.assertEquals(
- Sets.newHashSet(table.schema().findField("id").fieldId()),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ protected void testUpsertOnIdDataKey(String branch) throws Exception {
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
+ ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
+ ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
- builder.equalityFieldColumns(Lists.newArrayList("type"));
- Assert.assertEquals(
- Sets.newHashSet(table.schema().findField("type").fieldId()),
- Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+ List<List<Record>> expectedRecords =
+ ImmutableList.of(
+ ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+ ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+ ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+ testChangeLogs(
+ ImmutableList.of("id", "data"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ true,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
}
- @Test
- public void testChangeLogOnIdKey() throws Exception {
+ protected void testChangeLogOnIdDataKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("-D", 1, "aaa"),
+ row("+I", 2, "bbb"),
row("+I", 1, "bbb"),
- row("+I", 2, "aaa"),
- row("-D", 2, "aaa"),
- row("+I", 2, "bbb")),
+ row("+I", 2, "aaa")),
+ ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
+ ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
+
+ List<List<Record>> expectedRecords =
+ ImmutableList.of(
+ ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
ImmutableList.of(
- row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
+ record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
ImmutableList.of(
- row("-D", 1, "bbb"),
- row("+I", 1, "ccc"),
- row("-D", 1, "ccc"),
- row("+I", 1, "ddd")));
+ record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
+
+ testChangeLogs(
+ ImmutableList.of("data", "id"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
+ }
+
+ protected void testChangeLogOnSameKey(String branch) throws Exception {
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ // Checkpoint #1
+ ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+ // Checkpoint #2
+ ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
+ // Checkpoint #3
+ ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+ // Checkpoint #4
+ ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
List<List<Record>> expectedRecords =
ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
- ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
- ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
+ ImmutableList.of(record(1, "aaa")),
+ ImmutableList.of(record(1, "aaa")),
+ ImmutableList.of(record(1, "aaa")),
+ ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
- if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- return null;
- });
- } else {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ testChangeLogs(
+ ImmutableList.of("id", "data"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
}
- @Test
- public void testChangeLogOnDataKey() throws Exception {
+ protected void testChangeLogOnDataKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(
@@ -321,111 +180,83 @@ public class TestFlinkIcebergSinkV2 {
row -> row.getField(ROW_DATA_POS),
false,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
}
- @Test
- public void testUpsertOnlyDeletesOnDataKey() throws Exception {
+ protected void testUpsertOnDataKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb")));
+ ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
+ ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
+ ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
List<List<Record>> expectedRecords =
- ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of());
+ ImmutableList.of(
+ ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+ ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+ ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
testChangeLogs(
ImmutableList.of("data"),
row -> row.getField(ROW_DATA_POS),
true,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
}
- @Test
- public void testChangeLogOnIdDataKey() throws Exception {
+ protected void testChangeLogOnIdKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(
row("+I", 1, "aaa"),
row("-D", 1, "aaa"),
- row("+I", 2, "bbb"),
row("+I", 1, "bbb"),
- row("+I", 2, "aaa")),
- ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
- ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+ row("+I", 2, "aaa"),
+ row("-D", 2, "aaa"),
+ row("+I", 2, "bbb")),
ImmutableList.of(
- record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+ row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
ImmutableList.of(
- record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
- testChangeLogs(
- ImmutableList.of("data", "id"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
-
- @Test
- public void testChangeLogOnSameKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- // Checkpoint #1
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #2
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
- // Checkpoint #3
- ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
- // Checkpoint #4
- ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
+ row("-D", 1, "bbb"),
+ row("+I", 1, "ccc"),
+ row("-D", 1, "ccc"),
+ row("+I", 1, "ddd")));
List<List<Record>> expectedRecords =
ImmutableList.of(
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa")),
- ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- false,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+ ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+ ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
- @Test
- public void testUpsertModeCheck() throws Exception {
- DataStream<Row> dataStream =
- env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .tableLoader(tableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .upsert(true);
-
- AssertHelpers.assertThrows(
- "Should be error because upsert mode and overwrite mode enable at the same time.",
- IllegalStateException.class,
- "OVERWRITE mode shouldn't be enable",
- () ->
- builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append());
-
- AssertHelpers.assertThrows(
- "Should be error because equality field columns are empty.",
- IllegalStateException.class,
- "Equality field columns shouldn't be empty",
- () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+ if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
+ AssertHelpers.assertThrows(
+ "Should be error because equality field columns don't include all partition keys",
+ IllegalStateException.class,
+ "should be included in equality fields",
+ () -> {
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
+ return null;
+ });
+ } else {
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch);
+ }
}
- @Test
- public void testUpsertOnIdKey() throws Exception {
+ protected void testUpsertOnIdKey(String branch) throws Exception {
List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
@@ -444,7 +275,8 @@ public class TestFlinkIcebergSinkV2 {
row -> row.getField(ROW_ID_POS),
true,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
} else {
AssertHelpers.assertThrows(
"Should be error because equality field columns don't include all partition keys",
@@ -456,54 +288,65 @@ public class TestFlinkIcebergSinkV2 {
row -> row.getField(ROW_ID_POS),
true,
elementsPerCheckpoint,
- expectedRecords);
+ expectedRecords,
+ branch);
return null;
});
}
}
- @Test
- public void testUpsertOnDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
- ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
- ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
+ protected void testChangeLogs(
+ List<String> equalityFieldColumns,
+ KeySelector<Row, Object> keySelector,
+ boolean insertAsUpsert,
+ List<List<Row>> elementsPerCheckpoint,
+ List<List<Record>> expectedRecordsPerCheckpoint,
+ String branch)
+ throws Exception {
+ DataStream<Row> dataStream =
+ env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
- ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
- ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .tableLoader(tableLoader)
+ .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+ .writeParallelism(parallelism)
+ .equalityFieldColumns(equalityFieldColumns)
+ .upsert(insertAsUpsert)
+ .toBranch(branch)
+ .append();
- testChangeLogs(
- ImmutableList.of("data"),
- row -> row.getField(ROW_DATA_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords);
- }
+ // Execute the program.
+ env.execute("Test Iceberg Change-Log DataStream.");
- @Test
- public void testUpsertOnIdDataKey() throws Exception {
- List<List<Row>> elementsPerCheckpoint =
- ImmutableList.of(
- ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
- ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
- ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
+ table.refresh();
+ List<Snapshot> snapshots = findValidSnapshots();
+ int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+ Assert.assertEquals(
+ "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
- List<List<Record>> expectedRecords =
- ImmutableList.of(
- ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
- ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
- ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+ for (int i = 0; i < expectedSnapshotNum; i++) {
+ long snapshotId = snapshots.get(i).snapshotId();
+ List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+ Assert.assertEquals(
+ "Should have the expected records for the checkpoint#" + i,
+ expectedRowSet(expectedRecords.toArray(new Record[0])),
+ actualRowSet(snapshotId, "*"));
+ }
+ }
- testChangeLogs(
- ImmutableList.of("id", "data"),
- row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- true,
- elementsPerCheckpoint,
- expectedRecords);
+ protected Record record(int id, String data) {
+ return SimpleDataUtil.createRecord(id, data);
+ }
+
+ private List<Snapshot> findValidSnapshots() {
+ List<Snapshot> validSnapshots = Lists.newArrayList();
+ for (Snapshot snapshot : table.snapshots()) {
+ if (snapshot.allManifests(table.io()).stream()
+ .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+ validSnapshots.add(snapshot);
+ }
+ }
+ return validSnapshots;
}
private StructLikeSet expectedRowSet(Record... records) {
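
The base class keeps snapshot verification centralized: testChangeLogs resolves each checkpoint's snapshot and compares row sets through IcebergGenerics, as in actualRowSet above. The same reader pattern works for ad-hoc branch inspection, since Table#snapshot(String) resolves a ref by name and returns null when the ref does not exist (the property verifyOtherBranchUnmodified in the next file relies on). A short sketch under those assumptions; the class and method names are illustrative:

import java.io.IOException;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

class BranchScanSketch {
  static void printBranchRows(Table table, String branch) throws IOException {
    Snapshot head = table.snapshot(branch); // null when the ref was never committed to
    if (head == null) {
      return; // nothing on this branch yet
    }

    // Scan exactly the branch-head snapshot, mirroring actualRowSet(...) above.
    try (CloseableIterable<Record> reader =
        IcebergGenerics.read(table).useSnapshot(head.snapshotId()).select("*").build()) {
      reader.forEach(record -> System.out.println(record));
    }
  }
}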
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
new file mode 100644
index 0000000000..fed3338482
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final HadoopCatalogResource catalogResource =
+ new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ private final String branch;
+
+ @Parameterized.Parameters(name = "branch = {0}")
+ public static Object[] parameters() {
+ return new Object[] {"main", "testBranch"};
+ }
+
+ public TestFlinkIcebergSinkV2Branch(String branch) {
+ this.branch = branch;
+ }
+
+ @Before
+ public void before() throws IOException {
+ table =
+ catalogResource
+ .catalog()
+ .createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ SimpleDataUtil.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ FileFormat.AVRO.name(),
+ TableProperties.FORMAT_VERSION,
+ "2"));
+
+ env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .enableCheckpointing(100);
+
+ tableLoader = catalogResource.tableLoader();
+ }
+
+ @Test
+ public void testChangeLogOnIdKey() throws Exception {
+ testChangeLogOnIdKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testChangeLogOnDataKey() throws Exception {
+ testChangeLogOnDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testChangeLogOnIdDataKey() throws Exception {
+ testChangeLogOnIdDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testUpsertOnIdKey() throws Exception {
+ testUpsertOnIdKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testUpsertOnDataKey() throws Exception {
+ testUpsertOnDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ @Test
+ public void testUpsertOnIdDataKey() throws Exception {
+ testUpsertOnIdDataKey(branch);
+ verifyOtherBranchUnmodified();
+ }
+
+ private void verifyOtherBranchUnmodified() {
+ String otherBranch =
+ branch.equals(SnapshotRef.MAIN_BRANCH) ? "testBranch" : SnapshotRef.MAIN_BRANCH;
+ if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+ Assert.assertNull(table.currentSnapshot());
+ }
+
+ Assert.assertNull(table.snapshot(otherBranch));
+ }
+}
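
One detail worth noting: the suite never creates testBranch up front, so the first successful commit through the sink is what establishes the ref. When a branch should exist before the job starts, for example to branch off a known snapshot, the core snapshot-management API can create it explicitly. A hedged sketch, assuming the table already has a current snapshot and that ManageSnapshots#createBranch is available in the Iceberg version in use:

import org.apache.iceberg.Table;

class CreateBranchSketch {
  static void createBranchFromCurrent(Table table, String branch) {
    // Requires at least one committed snapshot to branch from.
    long baseSnapshotId = table.currentSnapshot().snapshotId();
    table.manageSnapshots().createBranch(branch, baseSnapshotId).commit();
  }
}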
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 66baaeb0e9..a4f29d47f4 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private File flinkManifestFolder;
private final FileFormat format;
+ private final String branch;
- @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}")
+ @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}")
public static Object[][] parameters() {
return new Object[][] {
- new Object[] {"avro", 1},
- new Object[] {"avro", 2},
- new Object[] {"parquet", 1},
- new Object[] {"parquet", 2},
- new Object[] {"orc", 1},
- new Object[] {"orc", 2}
+ new Object[] {"avro", 1, "main"},
+ new Object[] {"avro", 2, "test-branch"},
+ new Object[] {"parquet", 1, "main"},
+ new Object[] {"parquet", 2, "test-branch"},
+ new Object[] {"orc", 1, "main"},
+ new Object[] {"orc", 2, "test-branch"}
};
}
- public TestIcebergFilesCommitter(String format, int formatVersion) {
+ public TestIcebergFilesCommitter(String format, int formatVersion, String branch) {
super(formatVersion);
this.format = FileFormat.fromString(format);
+ this.branch = branch;
}
@Override
@@ -125,7 +127,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.open();
operatorId = harness.getOperator().getOperatorID();
- SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
+ SimpleDataUtil.assertTableRows(table, Lists.newArrayList(), branch);
assertSnapshotSize(0);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
@@ -204,12 +206,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(i);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
assertSnapshotSize(i);
assertMaxCommittedCheckpointId(jobID, operatorId, i);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
- table.currentSnapshot().summary().get("flink.test"));
+ SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
}
}
}
@@ -255,13 +257,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
assertFlinkManifests(1);
// 4. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
@@ -308,13 +310,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
// 4. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
@@ -348,7 +350,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row), branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
@@ -360,7 +362,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.initializeState(snapshot);
harness.open();
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
@@ -375,7 +377,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
@@ -406,7 +408,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.processElement(of(dataFile), ++timestamp);
snapshot = harness.snapshot(++checkpointId, ++timestamp);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of());
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
assertFlinkManifests(1);
}
@@ -421,7 +423,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// transaction.
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
harness.snapshot(++checkpointId, ++timestamp);
@@ -431,7 +433,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
@@ -459,7 +461,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(3);
RowData row = SimpleDataUtil.createRowData(3, "foo");
@@ -473,7 +475,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
}
@@ -509,7 +511,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(i);
assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
}
@@ -540,7 +542,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
}
@@ -577,7 +579,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.notifyOfCompletedCheckpoint(checkpointId + 1);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(i + 1);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
}
@@ -628,7 +630,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
assertFlinkManifests(1);
// Only the first row is committed at this point
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
@@ -651,7 +653,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// transaction.
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(2);
assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -675,7 +677,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness2.notifyOfCompletedCheckpoint(checkpointId);
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, expectedRows);
+ SimpleDataUtil.assertTableRows(table, expectedRows, branch);
assertSnapshotSize(4);
assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -702,12 +704,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
((BoundedOneInput) harness.getOneInputOperator()).endInput();
assertFlinkManifests(0);
- SimpleDataUtil.assertTableRows(table, tableRows);
+ SimpleDataUtil.assertTableRows(table, tableRows, branch);
assertSnapshotSize(1);
assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
- table.currentSnapshot().summary().get("flink.test"));
+ SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
}
}
@@ -748,7 +750,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
@@ -794,7 +796,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
@@ -816,7 +818,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 6. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
@@ -867,7 +869,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// Notify the 2nd snapshot to complete.
harness.notifyOfCompletedCheckpoint(checkpoint);
- SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
Assert.assertEquals(
@@ -951,7 +953,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
table.refresh();
long actualId =
IcebergFilesCommitter.getMaxCommittedCheckpointId(
- table, jobID.toString(), operatorID.toHexString());
+ table, jobID.toString(), operatorID.toHexString(), branch);
Assert.assertEquals(expectedId, actualId);
}
@@ -962,7 +964,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
throws Exception {
- TestOperatorFactory factory = TestOperatorFactory.of(table.location());
+ TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
}
@@ -982,13 +984,15 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private static class TestOperatorFactory extends AbstractStreamOperatorFactory<Void>
implements OneInputStreamOperatorFactory<WriteResult, Void> {
private final String tablePath;
+ private final String branch;
- private TestOperatorFactory(String tablePath) {
+ private TestOperatorFactory(String tablePath, String branch) {
this.tablePath = tablePath;
+ this.branch = branch;
}
- private static TestOperatorFactory of(String tablePath) {
- return new TestOperatorFactory(tablePath);
+ private static TestOperatorFactory of(String tablePath, String branch) {
+ return new TestOperatorFactory(tablePath, branch);
}
@Override
@@ -1000,7 +1004,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
new TestTableLoader(tablePath),
false,
Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
- ThreadPools.WORKER_THREAD_POOL_SIZE);
+ ThreadPools.WORKER_THREAD_POOL_SIZE,
+ branch);
committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
return (T) committer;
}
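
Taken together, the committer changes in this file make all bookkeeping branch-scoped: row assertions, the flink.test summary property, and the max-committed-checkpoint-id lookup each resolve the snapshot at the head of the target branch instead of table.currentSnapshot(). A condensed sketch of that verification pattern as a helper, written as if it sat inside this test class (fixtures such as table and branch, plus imports for Snapshot, JobID, and OperatorID, come from the surrounding file; the helper name is illustrative):

// Sketch of a branch-aware assertion helper; it works from inside this
// package because getMaxCommittedCheckpointId is reachable here.
private void assertBranchCommitState(
    JobID jobID, OperatorID operatorID, long expectedCheckpointId) {
  table.refresh();

  // The committer stamps its summary properties on the head of `branch`,
  // not necessarily on table.currentSnapshot().
  Snapshot head = SimpleDataUtil.latestSnapshot(table, branch);
  Assert.assertEquals(
      TestIcebergFilesCommitter.class.getName(), head.summary().get("flink.test"));

  // Checkpoint bookkeeping is likewise resolved per branch, so two jobs
  // writing to different branches of one table do not clobber each other.
  Assert.assertEquals(
      expectedCheckpointId,
      IcebergFilesCommitter.getMaxCommittedCheckpointId(
          table, jobID.toString(), operatorID.toHexString(), branch));
}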