Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/15 07:21:04 UTC

[flink-table-store] branch master updated: [FLINK-28552] GenerateUtils#generateCompare supports MULTISET and MAP

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new dbe49f7a [FLINK-28552] GenerateUtils#generateCompare supports MULTISET and MAP
dbe49f7a is described below

commit dbe49f7ab6b936a51c09a066c8212df96e744324
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Tue Nov 15 15:20:58 2022 +0800

    [FLINK-28552] GenerateUtils#generateCompare supports MULTISET and MAP
    
    This closes #375
---
 .../GenerateUtils.scala                            | 105 ++++++++++++-
 .../store/format/FileStatsExtractorTestBase.java   |  30 ++++
 .../store/table/AppendOnlyFileStoreTableTest.java  |  36 +++--
 .../ChangelogValueCountFileStoreTableTest.java     |  32 ++--
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 163 ++++++++++++++-------
 .../table/store/table/FileStoreTableTestBase.java  |  45 +++++-
 .../table/store/format/orc/OrcFileFormat.java      |  13 +-
 .../format/orc/OrcFileStatsExtractorTest.java      |   6 +-
 8 files changed, 331 insertions(+), 99 deletions(-)
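
A note on the codegen change below: for MAP and MULTISET columns the generated
comparator first compares map sizes, then the key arrays, then the value arrays
(a MULTISET is treated as a map from element to an INT NOT NULL count). The
Java emitted by generateMapCompare/generateMapDataCompare has roughly the shape
of this sketch; the method and variable names are illustrative stand-ins for
the names produced by newName(...) at generation time, and compareArray_0 /
compareArray_1 stand for the nested generated array comparators:

    public int compareMap_0(MapData a, MapData b) {
        int lengthA = a.size();
        int lengthB = b.size();
        if (lengthA == lengthB) {
            ArrayData keyArrayA = a.keyArray();
            ArrayData keyArrayB = b.keyArray();
            // compare key arrays with the generated comparator for the key type
            int comp = compareArray_0(keyArrayA, keyArrayB);
            if (comp == 0) {
                ArrayData valueArrayA = a.valueArray();
                ArrayData valueArrayB = b.valueArray();
                // then compare value arrays (element counts for MULTISET)
                comp = compareArray_1(valueArrayA, valueArrayB);
                if (comp != 0) {
                    return comp;
                }
            } else {
                return comp;
            }
        } else if (lengthA < lengthB) {
            return -1;
        } else if (lengthA > lengthB) {
            return 1;
        }
        return 0;
    }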

diff --git a/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala b/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala
index 6c9e5dbb..b28c4a4a 100644
--- a/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala
+++ b/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala
@@ -47,6 +47,8 @@ object GenerateUtils {
 
   val ARRAY_DATA: String = className[ArrayData]
 
+  val MAP_DATA: String = className[MapData]
+
   val ROW_DATA: String = className[RowData]
 
   val BINARY_STRING: String = className[BinaryStringData]
@@ -135,8 +137,8 @@ object GenerateUtils {
     case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DATE | TIME_WITHOUT_TIME_ZONE |
         INTERVAL_YEAR_MONTH | INTERVAL_DAY_TIME =>
       s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
-    case TIMESTAMP_WITH_TIME_ZONE | MULTISET | MAP =>
-      throw new UnsupportedOperationException() // TODO support MULTISET and MAP?
+    case TIMESTAMP_WITH_TIME_ZONE =>
+      throw new UnsupportedOperationException()
     case ARRAY =>
       val at = t.asInstanceOf[ArrayType]
       val compareFunc = newName("compareArray")
@@ -150,6 +152,32 @@ object GenerateUtils {
         """
       ctx.addReusableMember(funcCode)
       s"$compareFunc($leftTerm, $rightTerm)"
+    case MAP =>
+      val at = t.asInstanceOf[MapType]
+      val compareFunc = newName("compareMap")
+      val compareCode = generateMapCompare(ctx, nullsIsLast = false, at, "a", "b")
+      val funcCode: String =
+        s"""
+          public int $compareFunc($MAP_DATA a, $MAP_DATA b) {
+            $compareCode
+            return 0;
+          }
+        """
+      ctx.addReusableMember(funcCode)
+      s"$compareFunc($leftTerm, $rightTerm)"
+    case MULTISET =>
+      val at = t.asInstanceOf[MultisetType]
+      val compareFunc = newName("compareMultiset")
+      val compareCode = generateMultisetCompare(ctx, nullsIsLast = false, at, "a", "b")
+      val funcCode: String =
+        s"""
+          public int $compareFunc($MAP_DATA a, $MAP_DATA b) {
+            $compareCode
+            return 0;
+          }
+        """
+      ctx.addReusableMember(funcCode)
+      s"$compareFunc($leftTerm, $rightTerm)"
     case ROW | STRUCTURED_TYPE =>
       val fieldCount = getFieldCount(t)
       val comparisons =
@@ -219,12 +247,12 @@ object GenerateUtils {
     val comp = newName("comp")
     val typeTerm = primitiveTypeTermForType(elementType)
     s"""
-        int $lengthA = a.size();
-        int $lengthB = b.size();
+        int $lengthA = $leftTerm.size();
+        int $lengthB = $rightTerm.size();
         int $minLength = ($lengthA > $lengthB) ? $lengthB : $lengthA;
         for (int $i = 0; $i < $minLength; $i++) {
-          boolean $isNullA = a.isNullAt($i);
-          boolean $isNullB = b.isNullAt($i);
+          boolean $isNullA = $leftTerm.isNullAt($i);
+          boolean $isNullB = $rightTerm.isNullAt($i);
           if ($isNullA && $isNullB) {
             // Continue to compare the next element
           } else if ($isNullA) {
@@ -249,6 +277,71 @@ object GenerateUtils {
       """
   }
 
+  /** Generates code for comparing map. */
+  def generateMapCompare(
+      ctx: CodeGeneratorContext,
+      nullsIsLast: Boolean,
+      mapType: MapType,
+      leftTerm: String,
+      rightTerm: String): String = {
+    val keyArrayType = new ArrayType(mapType.getKeyType)
+    val valueArrayType = new ArrayType(mapType.getValueType)
+    generateMapDataCompare(ctx, nullsIsLast, leftTerm, rightTerm, keyArrayType, valueArrayType)
+  }
+
+  /** Generates code for comparing multiset. */
+  def generateMultisetCompare(
+      ctx: CodeGeneratorContext,
+      nullsIsLast: Boolean,
+      multisetType: MultisetType,
+      leftTerm: String,
+      rightTerm: String): String = {
+    val keyArrayType = new ArrayType(multisetType.getElementType)
+    val valueArrayType = new ArrayType(new IntType(false))
+    generateMapDataCompare(ctx, nullsIsLast, leftTerm, rightTerm, keyArrayType, valueArrayType)
+  }
+
+  def generateMapDataCompare(
+      ctx: CodeGeneratorContext,
+      nullsIsLast: Boolean,
+      leftTerm: String,
+      rightTerm: String,
+      keyArrayType: ArrayType,
+      valueArrayType: ArrayType): String = {
+    val keyArrayTerm = primitiveTypeTermForType(keyArrayType)
+    val valueArrayTerm = primitiveTypeTermForType(valueArrayType)
+    val lengthA = newName("lengthA")
+    val lengthB = newName("lengthB")
+    val comp = newName("comp")
+    val keyArrayA = newName("keyArrayA")
+    val keyArrayB = newName("keyArrayB")
+    val valueArrayA = newName("valueArrayA")
+    val valueArrayB = newName("valueArrayB")
+    s"""
+        int $lengthA = $leftTerm.size();
+        int $lengthB = $rightTerm.size();
+        if ($lengthA == $lengthB) {
+          $keyArrayTerm $keyArrayA = $leftTerm.keyArray();
+          $keyArrayTerm $keyArrayB = $rightTerm.keyArray();
+          int $comp = ${generateCompare(ctx, keyArrayType, nullsIsLast, keyArrayA, keyArrayB)};
+          if ($comp == 0) {
+            $valueArrayTerm $valueArrayA = $leftTerm.valueArray();
+            $valueArrayTerm $valueArrayB = $rightTerm.valueArray();
+            $comp = ${generateCompare(ctx, valueArrayType, nullsIsLast, valueArrayA, valueArrayB)};
+            if ($comp != 0) {
+              return $comp;
+            }
+          } else {
+            return $comp;
+          }
+        } else if ($lengthA < $lengthB) {
+          return -1;
+        } else if ($lengthA > $lengthB) {
+          return 1;
+        }
+     """
+  }
+
   /** Generates code for comparing row keys. */
   def generateRowCompare(
       ctx: CodeGeneratorContext,
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 2b348037..86069a87 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -25,7 +25,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringData;
@@ -33,7 +35,10 @@ import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
@@ -46,7 +51,9 @@ import java.math.BigDecimal;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -158,6 +165,10 @@ public abstract class FileStatsExtractorTestBase {
                 return randomTimestampData((TimestampType) type);
             case ARRAY:
                 return randomArray((ArrayType) type);
+            case MAP:
+                return randomMap((MapType) type);
+            case MULTISET:
+                return randomMultiset((MultisetType) type);
             default:
                 throw new UnsupportedOperationException(
                         "Unsupported type " + type.asSummaryString());
@@ -210,6 +221,25 @@ public abstract class FileStatsExtractorTestBase {
         return new GenericArrayData(javaArray);
     }
 
+    private MapData randomMap(MapType type) {
+        int length = ThreadLocalRandom.current().nextInt(10);
+        Map<Object, Object> javaMap = new HashMap<>(length);
+        for (int i = 0; i < length; i++) {
+            javaMap.put(createField(type.getKeyType()), createField(type.getValueType()));
+        }
+        return new GenericMapData(javaMap);
+    }
+
+    private MapData randomMultiset(MultisetType type) {
+        int length = ThreadLocalRandom.current().nextInt(10);
+        Map<Object, Object> javaMap = new HashMap<>(length);
+        IntType intType = new IntType(false);
+        for (int i = 0; i < length; i++) {
+            javaMap.put(createField(type.getElementType()), createField(intType));
+        }
+        return new GenericMapData(javaMap);
+    }
+
     protected abstract FileFormat createFormat();
 
     protected abstract RowType rowType();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 58b78e17..563f290d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -64,18 +64,18 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "1|10|100|binary|varbinary",
-                                "1|11|101|binary|varbinary",
-                                "1|12|102|binary|varbinary",
-                                "1|11|101|binary|varbinary",
-                                "1|12|102|binary|varbinary"));
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|12|102|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "2|20|200|binary|varbinary",
-                                "2|21|201|binary|varbinary",
-                                "2|22|202|binary|varbinary",
-                                "2|21|201|binary|varbinary"));
+                                "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -104,10 +104,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "2|21|201|binary|varbinary",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                 // this record is in the same file with the first "2|21|201"
-                                "2|22|202|binary|varbinary",
-                                "2|21|201|binary|varbinary"));
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -119,9 +119,13 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
-                        Arrays.asList("+1|11|101|binary|varbinary", "+1|12|102|binary|varbinary"));
+                        Arrays.asList(
+                                "+1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "+1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("+2|21|201|binary|varbinary"));
+                .isEqualTo(
+                        Collections.singletonList(
+                                "+2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -150,9 +154,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "+1|11|101|binary|varbinary",
+                                "+1|11|101|binary|varbinary|mapKey:mapVal|multiset",
                                 // this record is in the same file with "+1|11|101"
-                                "+1|12|102|binary|varbinary"));
+                                "+1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index c74abf7e..24067da5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -56,15 +56,15 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "1|11|101|binary|varbinary",
-                                "1|11|101|binary|varbinary",
-                                "1|12|102|binary|varbinary"));
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "2|20|200|binary|varbinary",
-                                "2|21|201|binary|varbinary",
-                                "2|22|202|binary|varbinary"));
+                                "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -93,9 +93,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "2|21|201|binary|varbinary",
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                 // this record is in the same file with "delete 2|21|201"
-                                "2|22|202|binary|varbinary"));
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -107,13 +107,15 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
-                        Arrays.asList("-1|10|100|binary|varbinary", "+1|11|101|binary|varbinary"));
+                        Arrays.asList(
+                                "-1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                                "+1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "-2|21|201|binary|varbinary",
-                                "-2|21|201|binary|varbinary",
-                                "+2|22|202|binary|varbinary"));
+                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -143,10 +145,10 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "-2|21|201|binary|varbinary",
-                                "-2|21|201|binary|varbinary",
+                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                 // this record is in the same file with "delete 2|21|201"
-                                "+2|22|202|binary|varbinary"));
+                                "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     private void writeData() throws Exception {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 2c9fc7fe..dd4116af 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.Snapshot;
@@ -36,6 +38,8 @@ import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.utils.CompatibilityTestUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -45,12 +49,42 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ChangelogWithKeyFileStoreTable}. */
 public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
+    protected static final RowType COMPATIBILITY_ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.BIGINT().getLogicalType(),
+                        DataTypes.BINARY(1).getLogicalType(),
+                        DataTypes.VARBINARY(1).getLogicalType()
+                    },
+                    new String[] {"pt", "a", "b", "c", "d"});
+
+    protected static final Function<RowData, String> COMPATIBILITY_BATCH_ROW_TO_STRING =
+            rowData ->
+                    rowData.getInt(0)
+                            + "|"
+                            + rowData.getInt(1)
+                            + "|"
+                            + rowData.getLong(2)
+                            + "|"
+                            + new String(rowData.getBinary(3))
+                            + "|"
+                            + new String(rowData.getBinary(4));
+
+    protected static final Function<RowData, String> COMPATIBILITY_CHANGELOG_ROW_TO_STRING =
+            rowData ->
+                    rowData.getRowKind().shortString()
+                            + " "
+                            + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
+
     @Test
     public void testSequenceNumber() throws Exception {
         FileStoreTable table =
@@ -68,7 +102,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("1|10|200|binary|varbinary", "1|11|101|binary|varbinary"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "1|10|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -79,10 +116,14 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
+                .isEqualTo(
+                        Collections.singletonList(
+                                "1|10|1000|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                 .isEqualTo(
-                        Arrays.asList("2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
+                        Arrays.asList(
+                                "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -113,7 +154,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
+                                "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -124,13 +166,15 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
+                .isEqualTo(
+                        Collections.singletonList(
+                                "-1|11|1001|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(
                         Arrays.asList(
-                                "-2|20|200|binary|varbinary",
-                                "+2|21|20001|binary|varbinary",
-                                "+2|22|202|binary|varbinary"));
+                                "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                                "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -163,9 +207,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "-2|20|200|binary|varbinary",
-                                "+2|21|20001|binary|varbinary",
-                                "+2|22|202|binary|varbinary"));
+                                "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+                                "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -190,14 +234,14 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
-                        "+I 1|10|100|binary|varbinary",
-                        "+I 1|20|200|binary|varbinary",
-                        "-D 1|10|100|binary|varbinary",
-                        "+I 1|10|101|binary|varbinary",
-                        "-U 1|20|200|binary|varbinary",
-                        "+U 1|20|201|binary|varbinary",
-                        "-U 1|10|101|binary|varbinary",
-                        "+U 1|10|102|binary|varbinary");
+                        "+I 1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "+I 1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "-D 1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "+I 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
+                        "-U 1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "+U 1|20|201|binary|varbinary|mapKey:mapVal|multiset",
+                        "-U 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
+                        "+U 1|10|102|binary|varbinary|mapKey:mapVal|multiset");
     }
 
     @Test
@@ -224,9 +268,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
-                        "+I 1|10|110|binary|varbinary", "+I 1|20|120|binary|varbinary");
+                        "+I 1|10|110|binary|varbinary|mapKey:mapVal|multiset",
+                        "+I 1|20|120|binary|varbinary|mapKey:mapVal|multiset");
         assertThat(getResult(read, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
-                .containsExactlyInAnyOrder("+I 2|20|220|binary|varbinary");
+                .containsExactlyInAnyOrder("+I 2|20|220|binary|varbinary|mapKey:mapVal|multiset");
 
         write.write(rowData(1, 30, 130L));
         write.write(rowData(1, 40, 140L));
@@ -242,10 +287,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
         splits = table.newScan().withIncremental(true).plan().splits();
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
-                .containsExactlyInAnyOrder("+I 1|30|130|binary|varbinary");
+                .containsExactlyInAnyOrder("+I 1|30|130|binary|varbinary|mapKey:mapVal|multiset");
         assertThat(getResult(read, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
-                        "+I 2|30|230|binary|varbinary", "+I 2|40|241|binary|varbinary");
+                        "+I 2|30|230|binary|varbinary|mapKey:mapVal|multiset",
+                        "+I 2|40|241|binary|varbinary|mapKey:mapVal|multiset");
 
         write.write(rowData(1, 20, 121L));
         write.write(rowData(1, 30, 131L));
@@ -266,17 +312,17 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         splits = table.newScan().withIncremental(true).plan().splits();
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
-                        "-D 1|20|120|binary|varbinary",
-                        "-U 1|30|130|binary|varbinary",
-                        "+U 1|30|132|binary|varbinary",
-                        "+I 1|40|141|binary|varbinary");
+                        "-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
+                        "-U 1|30|130|binary|varbinary|mapKey:mapVal|multiset",
+                        "+U 1|30|132|binary|varbinary|mapKey:mapVal|multiset",
+                        "+I 1|40|141|binary|varbinary|mapKey:mapVal|multiset");
         assertThat(getResult(read, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
                 .containsExactlyInAnyOrder(
-                        "-D 2|20|220|binary|varbinary",
-                        "-U 2|30|230|binary|varbinary",
-                        "+U 2|30|231|binary|varbinary",
-                        "-U 2|40|241|binary|varbinary",
-                        "+U 2|40|242|binary|varbinary");
+                        "-D 2|20|220|binary|varbinary|mapKey:mapVal|multiset",
+                        "-U 2|30|230|binary|varbinary|mapKey:mapVal|multiset",
+                        "+U 2|30|231|binary|varbinary|mapKey:mapVal|multiset",
+                        "-U 2|40|241|binary|varbinary|mapKey:mapVal|multiset",
+                        "+U 2|40|242|binary|varbinary|mapKey:mapVal|multiset");
     }
 
     @Test
@@ -285,7 +331,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         CompatibilityTestUtils.unzip("compatibility/table-changelog-0.2.zip", tablePath.getPath());
         FileStoreTable table =
                 createFileStoreTable(
-                        conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
+                        conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT),
+                        COMPATIBILITY_ROW_TYPE);
 
         List<List<List<String>>> expected =
                 Arrays.asList(
@@ -341,7 +388,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                                                 splits,
                                                 binaryRow(j + 1),
                                                 0,
-                                                CHANGELOG_ROW_TO_STRING))
+                                                COMPATIBILITY_CHANGELOG_ROW_TO_STRING))
                                 .isEqualTo(expected.get(i).get(j));
                     }
 
@@ -423,13 +470,17 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead().withFilter(builder.equal(1, 30));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+                        Arrays.asList(
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
 
         // push down value filter b = 300L
         read = table.newRead().withFilter(builder.equal(2, 300L));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+                        Arrays.asList(
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
 
         // push down both key filter and value filter
         read =
@@ -439,10 +490,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "1|10|100|binary|varbinary",
-                                "1|20|200|binary|varbinary",
-                                "1|30|300|binary|varbinary",
-                                "1|40|400|binary|varbinary"));
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
 
         // update pk 60, 10
         write.write(rowData(1, 60, 500L));
@@ -457,12 +508,12 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
                         Arrays.asList(
-                                "1|10|10|binary|varbinary",
-                                "1|20|200|binary|varbinary",
-                                "1|30|300|binary|varbinary",
-                                "1|40|400|binary|varbinary",
-                                "1|50|500|binary|varbinary",
-                                "1|60|500|binary|varbinary"));
+                                "1|10|10|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|50|500|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|60|500|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -487,7 +538,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("1|10|200|binary|varbinary"));
+                .isEqualTo(
+                        Collections.singletonList(
+                                "1|10|200|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -515,14 +568,24 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("1|10|100|binary|varbinary", "1|20|201|binary|varbinary"));
+                .isEqualTo(
+                        Arrays.asList(
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|20|201|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Collections.singletonList("2|10|300|binary|varbinary"));
+                .isEqualTo(
+                        Collections.singletonList(
+                                "2|10|300|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {
+        return createFileStoreTable(configure, ROW_TYPE);
+    }
+
+    private FileStoreTable createFileStoreTable(Consumer<Configuration> configure, RowType rowType)
+            throws Exception {
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
@@ -531,7 +594,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
                         new UpdateSchema(
-                                ROW_TYPE,
+                                rowType,
                                 Collections.singletonList("pt"),
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 3aeffe9a..58c7d088 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -22,8 +22,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
 import org.apache.flink.table.store.file.Snapshot;
@@ -80,9 +82,11 @@ public abstract class FileStoreTableTestBase {
                         DataTypes.INT().getLogicalType(),
                         DataTypes.BIGINT().getLogicalType(),
                         DataTypes.BINARY(1).getLogicalType(),
-                        DataTypes.VARBINARY(1).getLogicalType()
+                        DataTypes.VARBINARY(1).getLogicalType(),
+                        DataTypes.MAP(DataTypes.VARCHAR(8), DataTypes.VARCHAR(8)).getLogicalType(),
+                        DataTypes.MULTISET(DataTypes.VARCHAR(8)).getLogicalType()
                     },
-                    new String[] {"pt", "a", "b", "c", "d"});
+                    new String[] {"pt", "a", "b", "c", "d", "e", "f"});
     protected static final int[] PROJECTION = new int[] {2, 1};
     protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
             rowData ->
@@ -94,7 +98,14 @@ public abstract class FileStoreTableTestBase {
                             + "|"
                             + new String(rowData.getBinary(3))
                             + "|"
-                            + new String(rowData.getBinary(4));
+                            + new String(rowData.getBinary(4))
+                            + "|"
+                            + String.format(
+                                    "%s:%s",
+                                    rowData.getMap(5).keyArray().getString(0).toString(),
+                                    rowData.getMap(5).valueArray().getString(0))
+                            + "|"
+                            + rowData.getMap(6).keyArray().getString(0).toString();
     protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
             rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
     protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
@@ -154,9 +165,13 @@ public abstract class FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits();
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
+                .hasSameElementsAs(
+                        Collections.singletonList(
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("2|21|201|binary|varbinary"));
+                .hasSameElementsAs(
+                        Collections.singletonList(
+                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -212,7 +227,9 @@ public abstract class FileStoreTableTestBase {
         TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                 .hasSameElementsAs(
-                        Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+                        Arrays.asList(
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
     @Test
@@ -342,7 +359,15 @@ public abstract class FileStoreTableTestBase {
 
     protected GenericRowData rowData(Object... values) {
         return GenericRowData.of(
-                values[0], values[1], values[2], "binary".getBytes(), "varbinary".getBytes());
+                values[0],
+                values[1],
+                values[2],
+                "binary".getBytes(),
+                "varbinary".getBytes(),
+                new GenericMapData(
+                        Collections.singletonMap(
+                                StringData.fromString("mapKey"), StringData.fromString("mapVal"))),
+                new GenericMapData(Collections.singletonMap(StringData.fromString("multiset"), 1)));
     }
 
     protected GenericRowData rowDataWithKind(RowKind rowKind, Object... values) {
@@ -352,7 +377,11 @@ public abstract class FileStoreTableTestBase {
                 values[1],
                 values[2],
                 "binary".getBytes(),
-                "varbinary".getBytes());
+                "varbinary".getBytes(),
+                new GenericMapData(
+                        Collections.singletonMap(
+                                StringData.fromString("mapKey"), StringData.fromString("mapVal"))),
+                new GenericMapData(Collections.singletonMap(StringData.fromString("multiset"), 1)));
     }
 
     protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index b7e8ce39..663260ab 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -37,8 +37,10 @@ import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.orc.TypeDescription;
@@ -111,10 +113,10 @@ public class OrcFileFormat extends FileFormat {
      */
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-        LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
+        LogicalType refinedType = refineLogicalType(type);
+        LogicalType[] orcTypes = refinedType.getChildren().toArray(new LogicalType[0]);
 
-        TypeDescription typeDescription =
-                OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
+        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(refinedType);
         Vectorizer<RowData> vectorizer =
                 new RowDataVectorizer(typeDescription.toString(), orcTypes);
 
@@ -145,6 +147,11 @@ public class OrcFileFormat extends FileFormat {
                 return new MapType(
                         refineLogicalType(mapType.getKeyType()),
                         refineLogicalType(mapType.getValueType()));
+            case MULTISET:
+                MultisetType multisetType = (MultisetType) type;
+                return new MapType(
+                        refineLogicalType(multisetType.getElementType()),
+                        refineLogicalType(new IntType(false)));
             case ROW:
                 RowType rowType = (RowType) type;
                 return new RowType(
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 86aba5c4..03d2a69f 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -66,6 +68,8 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
                 new TimestampType(3),
                 // orc reader & writer currently cannot preserve a high precision timestamp
                 // new TimestampType(9),
-                new ArrayType(new IntType()));
+                new ArrayType(new IntType()),
+                new MapType(new VarCharType(8), new VarCharType(8)),
+                new MultisetType(new VarCharType(8)));
     }
 }
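
Note on the ORC side of this change: the new MULTISET branch in
OrcFileFormat#refineLogicalType persists a MULTISET<T> column as
MAP<T, INT NOT NULL>, with the value holding the element count, which is the
same representation the test data uses (a GenericMapData from element to
count). A minimal sketch of that refinement, assuming only the logical type
classes already imported in the patch (the helper name is hypothetical):

    // MULTISET<T> is stored as MAP<T, INT NOT NULL>; the INT is the count.
    static LogicalType refineMultiset(MultisetType type) {
        return new MapType(type.getElementType(), new IntType(false));
    }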