You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/20 17:00:11 UTC

[iceberg] branch master updated: Flink: Support write.distribution-mode (#2064)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c75ac35  Flink: Support write.distribution-mode (#2064)
c75ac35 is described below

commit c75ac359c1de6bf9fd4894b40009c5c42d2fee9d
Author: openinx <op...@gmail.com>
AuthorDate: Thu Jan 21 00:59:55 2021 +0800

    Flink: Support write.distribution-mode (#2064)
---
 .../java/org/apache/iceberg/DistributionMode.java  | 55 +++++++++++++
 .../java/org/apache/iceberg/TableProperties.java   |  3 +
 .../org/apache/iceberg/flink/sink/FlinkSink.java   | 89 +++++++++++++++++---
 .../iceberg/flink/sink/PartitionKeySelector.java   | 64 +++++++++++++++
 .../apache/iceberg/flink/FlinkCatalogTestBase.java | 11 ++-
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 65 +++++++++++++++
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   | 94 ++++++++++++++++++++--
 .../flink/sink/TestIcebergStreamWriter.java        |  4 +-
 8 files changed, 366 insertions(+), 19 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/DistributionMode.java b/api/src/main/java/org/apache/iceberg/DistributionMode.java
new file mode 100644
index 0000000..fbe6c6a
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/DistributionMode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash: hash distribute by partition key, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is suitable
+ * for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash"), RANGE("range");
+
+  private final String modeName;
+
+  DistributionMode(String modeName) {
+    this.modeName = modeName;
+  }
+
+  public String modeName() {
+    return modeName;
+  }
+
+  public static DistributionMode fromName(String modeName) {
+    Preconditions.checkNotNull(modeName, "Name of distribution mode should not be null");
+    return DistributionMode.valueOf(modeName.toUpperCase(Locale.ENGLISH));
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index a60992c..0dc5232 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -138,6 +138,9 @@ public class TableProperties {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";
+  public static final String WRITE_DISTRIBUTION_MODE_DEFAULT = "none";
+
   public static final String GC_ENABLED = "gc.enabled";
   public static final boolean GC_ENABLED_DEFAULT = true;
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 8c4486a..c691e58 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -38,7 +38,9 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -48,13 +50,18 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 public class FlinkSink {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
 
   private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
   private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
@@ -114,6 +121,7 @@ public class FlinkSink {
     private Table table;
     private TableSchema tableSchema;
     private boolean overwrite = false;
+    private DistributionMode distributionMode = null;
     private Integer writeParallelism = null;
     private List<String> equalityFieldColumns = null;
 
@@ -162,6 +170,20 @@ public class FlinkSink {
     }
 
     /**
+     * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink support
+     * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+     *
+     * @param mode to specify the write distribution mode.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder distributionMode(DistributionMode mode) {
+      Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+          "Flink does not support 'range' write distribution mode now.");
+      this.distributionMode = mode;
+      return this;
+    }
+
+    /**
      * Configuring the write parallel number for iceberg stream writer.
      *
      * @param newWriteParallelism the number of parallel iceberg stream writer.
@@ -209,7 +231,14 @@ public class FlinkSink {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+      // Convert the requested flink table schema to flink row type.
+      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+      // Distribute the records from input data stream based on the write.distribution-mode.
+      rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+
+      // Chain the iceberg stream writer and committer operator.
+      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
       IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
 
       this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
@@ -225,32 +254,70 @@ public class FlinkSink {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
+
+        case RANGE:
+          LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
+              WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
+          return input;
+
+        default:
+          throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+      }
+    }
+  }
 
-    RowType flinkSchema;
+  static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
     if (requestedSchema != null) {
       // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg schema.
-      Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
-      TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
+      Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+      TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
 
       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will be promoted to
       // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 'byte'), we will
       // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here we must use flink
       // schema.
-      flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
+      return (RowType) requestedSchema.toRowDataType().getLogicalType();
     } else {
-      flinkSchema = FlinkSchemaUtil.convert(table.schema());
+      return FlinkSchemaUtil.convert(schema);
     }
+  }
 
+  static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+                                                         RowType flinkRowType,
+                                                         List<Integer> equalityFieldIds) {
     Map<String, String> props = table.properties();
     long targetFileSize = getTargetFileSizeBytes(props);
     FileFormat fileFormat = getFileFormat(props);
 
-    TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
+    TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkRowType,
         table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props,
         equalityFieldIds);
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
new file mode 100644
index 0000000..598df09
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionKeySelector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+
+/**
+ * Create a {@link KeySelector} to shuffle by partition key, then each partition/bucket will be wrote by only one
+ * task. That will reduce lots of small files in partitioned fanout write policy for {@link FlinkSink}.
+ */
+class PartitionKeySelector implements KeySelector<RowData, String> {
+
+  private final Schema schema;
+  private final PartitionKey partitionKey;
+  private final RowType flinkSchema;
+
+  private transient RowDataWrapper rowDataWrapper;
+
+  PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) {
+    this.schema = schema;
+    this.partitionKey = new PartitionKey(spec, schema);
+    this.flinkSchema = flinkSchema;
+  }
+
+  /**
+   * Construct the {@link RowDataWrapper} lazily here because few members in it are not serializable. In this way, we
+   * don't have to serialize them with forcing.
+   */
+  private RowDataWrapper lazyRowDataWrapper() {
+    if (rowDataWrapper == null) {
+      rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    }
+    return rowDataWrapper;
+  }
+
+  @Override
+  public String getKey(RowData row) {
+    partitionKey.partition(lazyRowDataWrapper().wrap(row));
+    return partitionKey.toPath();
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index d96ef68..f27e7fb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -101,17 +101,24 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
     }
     if (isHadoopCatalog) {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
-      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
     } else {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
       config.put(CatalogProperties.HIVE_URI, getURI(hiveConf));
     }
+    config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));
 
     this.flinkDatabase = catalogName + "." + DATABASE;
     this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE}));
   }
 
+  protected String warehouseRoot() {
+    if (isHadoopCatalog) {
+      return hadoopWarehouse.getRoot().getAbsolutePath();
+    } else {
+      return hiveWarehouse.getRoot().getAbsolutePath();
+    }
+  }
+
   static String getURI(HiveConf conf) {
     return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
   }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index ae3a3c1..f064c53 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -19,19 +19,31 @@
 
 package org.apache.iceberg.flink;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
@@ -80,6 +92,8 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
           env.enableCheckpointing(400);
+          env.setMaxParallelism(2);
+          env.setParallelism(2);
           tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
         } else {
           settingsBuilder.inBatchMode();
@@ -220,4 +234,55 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
 
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
   }
+
+  @Test
+  public void testHashDistributeMode() throws Exception {
+    String tableName = "test_hash_distribution_mode";
+
+    Map<String, String> tableProps = ImmutableMap.of(
+        "write.format.default", format.name(),
+        TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
+    );
+    sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+        tableName, toWithClause(tableProps));
+
+    // Insert data set.
+    sql("INSERT INTO %s VALUES " +
+        "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+        "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+        "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+        SimpleDataUtil.createRecord(1, "aaa"),
+        SimpleDataUtil.createRecord(1, "bbb"),
+        SimpleDataUtil.createRecord(1, "ccc"),
+        SimpleDataUtil.createRecord(2, "aaa"),
+        SimpleDataUtil.createRecord(2, "bbb"),
+        SimpleDataUtil.createRecord(2, "ccc"),
+        SimpleDataUtil.createRecord(3, "aaa"),
+        SimpleDataUtil.createRecord(3, "bbb"),
+        SimpleDataUtil.createRecord(3, "ccc")
+    ));
+
+    Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1,
+        partitionFiles(tableName, "aaa").size());
+    Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1,
+        partitionFiles(tableName, "bbb").size());
+    Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1,
+        partitionFiles(tableName, "ccc").size());
+
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+  }
+
+  private List<Path> partitionFiles(String table, String partition) throws IOException {
+    String databasePath = Joiner.on("/").join(baseNamespace.levels()) + "/" + DATABASE;
+    if (!isHadoopCatalog) {
+      databasePath = databasePath + ".db";
+    }
+    Path dir = Paths.get(warehouseRoot(), databasePath, table, "data", String.format("data=%s", partition));
+    return Files.list(dir)
+        .filter(p -> !p.toString().endsWith(".crc"))
+        .collect(Collectors.toList());
+  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 90ac3a2..7982f27 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.flink.sink;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -35,6 +38,8 @@ import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -144,10 +149,17 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
     SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
   }
 
-  private void testWriteRow(TableSchema tableSchema) throws Exception {
+  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception {
     List<Row> rows = Lists.newArrayList(
-        Row.of(4, "bar"),
-        Row.of(5, "apache")
+        Row.of(1, "aaa"),
+        Row.of(1, "bbb"),
+        Row.of(1, "ccc"),
+        Row.of(2, "aaa"),
+        Row.of(2, "bbb"),
+        Row.of(2, "ccc"),
+        Row.of(3, "aaa"),
+        Row.of(3, "bbb"),
+        Row.of(3, "ccc")
     );
     DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
 
@@ -156,6 +168,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .writeParallelism(parallelism)
+        .distributionMode(distributionMode)
         .build();
 
     // Execute the program.
@@ -164,13 +177,84 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
     SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
   }
 
+  private List<Path> partitionFiles(String partition) throws IOException {
+    return Files.list(Paths.get(tablePath, "data", String.format("data=%s", partition)))
+        .filter(p -> !p.toString().endsWith(".crc"))
+        .collect(Collectors.toList());
+  }
+
   @Test
   public void testWriteRow() throws Exception {
-    testWriteRow(null);
+    testWriteRow(null, DistributionMode.NONE);
   }
 
   @Test
   public void testWriteRowWithTableSchema() throws Exception {
-    testWriteRow(SimpleDataUtil.FLINK_SCHEMA);
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+  }
+
+  @Test
+  public void testJobNoneDistributeMode() throws Exception {
+    table.updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+        .commit();
+
+    testWriteRow(null, DistributionMode.NONE);
+
+    if (parallelism > 1) {
+      if (partitioned) {
+        int files = partitionFiles("aaa").size() + partitionFiles("bbb").size() + partitionFiles("ccc").size();
+        Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3);
+      }
+    }
+  }
+
+  @Test
+  public void testJobHashDistributionMode() {
+    table.updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+        .commit();
+
+    AssertHelpers.assertThrows("Does not support range distribution-mode now.",
+        IllegalArgumentException.class, "Flink does not support 'range' write distribution mode now.",
+        () -> {
+          testWriteRow(null, DistributionMode.RANGE);
+          return null;
+        });
+  }
+
+  @Test
+  public void testJobNullDistributionMode() throws Exception {
+    table.updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+        .commit();
+
+    testWriteRow(null, null);
+
+    if (partitioned) {
+      Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa").size());
+      Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb").size());
+      Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc").size());
+    }
+  }
+
+  @Test
+  public void testPartitionWriteMode() throws Exception {
+    testWriteRow(null, DistributionMode.HASH);
+    if (partitioned) {
+      Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa").size());
+      Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb").size());
+      Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc").size());
+    }
+  }
+
+  @Test
+  public void testShuffleByPartitionWithSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
+    if (partitioned) {
+      Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa").size());
+      Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb").size());
+      Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc").size());
+    }
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index c6c20e0..920cd69 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -337,7 +338,8 @@ public class TestIcebergStreamWriter {
 
   private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
       Table icebergTable, TableSchema flinkSchema) throws Exception {
-    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema, null);
+    RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
+    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null);
     OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new OneInputStreamOperatorTestHarness<>(
         streamWriter, 1, 1, 0);