You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/20 17:00:11 UTC
[iceberg] branch master updated: Flink: Support write.distribution-mode (#2064)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c75ac35 Flink: Support write.distribution-mode (#2064)
c75ac35 is described below
commit c75ac359c1de6bf9fd4894b40009c5c42d2fee9d
Author: openinx <op...@gmail.com>
AuthorDate: Thu Jan 21 00:59:55 2021 +0800
Flink: Support write.distribution-mode (#2064)
---
.../java/org/apache/iceberg/DistributionMode.java | 55 +++++++++++++
.../java/org/apache/iceberg/TableProperties.java | 3 +
.../org/apache/iceberg/flink/sink/FlinkSink.java | 89 +++++++++++++++++---
.../iceberg/flink/sink/PartitionKeySelector.java | 64 +++++++++++++++
.../apache/iceberg/flink/FlinkCatalogTestBase.java | 11 ++-
.../apache/iceberg/flink/TestFlinkTableSink.java | 65 +++++++++++++++
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 94 ++++++++++++++++++++--
.../flink/sink/TestIcebergStreamWriter.java | 4 +-
8 files changed, 366 insertions(+), 19 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/DistributionMode.java b/api/src/main/java/org/apache/iceberg/DistributionMode.java
new file mode 100644
index 0000000..fbe6c6a
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/DistributionMode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Supported write distribution modes; each defines the write behavior of a batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. This is suitable when the rows fall into only a few partitions; otherwise
+ * it may produce too many small files, because each task writes rows into different partitions
+ * randomly.
+ * <p>
+ * 2. hash: hash-distribute rows by partition key, which is suitable when the rows are spread evenly
+ * across the partitions.
+ * <p>
+ * 3. range: range-distribute rows by partition key (or sort key if the table has a {@link SortOrder}), which is
+ * suitable when the rows are spread across the partitions with a skewed distribution.
+ */
+public enum DistributionMode {
+ NONE("none"), HASH("hash"), RANGE("range");
+
+ private final String modeName; // lower-case name, the form used as the write.distribution-mode property value
+
+ DistributionMode(String modeName) {
+ this.modeName = modeName;
+ }
+
+ public String modeName() {
+ return modeName;
+ }
+
+ public static DistributionMode fromName(String modeName) {
+ Preconditions.checkNotNull(modeName, "Name of distribution mode should not be null");
+ return DistributionMode.valueOf(modeName.toUpperCase(Locale.ENGLISH)); // case-insensitive match on constant name
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index a60992c..0dc5232 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -138,6 +138,9 @@ public class TableProperties {
public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
+ public static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";
+ public static final String WRITE_DISTRIBUTION_MODE_DEFAULT = "none";
+
public static final String GC_ENABLED = "gc.enabled";
public static final boolean GC_ENABLED_DEFAULT = true;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 8c4486a..c691e58 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -38,7 +38,9 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -48,13 +50,18 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
public class FlinkSink {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
@@ -114,6 +121,7 @@ public class FlinkSink {
private Table table;
private TableSchema tableSchema;
private boolean overwrite = false;
+ private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
private List<String> equalityFieldColumns = null;
@@ -162,6 +170,20 @@ public class FlinkSink {
}
/**
+ * Configure the write {@link DistributionMode} that the Flink sink will use. Currently, Flink supports only
+ * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}; {@link DistributionMode#RANGE} is rejected.
+ *
+ * @param mode write distribution mode; when null, the table property write.distribution-mode is used instead.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder distributionMode(DistributionMode mode) {
+ Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+ "Flink does not support 'range' write distribution mode now.");
+ this.distributionMode = mode;
+ return this;
+ }
+
+ /**
* Configuring the write parallel number for iceberg stream writer.
*
* @param newWriteParallelism the number of parallel iceberg stream writer.
@@ -209,7 +231,14 @@ public class FlinkSink {
}
}
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+ // Convert the requested flink table schema to flink row type.
+ RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+ // Distribute the records from input data stream based on the write.distribution-mode.
+ rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+
+ // Chain the iceberg stream writer and committer operator.
+ IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
@@ -225,32 +254,70 @@ public class FlinkSink {
.name(String.format("IcebergSink %s", table.name()))
.setParallelism(1);
}
- }
- static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
- List<Integer> equalityFieldIds) {
- Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+ Map<String, String> properties,
+ PartitionSpec partitionSpec,
+ Schema iSchema,
+ RowType flinkRowType) {
+ DistributionMode writeMode;
+ if (distributionMode == null) {
+ // Fall back to the distribution mode from the table properties if none was specified at the job level.
+ String modeName = PropertyUtil.propertyAsString(properties,
+ WRITE_DISTRIBUTION_MODE,
+ WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+ writeMode = DistributionMode.fromName(modeName);
+ } else {
+ writeMode = distributionMode;
+ }
+
+ switch (writeMode) {
+ case NONE: // pass the stream through without shuffling
+ return input;
+
+ case HASH:
+ if (partitionSpec.isUnpartitioned()) { // no partition key to hash by, so nothing to shuffle
+ return input;
+ } else { // key the stream by the partition path computed by PartitionKeySelector
+ return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ }
+
+ case RANGE: // reachable only through the table property; Builder#distributionMode rejects RANGE
+ LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
+ WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
+ return input;
+
+ default:
+ throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+ }
+ }
+ }
- RowType flinkSchema;
+ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
if (requestedSchema != null) {
// Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg schema.
- Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
- TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
+ Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+ TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will be promoted to
// iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 'byte'), we will
// read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here we must use flink
// schema.
- flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
+ return (RowType) requestedSchema.toRowDataType().getLogicalType();
} else {
- flinkSchema = FlinkSchemaUtil.convert(table.schema());
+ return FlinkSchemaUtil.convert(schema);
}
+ }
+ static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+ RowType flinkRowType,
+ List<Integer> equalityFieldIds) {
Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
FileFormat fileFormat = getFileFormat(props);
- TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
+ TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkRowType,
table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props,
equalityFieldIds);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
new file mode 100644
index 0000000..598df09
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+
+/**
+ * A {@link KeySelector} that shuffles rows by partition key, so that each partition/bucket is written by only one
+ * task. That greatly reduces the number of small files in the partitioned fanout write policy of {@link FlinkSink}.
+ */
+class PartitionKeySelector implements KeySelector<RowData, String> {
+
+ private final Schema schema;
+ private final PartitionKey partitionKey;
+ private final RowType flinkSchema;
+
+ private transient RowDataWrapper rowDataWrapper; // transient: built on first use, see lazyRowDataWrapper()
+
+ PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) {
+ this.schema = schema;
+ this.partitionKey = new PartitionKey(spec, schema);
+ this.flinkSchema = flinkSchema;
+ }
+
+ /**
+ * Construct the {@link RowDataWrapper} lazily here because a few of its members are not serializable. This way, we
+ * never have to serialize them.
+ */
+ private RowDataWrapper lazyRowDataWrapper() {
+ if (rowDataWrapper == null) {
+ rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+ return rowDataWrapper;
+ }
+
+ @Override
+ public String getKey(RowData row) {
+ partitionKey.partition(lazyRowDataWrapper().wrap(row)); // the single PartitionKey instance is reused per row
+ return partitionKey.toPath(); // the partition path string is the shuffle key
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index d96ef68..f27e7fb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -101,17 +101,24 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
}
if (isHadoopCatalog) {
config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
- config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
} else {
config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
- config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
config.put(CatalogProperties.HIVE_URI, getURI(hiveConf));
}
+ config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));
this.flinkDatabase = catalogName + "." + DATABASE;
this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE}));
}
+ protected String warehouseRoot() { // absolute path of the warehouse root for the catalog type in use
+ if (isHadoopCatalog) {
+ return hadoopWarehouse.getRoot().getAbsolutePath();
+ } else {
+ return hiveWarehouse.getRoot().getAbsolutePath();
+ }
+ }
+
static String getURI(HiveConf conf) {
return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index ae3a3c1..f064c53 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -19,19 +19,31 @@
package org.apache.iceberg.flink;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -80,6 +92,8 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(400);
+ env.setMaxParallelism(2);
+ env.setParallelism(2);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
} else {
settingsBuilder.inBatchMode();
@@ -220,4 +234,55 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
+
+ @Test
+ public void testHashDistributeMode() throws Exception {
+ String tableName = "test_hash_distribution_mode";
+
+ Map<String, String> tableProps = ImmutableMap.of(
+ "write.format.default", format.name(),
+ TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
+ );
+ sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableProps));
+
+ // Insert data set.
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+ "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+ "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+ SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+ SimpleDataUtil.createRecord(1, "aaa"),
+ SimpleDataUtil.createRecord(1, "bbb"),
+ SimpleDataUtil.createRecord(1, "ccc"),
+ SimpleDataUtil.createRecord(2, "aaa"),
+ SimpleDataUtil.createRecord(2, "bbb"),
+ SimpleDataUtil.createRecord(2, "ccc"),
+ SimpleDataUtil.createRecord(3, "aaa"),
+ SimpleDataUtil.createRecord(3, "bbb"),
+ SimpleDataUtil.createRecord(3, "ccc")
+ ));
+
+ Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1,
+ partitionFiles(tableName, "aaa").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1,
+ partitionFiles(tableName, "bbb").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1,
+ partitionFiles(tableName, "ccc").size());
+
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+
+ private List<Path> partitionFiles(String table, String partition) throws IOException {
+ String databasePath = Joiner.on("/").join(baseNamespace.levels()) + "/" + DATABASE;
+ if (!isHadoopCatalog) {
+ databasePath = databasePath + ".db"; // non-hadoop (hive) catalog: database directory carries a ".db" suffix
+ }
+ Path dir = Paths.get(warehouseRoot(), databasePath, table, "data", String.format("data=%s", partition));
+ return Files.list(dir) // NOTE(review): this Stream is never closed; Files.list recommends try-with-resources
+ .filter(p -> !p.toString().endsWith(".crc")) // ignore ".crc" files (presumably checksum side files)
+ .collect(Collectors.toList());
+ }
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 90ac3a2..7982f27 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.flink.sink;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -35,6 +38,8 @@ import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -144,10 +149,17 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
}
- private void testWriteRow(TableSchema tableSchema) throws Exception {
+ private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception {
List<Row> rows = Lists.newArrayList(
- Row.of(4, "bar"),
- Row.of(5, "apache")
+ Row.of(1, "aaa"),
+ Row.of(1, "bbb"),
+ Row.of(1, "ccc"),
+ Row.of(2, "aaa"),
+ Row.of(2, "bbb"),
+ Row.of(2, "ccc"),
+ Row.of(3, "aaa"),
+ Row.of(3, "bbb"),
+ Row.of(3, "ccc")
);
DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
@@ -156,6 +168,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.writeParallelism(parallelism)
+ .distributionMode(distributionMode)
.build();
// Execute the program.
@@ -164,13 +177,84 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
}
+ private List<Path> partitionFiles(String partition) throws IOException {
+ return Files.list(Paths.get(tablePath, "data", String.format("data=%s", partition)))
+ .filter(p -> !p.toString().endsWith(".crc")) // ignore ".crc" files (presumably checksum side files)
+ .collect(Collectors.toList()); // NOTE(review): the Files.list stream is never closed; use try-with-resources
+ }
+
@Test
public void testWriteRow() throws Exception {
- testWriteRow(null);
+ testWriteRow(null, DistributionMode.NONE);
}
@Test
public void testWriteRowWithTableSchema() throws Exception {
- testWriteRow(SimpleDataUtil.FLINK_SCHEMA);
+ testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+ }
+
+ @Test
+ public void testJobNoneDistributeMode() throws Exception {
+ table.updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+ .commit();
+
+ testWriteRow(null, DistributionMode.NONE);
+
+ if (parallelism > 1) {
+ if (partitioned) {
+ int files = partitionFiles("aaa").size() + partitionFiles("bbb").size() + partitionFiles("ccc").size();
+ Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3);
+ }
+ }
+ }
+
+ @Test
+ public void testJobHashDistributionMode() { // NOTE(review): despite the name, checks job-level RANGE is rejected
+ table.updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+ .commit();
+
+ AssertHelpers.assertThrows("Does not support range distribution-mode now.",
+ IllegalArgumentException.class, "Flink does not support 'range' write distribution mode now.",
+ () -> {
+ testWriteRow(null, DistributionMode.RANGE);
+ return null;
+ });
+ }
+
+ @Test
+ public void testJobNullDistributionMode() throws Exception {
+ table.updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+ .commit();
+
+ testWriteRow(null, null);
+
+ if (partitioned) {
+ Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc").size());
+ }
+ }
+
+ @Test
+ public void testPartitionWriteMode() throws Exception {
+ testWriteRow(null, DistributionMode.HASH);
+ if (partitioned) {
+ Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc").size());
+ }
+ }
+
+ @Test
+ public void testShuffleByPartitionWithSchema() throws Exception {
+ testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
+ if (partitioned) {
+ Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb").size());
+ Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc").size());
+ }
}
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index c6c20e0..920cd69 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -337,7 +338,8 @@ public class TestIcebergStreamWriter {
private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema) throws Exception {
- IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema, null);
+ RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
+ IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null);
OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new OneInputStreamOperatorTestHarness<>(
streamWriter, 1, 1, 0);