You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/15 07:21:04 UTC
[flink-table-store] branch master updated: [FLINK-28552] GenerateUtils#generateCompare supports MULTISET and MAP
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new dbe49f7a [FLINK-28552] GenerateUtils#generateCompare supports MULTISET and MAP
dbe49f7a is described below
commit dbe49f7ab6b936a51c09a066c8212df96e744324
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Tue Nov 15 15:20:58 2022 +0800
[FLINK-28552] GenerateUtils#generateCompare supports MULTISET and MAP
This closes #375
---
.../GenerateUtils.scala | 105 ++++++++++++-
.../store/format/FileStatsExtractorTestBase.java | 30 ++++
.../store/table/AppendOnlyFileStoreTableTest.java | 36 +++--
.../ChangelogValueCountFileStoreTableTest.java | 32 ++--
.../table/ChangelogWithKeyFileStoreTableTest.java | 163 ++++++++++++++-------
.../table/store/table/FileStoreTableTestBase.java | 45 +++++-
.../table/store/format/orc/OrcFileFormat.java | 13 +-
.../format/orc/OrcFileStatsExtractorTest.java | 6 +-
8 files changed, 331 insertions(+), 99 deletions(-)
diff --git a/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala b/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala
index 6c9e5dbb..b28c4a4a 100644
--- a/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala
+++ b/flink-table-store-codegen/src/main/scala/org/apache/flink/table/store/codegen/GenerateUtils.scala
@@ -47,6 +47,8 @@ object GenerateUtils {
val ARRAY_DATA: String = className[ArrayData]
+ val MAP_DATA: String = className[MapData]
+
val ROW_DATA: String = className[RowData]
val BINARY_STRING: String = className[BinaryStringData]
@@ -135,8 +137,8 @@ object GenerateUtils {
case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DATE | TIME_WITHOUT_TIME_ZONE |
INTERVAL_YEAR_MONTH | INTERVAL_DAY_TIME =>
s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
- case TIMESTAMP_WITH_TIME_ZONE | MULTISET | MAP =>
- throw new UnsupportedOperationException() // TODO support MULTISET and MAP?
+ case TIMESTAMP_WITH_TIME_ZONE =>
+ throw new UnsupportedOperationException()
case ARRAY =>
val at = t.asInstanceOf[ArrayType]
val compareFunc = newName("compareArray")
@@ -150,6 +152,32 @@ object GenerateUtils {
"""
ctx.addReusableMember(funcCode)
s"$compareFunc($leftTerm, $rightTerm)"
+ case MAP =>
+ val at = t.asInstanceOf[MapType]
+ val compareFunc = newName("compareMap")
+ val compareCode = generateMapCompare(ctx, nullsIsLast = false, at, "a", "b")
+ val funcCode: String =
+ s"""
+ public int $compareFunc($MAP_DATA a, $MAP_DATA b) {
+ $compareCode
+ return 0;
+ }
+ """
+ ctx.addReusableMember(funcCode)
+ s"$compareFunc($leftTerm, $rightTerm)"
+ case MULTISET =>
+ val at = t.asInstanceOf[MultisetType]
+ val compareFunc = newName("compareMultiset")
+ val compareCode = generateMultisetCompare(ctx, nullsIsLast = false, at, "a", "b")
+ val funcCode: String =
+ s"""
+ public int $compareFunc($MAP_DATA a, $MAP_DATA b) {
+ $compareCode
+ return 0;
+ }
+ """
+ ctx.addReusableMember(funcCode)
+ s"$compareFunc($leftTerm, $rightTerm)"
case ROW | STRUCTURED_TYPE =>
val fieldCount = getFieldCount(t)
val comparisons =
@@ -219,12 +247,12 @@ object GenerateUtils {
val comp = newName("comp")
val typeTerm = primitiveTypeTermForType(elementType)
s"""
- int $lengthA = a.size();
- int $lengthB = b.size();
+ int $lengthA = $leftTerm.size();
+ int $lengthB = $rightTerm.size();
int $minLength = ($lengthA > $lengthB) ? $lengthB : $lengthA;
for (int $i = 0; $i < $minLength; $i++) {
- boolean $isNullA = a.isNullAt($i);
- boolean $isNullB = b.isNullAt($i);
+ boolean $isNullA = $leftTerm.isNullAt($i);
+ boolean $isNullB = $rightTerm.isNullAt($i);
if ($isNullA && $isNullB) {
// Continue to compare the next element
} else if ($isNullA) {
@@ -249,6 +277,71 @@ object GenerateUtils {
"""
}
+ /** Generates code for comparing map. */
+ def generateMapCompare(
+ ctx: CodeGeneratorContext,
+ nullsIsLast: Boolean,
+ mapType: MapType,
+ leftTerm: String,
+ rightTerm: String): String = {
+ val keyArrayType = new ArrayType(mapType.getKeyType)
+ val valueArrayType = new ArrayType(mapType.getValueType)
+ generateMapDataCompare(ctx, nullsIsLast, leftTerm, rightTerm, keyArrayType, valueArrayType)
+ }
+
+ /** Generates code for comparing multiset. */
+ def generateMultisetCompare(
+ ctx: CodeGeneratorContext,
+ nullsIsLast: Boolean,
+ multisetType: MultisetType,
+ leftTerm: String,
+ rightTerm: String): String = {
+ val keyArrayType = new ArrayType(multisetType.getElementType)
+ val valueArrayType = new ArrayType(new IntType(false))
+ generateMapDataCompare(ctx, nullsIsLast, leftTerm, rightTerm, keyArrayType, valueArrayType)
+ }
+
+ def generateMapDataCompare(
+ ctx: CodeGeneratorContext,
+ nullsIsLast: Boolean,
+ leftTerm: String,
+ rightTerm: String,
+ keyArrayType: ArrayType,
+ valueArrayType: ArrayType): String = {
+ val keyArrayTerm = primitiveTypeTermForType(keyArrayType)
+ val valueArrayTerm = primitiveTypeTermForType(valueArrayType)
+ val lengthA = newName("lengthA")
+ val lengthB = newName("lengthB")
+ val comp = newName("comp")
+ val keyArrayA = newName("keyArrayA")
+ val keyArrayB = newName("keyArrayB")
+ val valueArrayA = newName("valueArrayA")
+ val valueArrayB = newName("valueArrayB")
+ s"""
+ int $lengthA = $leftTerm.size();
+ int $lengthB = $rightTerm.size();
+ if ($lengthA == $lengthB) {
+ $keyArrayTerm $keyArrayA = $leftTerm.keyArray();
+ $keyArrayTerm $keyArrayB = $rightTerm.keyArray();
+ int $comp = ${generateCompare(ctx, keyArrayType, nullsIsLast, keyArrayA, keyArrayB)};
+ if ($comp == 0) {
+ $valueArrayTerm $valueArrayA = $leftTerm.valueArray();
+ $valueArrayTerm $valueArrayB = $rightTerm.valueArray();
+ $comp = ${generateCompare(ctx, valueArrayType, nullsIsLast, valueArrayA, valueArrayB)};
+ if ($comp != 0) {
+ return $comp;
+ }
+ } else {
+ return $comp;
+ }
+ } else if ($lengthA < $lengthB) {
+ return -1;
+ } else if ($lengthA > $lengthB) {
+ return 1;
+ }
+ """
+ }
+
/** Generates code for comparing row keys. */
def generateRowCompare(
ctx: CodeGeneratorContext,
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 2b348037..86069a87 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -25,7 +25,9 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
@@ -33,7 +35,10 @@ import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
@@ -46,7 +51,9 @@ import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -158,6 +165,10 @@ public abstract class FileStatsExtractorTestBase {
return randomTimestampData((TimestampType) type);
case ARRAY:
return randomArray((ArrayType) type);
+ case MAP:
+ return randomMap((MapType) type);
+ case MULTISET:
+ return randomMultiset((MultisetType) type);
default:
throw new UnsupportedOperationException(
"Unsupported type " + type.asSummaryString());
@@ -210,6 +221,25 @@ public abstract class FileStatsExtractorTestBase {
return new GenericArrayData(javaArray);
}
+ private MapData randomMap(MapType type) {
+ int length = ThreadLocalRandom.current().nextInt(10);
+ Map<Object, Object> javaMap = new HashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ javaMap.put(createField(type.getKeyType()), createField(type.getValueType()));
+ }
+ return new GenericMapData(javaMap);
+ }
+
+ private MapData randomMultiset(MultisetType type) {
+ int length = ThreadLocalRandom.current().nextInt(10);
+ Map<Object, Object> javaMap = new HashMap<>(length);
+ IntType intType = new IntType(false);
+ for (int i = 0; i < length; i++) {
+ javaMap.put(createField(type.getElementType()), createField(intType));
+ }
+ return new GenericMapData(javaMap);
+ }
+
protected abstract FileFormat createFormat();
protected abstract RowType rowType();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 58b78e17..563f290d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -64,18 +64,18 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "1|10|100|binary|varbinary",
- "1|11|101|binary|varbinary",
- "1|12|102|binary|varbinary",
- "1|11|101|binary|varbinary",
- "1|12|102|binary|varbinary"));
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+ "1|12|102|binary|varbinary|mapKey:mapVal|multiset",
+ "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+ "1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "2|20|200|binary|varbinary",
- "2|21|201|binary|varbinary",
- "2|22|202|binary|varbinary",
- "2|21|201|binary|varbinary"));
+ "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+ "2|22|202|binary|varbinary|mapKey:mapVal|multiset",
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -104,10 +104,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "2|21|201|binary|varbinary",
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
// this record is in the same file with the first "2|21|201"
- "2|22|202|binary|varbinary",
- "2|21|201|binary|varbinary"));
+ "2|22|202|binary|varbinary|mapKey:mapVal|multiset",
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -119,9 +119,13 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
- Arrays.asList("+1|11|101|binary|varbinary", "+1|12|102|binary|varbinary"));
+ Arrays.asList(
+ "+1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+ "+1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("+2|21|201|binary|varbinary"));
+ .isEqualTo(
+ Collections.singletonList(
+ "+2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -150,9 +154,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "+1|11|101|binary|varbinary",
+ "+1|11|101|binary|varbinary|mapKey:mapVal|multiset",
// this record is in the same file with "+1|11|101"
- "+1|12|102|binary|varbinary"));
+ "+1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index c74abf7e..24067da5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -56,15 +56,15 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "1|11|101|binary|varbinary",
- "1|11|101|binary|varbinary",
- "1|12|102|binary|varbinary"));
+ "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+ "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
+ "1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "2|20|200|binary|varbinary",
- "2|21|201|binary|varbinary",
- "2|22|202|binary|varbinary"));
+ "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+ "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -93,9 +93,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "2|21|201|binary|varbinary",
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
// this record is in the same file with "delete 2|21|201"
- "2|22|202|binary|varbinary"));
+ "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -107,13 +107,15 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
- Arrays.asList("-1|10|100|binary|varbinary", "+1|11|101|binary|varbinary"));
+ Arrays.asList(
+ "-1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "+1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "-2|21|201|binary|varbinary",
- "-2|21|201|binary|varbinary",
- "+2|22|202|binary|varbinary"));
+ "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+ "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+ "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -143,10 +145,10 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "-2|21|201|binary|varbinary",
- "-2|21|201|binary|varbinary",
+ "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
+ "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
// this record is in the same file with "delete 2|21|201"
- "+2|22|202|binary|varbinary"));
+ "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
private void writeData() throws Exception {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 2c9fc7fe..dd4116af 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.file.Snapshot;
@@ -36,6 +38,8 @@ import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.utils.CompatibilityTestUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.function.FunctionWithException;
@@ -45,12 +49,42 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
+ protected static final RowType COMPATIBILITY_ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.BINARY(1).getLogicalType(),
+ DataTypes.VARBINARY(1).getLogicalType()
+ },
+ new String[] {"pt", "a", "b", "c", "d"});
+
+ protected static final Function<RowData, String> COMPATIBILITY_BATCH_ROW_TO_STRING =
+ rowData ->
+ rowData.getInt(0)
+ + "|"
+ + rowData.getInt(1)
+ + "|"
+ + rowData.getLong(2)
+ + "|"
+ + new String(rowData.getBinary(3))
+ + "|"
+ + new String(rowData.getBinary(4));
+
+ protected static final Function<RowData, String> COMPATIBILITY_CHANGELOG_ROW_TO_STRING =
+ rowData ->
+ rowData.getRowKind().shortString()
+ + " "
+ + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
+
@Test
public void testSequenceNumber() throws Exception {
FileStoreTable table =
@@ -68,7 +102,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("1|10|200|binary|varbinary", "1|11|101|binary|varbinary"));
+ .isEqualTo(
+ Arrays.asList(
+ "1|10|200|binary|varbinary|mapKey:mapVal|multiset",
+ "1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -79,10 +116,14 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
+ .isEqualTo(
+ Collections.singletonList(
+ "1|10|1000|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
- Arrays.asList("2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
+ Arrays.asList(
+ "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+ "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -113,7 +154,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Arrays.asList(
// only filter on key should be performed,
// and records from the same file should also be selected
- "2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
+ "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+ "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -124,13 +166,15 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
+ .isEqualTo(
+ Collections.singletonList(
+ "-1|11|1001|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "-2|20|200|binary|varbinary",
- "+2|21|20001|binary|varbinary",
- "+2|22|202|binary|varbinary"));
+ "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+ "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -163,9 +207,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Arrays.asList(
// only filter on key should be performed,
// and records from the same file should also be selected
- "-2|20|200|binary|varbinary",
- "+2|21|20001|binary|varbinary",
- "+2|22|202|binary|varbinary"));
+ "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
+ "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -190,14 +234,14 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
- "+I 1|10|100|binary|varbinary",
- "+I 1|20|200|binary|varbinary",
- "-D 1|10|100|binary|varbinary",
- "+I 1|10|101|binary|varbinary",
- "-U 1|20|200|binary|varbinary",
- "+U 1|20|201|binary|varbinary",
- "-U 1|10|101|binary|varbinary",
- "+U 1|10|102|binary|varbinary");
+ "+I 1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "+I 1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "-D 1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "+I 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
+ "-U 1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "+U 1|20|201|binary|varbinary|mapKey:mapVal|multiset",
+ "-U 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
+ "+U 1|10|102|binary|varbinary|mapKey:mapVal|multiset");
}
@Test
@@ -224,9 +268,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
- "+I 1|10|110|binary|varbinary", "+I 1|20|120|binary|varbinary");
+ "+I 1|10|110|binary|varbinary|mapKey:mapVal|multiset",
+ "+I 1|20|120|binary|varbinary|mapKey:mapVal|multiset");
assertThat(getResult(read, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
- .containsExactlyInAnyOrder("+I 2|20|220|binary|varbinary");
+ .containsExactlyInAnyOrder("+I 2|20|220|binary|varbinary|mapKey:mapVal|multiset");
write.write(rowData(1, 30, 130L));
write.write(rowData(1, 40, 140L));
@@ -242,10 +287,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
splits = table.newScan().withIncremental(true).plan().splits();
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
- .containsExactlyInAnyOrder("+I 1|30|130|binary|varbinary");
+ .containsExactlyInAnyOrder("+I 1|30|130|binary|varbinary|mapKey:mapVal|multiset");
assertThat(getResult(read, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
- "+I 2|30|230|binary|varbinary", "+I 2|40|241|binary|varbinary");
+ "+I 2|30|230|binary|varbinary|mapKey:mapVal|multiset",
+ "+I 2|40|241|binary|varbinary|mapKey:mapVal|multiset");
write.write(rowData(1, 20, 121L));
write.write(rowData(1, 30, 131L));
@@ -266,17 +312,17 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
splits = table.newScan().withIncremental(true).plan().splits();
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
- "-D 1|20|120|binary|varbinary",
- "-U 1|30|130|binary|varbinary",
- "+U 1|30|132|binary|varbinary",
- "+I 1|40|141|binary|varbinary");
+ "-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
+ "-U 1|30|130|binary|varbinary|mapKey:mapVal|multiset",
+ "+U 1|30|132|binary|varbinary|mapKey:mapVal|multiset",
+ "+I 1|40|141|binary|varbinary|mapKey:mapVal|multiset");
assertThat(getResult(read, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
- "-D 2|20|220|binary|varbinary",
- "-U 2|30|230|binary|varbinary",
- "+U 2|30|231|binary|varbinary",
- "-U 2|40|241|binary|varbinary",
- "+U 2|40|242|binary|varbinary");
+ "-D 2|20|220|binary|varbinary|mapKey:mapVal|multiset",
+ "-U 2|30|230|binary|varbinary|mapKey:mapVal|multiset",
+ "+U 2|30|231|binary|varbinary|mapKey:mapVal|multiset",
+ "-U 2|40|241|binary|varbinary|mapKey:mapVal|multiset",
+ "+U 2|40|242|binary|varbinary|mapKey:mapVal|multiset");
}
@Test
@@ -285,7 +331,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
CompatibilityTestUtils.unzip("compatibility/table-changelog-0.2.zip", tablePath.getPath());
FileStoreTable table =
createFileStoreTable(
- conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
+ conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT),
+ COMPATIBILITY_ROW_TYPE);
List<List<List<String>>> expected =
Arrays.asList(
@@ -341,7 +388,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
splits,
binaryRow(j + 1),
0,
- CHANGELOG_ROW_TO_STRING))
+ COMPATIBILITY_CHANGELOG_ROW_TO_STRING))
.isEqualTo(expected.get(i).get(j));
}
@@ -423,13 +470,17 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead().withFilter(builder.equal(1, 30));
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+ Arrays.asList(
+ "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+ "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
// push down value filter b = 300L
read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+ Arrays.asList(
+ "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+ "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
// push down both key filter and value filter
read =
@@ -439,10 +490,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "1|10|100|binary|varbinary",
- "1|20|200|binary|varbinary",
- "1|30|300|binary|varbinary",
- "1|40|400|binary|varbinary"));
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+ "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
// update pk 60, 10
write.write(rowData(1, 60, 500L));
@@ -457,12 +508,12 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "1|10|10|binary|varbinary",
- "1|20|200|binary|varbinary",
- "1|30|300|binary|varbinary",
- "1|40|400|binary|varbinary",
- "1|50|500|binary|varbinary",
- "1|60|500|binary|varbinary"));
+ "1|10|10|binary|varbinary|mapKey:mapVal|multiset",
+ "1|20|200|binary|varbinary|mapKey:mapVal|multiset",
+ "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+ "1|40|400|binary|varbinary|mapKey:mapVal|multiset",
+ "1|50|500|binary|varbinary|mapKey:mapVal|multiset",
+ "1|60|500|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -487,7 +538,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("1|10|200|binary|varbinary"));
+ .isEqualTo(
+ Collections.singletonList(
+ "1|10|200|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -515,14 +568,24 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("1|10|100|binary|varbinary", "1|20|201|binary|varbinary"));
+ .isEqualTo(
+ Arrays.asList(
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "1|20|201|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("2|10|300|binary|varbinary"));
+ .isEqualTo(
+ Collections.singletonList(
+ "2|10|300|binary|varbinary|mapKey:mapVal|multiset"));
}
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {
+ return createFileStoreTable(configure, ROW_TYPE);
+ }
+
+ private FileStoreTable createFileStoreTable(Consumer<Configuration> configure, RowType rowType)
+ throws Exception {
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
@@ -531,7 +594,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableSchema tableSchema =
schemaManager.commitNewVersion(
new UpdateSchema(
- ROW_TYPE,
+ rowType,
Collections.singletonList("pt"),
Arrays.asList("pt", "a"),
conf.toMap(),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 3aeffe9a..58c7d088 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -22,8 +22,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.file.Snapshot;
@@ -80,9 +82,11 @@ public abstract class FileStoreTableTestBase {
DataTypes.INT().getLogicalType(),
DataTypes.BIGINT().getLogicalType(),
DataTypes.BINARY(1).getLogicalType(),
- DataTypes.VARBINARY(1).getLogicalType()
+ DataTypes.VARBINARY(1).getLogicalType(),
+ DataTypes.MAP(DataTypes.VARCHAR(8), DataTypes.VARCHAR(8)).getLogicalType(),
+ DataTypes.MULTISET(DataTypes.VARCHAR(8)).getLogicalType()
},
- new String[] {"pt", "a", "b", "c", "d"});
+ new String[] {"pt", "a", "b", "c", "d", "e", "f"});
protected static final int[] PROJECTION = new int[] {2, 1};
protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
rowData ->
@@ -94,7 +98,14 @@ public abstract class FileStoreTableTestBase {
+ "|"
+ new String(rowData.getBinary(3))
+ "|"
- + new String(rowData.getBinary(4));
+ + new String(rowData.getBinary(4))
+ + "|"
+ + String.format(
+ "%s:%s",
+ rowData.getMap(5).keyArray().getString(0).toString(),
+ rowData.getMap(5).valueArray().getString(0))
+ + "|"
+ + rowData.getMap(6).keyArray().getString(0).toString();
protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
@@ -154,9 +165,13 @@ public abstract class FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
+ .hasSameElementsAs(
+ Collections.singletonList(
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("2|21|201|binary|varbinary"));
+ .hasSameElementsAs(
+ Collections.singletonList(
+ "2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -212,7 +227,9 @@ public abstract class FileStoreTableTestBase {
TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+ Arrays.asList(
+ "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
+ "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
}
@Test
@@ -342,7 +359,15 @@ public abstract class FileStoreTableTestBase {
protected GenericRowData rowData(Object... values) {
return GenericRowData.of(
- values[0], values[1], values[2], "binary".getBytes(), "varbinary".getBytes());
+ values[0],
+ values[1],
+ values[2],
+ "binary".getBytes(),
+ "varbinary".getBytes(),
+ new GenericMapData(
+ Collections.singletonMap(
+ StringData.fromString("mapKey"), StringData.fromString("mapVal"))),
+ new GenericMapData(Collections.singletonMap(StringData.fromString("multiset"), 1)));
}
protected GenericRowData rowDataWithKind(RowKind rowKind, Object... values) {
@@ -352,7 +377,11 @@ public abstract class FileStoreTableTestBase {
values[1],
values[2],
"binary".getBytes(),
- "varbinary".getBytes());
+ "varbinary".getBytes(),
+ new GenericMapData(
+ Collections.singletonMap(
+ StringData.fromString("mapKey"), StringData.fromString("mapVal"))),
+ new GenericMapData(Collections.singletonMap(StringData.fromString("multiset"), 1)));
}
protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index b7e8ce39..663260ab 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -37,8 +37,10 @@ import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.orc.TypeDescription;
@@ -111,10 +113,10 @@ public class OrcFileFormat extends FileFormat {
*/
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
+ LogicalType refinedType = refineLogicalType(type);
+ LogicalType[] orcTypes = refinedType.getChildren().toArray(new LogicalType[0]);
- TypeDescription typeDescription =
- OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
+ TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(refinedType);
Vectorizer<RowData> vectorizer =
new RowDataVectorizer(typeDescription.toString(), orcTypes);
@@ -145,6 +147,11 @@ public class OrcFileFormat extends FileFormat {
return new MapType(
refineLogicalType(mapType.getKeyType()),
refineLogicalType(mapType.getValueType()));
+ case MULTISET:
+ MultisetType multisetType = (MultisetType) type;
+ return new MapType(
+ refineLogicalType(multisetType.getElementType()),
+ refineLogicalType(new IntType(false)));
case ROW:
RowType rowType = (RowType) type;
return new RowType(
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 86aba5c4..03d2a69f 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -66,6 +68,8 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
new TimestampType(3),
// orc reader & writer currently cannot preserve a high precision timestamp
// new TimestampType(9),
- new ArrayType(new IntType()));
+ new ArrayType(new IntType()),
+ new MapType(new VarCharType(8), new VarCharType(8)),
+ new MultisetType(new VarCharType(8)));
}
}