You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/21 14:15:21 UTC
[flink-table-store] 02/02: [FLINK-29278] BINARY type is not supported in table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 3eee4bf4eddd6d22a0225e1958601a44f7dd9ba7
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Sep 21 19:21:03 2022 +0800
[FLINK-29278] BINARY type is not supported in table store
This closes #292
---
.../store/format/FileStatsExtractorTestBase.java | 8 ++
.../store/table/AppendOnlyFileStoreTableTest.java | 51 +++++-----
.../ChangelogValueCountFileStoreTableTest.java | 56 ++++++-----
.../table/ChangelogWithKeyFileStoreTableTest.java | 103 ++++++++++-----------
.../table/store/table/FileStoreTableTestBase.java | 75 ++++++++++-----
.../table/store/table/WritePreemptMemoryTest.java | 3 +-
.../table/store/format/orc/OrcFileFormat.java | 45 ++++++++-
.../format/orc/OrcFileStatsExtractorTest.java | 4 +
8 files changed, 221 insertions(+), 124 deletions(-)
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 90a53f7f..18d5c89a 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -30,11 +30,13 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;
@@ -128,6 +130,12 @@ public abstract class FileStatsExtractorTestBase {
randomString(random.nextInt(varCharType.getLength()) + 1));
case BOOLEAN:
return random.nextBoolean();
+ case BINARY:
+ BinaryType binaryType = (BinaryType) type;
+ return randomString(binaryType.getLength()).getBytes();
+ case VARBINARY:
+ VarBinaryType varBinaryType = (VarBinaryType) type;
+ return randomString(varBinaryType.getLength()).getBytes();
case TINYINT:
return (byte) random.nextInt(10);
case SMALLINT:
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 9cc7ed32..82f6508e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -63,9 +62,19 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Arrays.asList("1|10|100", "1|11|101", "1|12|102", "1|11|101", "1|12|102"));
+ Arrays.asList(
+ "1|10|100|binary|varbinary",
+ "1|11|101|binary|varbinary",
+ "1|12|102|binary|varbinary",
+ "1|11|101|binary|varbinary",
+ "1|12|102|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
+ .hasSameElementsAs(
+ Arrays.asList(
+ "2|20|200|binary|varbinary",
+ "2|21|201|binary|varbinary",
+ "2|22|202|binary|varbinary",
+ "2|21|201|binary|varbinary"));
}
@Test
@@ -94,10 +103,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
- "2|21|201",
+ "2|21|201|binary|varbinary",
// this record is in the same file with the first "2|21|201"
- "2|22|202",
- "2|21|201"));
+ "2|22|202|binary|varbinary",
+ "2|21|201|binary|varbinary"));
}
@Test
@@ -108,9 +117,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
+ .isEqualTo(
+ Arrays.asList("+1|11|101|binary|varbinary", "+1|12|102|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("+2|21|201"));
+ .isEqualTo(Collections.singletonList("+2|21|201|binary|varbinary"));
}
@Test
@@ -139,9 +149,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "+1|11|101",
+ "+1|11|101|binary|varbinary",
// this record is in the same file with "+1|11|101"
- "+1|12|102"));
+ "+1|12|102|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
}
@@ -161,8 +171,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
for (int j = 0; j < Math.max(random.nextInt(200), 1); j++) {
BinaryRowData data =
serializer
- .toBinaryRow(
- GenericRowData.of(i, random.nextInt(), random.nextLong()))
+ .toBinaryRow(rowData(i, random.nextInt(), random.nextLong()))
.copy();
int bucket = bucket(hashcode(data), numOfBucket);
dataPerBucket.compute(
@@ -204,19 +213,19 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(2, 20, 200L));
- write.write(GenericRowData.of(1, 11, 101L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ write.write(rowData(1, 11, 101L));
commit.commit("0", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 12, 102L));
- write.write(GenericRowData.of(2, 21, 201L));
- write.write(GenericRowData.of(2, 22, 202L));
+ write.write(rowData(1, 12, 102L));
+ write.write(rowData(2, 21, 201L));
+ write.write(rowData(2, 22, 202L));
commit.commit("1", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 11, 101L));
- write.write(GenericRowData.of(2, 21, 201L));
- write.write(GenericRowData.of(1, 12, 102L));
+ write.write(rowData(1, 11, 101L));
+ write.write(rowData(2, 21, 201L));
+ write.write(rowData(1, 12, 102L));
commit.commit("2", write.prepareCommit(true));
write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 894f5d85..8ce299dd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.predicate.Predicate;
@@ -53,9 +52,17 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("1|11|101", "1|11|101", "1|12|102"));
+ .isEqualTo(
+ Arrays.asList(
+ "1|11|101|binary|varbinary",
+ "1|11|101|binary|varbinary",
+ "1|12|102|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
+ .isEqualTo(
+ Arrays.asList(
+ "2|20|200|binary|varbinary",
+ "2|21|201|binary|varbinary",
+ "2|22|202|binary|varbinary"));
}
@Test
@@ -84,9 +91,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "2|21|201",
+ "2|21|201|binary|varbinary",
// this record is in the same file with "delete 2|21|201"
- "2|22|202"));
+ "2|22|202|binary|varbinary"));
}
@Test
@@ -97,9 +104,14 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
+ .isEqualTo(
+ Arrays.asList("-1|10|100|binary|varbinary", "+1|11|101|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("-2|21|201", "-2|21|201", "+2|22|202"));
+ .isEqualTo(
+ Arrays.asList(
+ "-2|21|201|binary|varbinary",
+ "-2|21|201|binary|varbinary",
+ "+2|22|202|binary|varbinary"));
}
@Test
@@ -129,10 +141,10 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "-2|21|201",
- "-2|21|201",
+ "-2|21|201|binary|varbinary",
+ "-2|21|201|binary|varbinary",
// this record is in the same file with "delete 2|21|201"
- "+2|22|202"));
+ "+2|22|202|binary|varbinary"));
}
private void writeData() throws Exception {
@@ -140,22 +152,22 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(2, 20, 200L));
- write.write(GenericRowData.of(1, 11, 101L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ write.write(rowData(1, 11, 101L));
commit.commit("0", write.prepareCommit(true));
- write.write(GenericRowData.of(2, 21, 201L));
- write.write(GenericRowData.of(1, 12, 102L));
- write.write(GenericRowData.of(2, 21, 201L));
- write.write(GenericRowData.of(2, 21, 201L));
+ write.write(rowData(2, 21, 201L));
+ write.write(rowData(1, 12, 102L));
+ write.write(rowData(2, 21, 201L));
+ write.write(rowData(2, 21, 201L));
commit.commit("1", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 11, 101L));
- write.write(GenericRowData.of(2, 22, 202L));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
+ write.write(rowData(1, 11, 101L));
+ write.write(rowData(2, 22, 202L));
+ write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+ write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
commit.commit("2", write.prepareCommit(true));
write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 9b1495fe..2d143c39 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.file.WriteMode;
@@ -52,18 +51,18 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
createFileStoreTable(conf -> conf.set(CoreOptions.SEQUENCE_FIELD, "b"));
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 200L));
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(1, 11, 101L));
+ write.write(rowData(1, 10, 200L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 11, 101L));
commit.commit("0", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 11, 55L));
+ write.write(rowData(1, 11, 55L));
commit.commit("1", write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("1|10|200", "1|11|101"));
+ .isEqualTo(Arrays.asList("1|10|200|binary|varbinary", "1|11|101|binary|varbinary"));
}
@Test
@@ -74,9 +73,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("1|10|1000"));
+ .isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("2|21|20001", "2|22|202"));
+ .isEqualTo(
+ Arrays.asList("2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
}
@Test
@@ -107,7 +107,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Arrays.asList(
// only filter on key should be performed,
// and records from the same file should also be selected
- "2|21|20001", "2|22|202"));
+ "2|21|20001|binary|varbinary", "2|22|202|binary|varbinary"));
}
@Test
@@ -118,9 +118,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Collections.singletonList("-1|11|1001"));
+ .isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
- .isEqualTo(Arrays.asList("-2|20|200", "+2|21|20001", "+2|22|202"));
+ .isEqualTo(
+ Arrays.asList(
+ "-2|20|200|binary|varbinary",
+ "+2|21|20001|binary|varbinary",
+ "+2|22|202|binary|varbinary"));
}
@Test
@@ -153,7 +157,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
Arrays.asList(
// only filter on key should be performed,
// and records from the same file should also be selected
- "-2|20|200", "+2|21|20001", "+2|22|202"));
+ "-2|20|200|binary|varbinary",
+ "+2|21|20001|binary|varbinary",
+ "+2|22|202|binary|varbinary"));
}
@Test
@@ -163,11 +169,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
- write.write(GenericRowData.of(1, 10, 101L));
- write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
- write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+ write.write(rowData(1, 10, 101L));
+ write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
+ write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
commit.commit("0", write.prepareCommit(true));
write.close();
@@ -176,11 +182,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
- "+I 1|10|100",
- "-D 1|10|100",
- "+I 1|10|101",
- "-U 1|10|101",
- "+U 1|10|102"));
+ "+I 1|10|100|binary|varbinary",
+ "-D 1|10|100|binary|varbinary",
+ "+I 1|10|101|binary|varbinary",
+ "-U 1|10|101|binary|varbinary",
+ "+U 1|10|102|binary|varbinary"));
}
private void writeData() throws Exception {
@@ -188,21 +194,21 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(2, 20, 200L));
- write.write(GenericRowData.of(1, 11, 101L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ write.write(rowData(1, 11, 101L));
commit.commit("0", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 10, 1000L));
- write.write(GenericRowData.of(2, 21, 201L));
- write.write(GenericRowData.of(2, 21, 2001L));
+ write.write(rowData(1, 10, 1000L));
+ write.write(rowData(2, 21, 201L));
+ write.write(rowData(2, 21, 2001L));
commit.commit("1", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 11, 1001L));
- write.write(GenericRowData.of(2, 21, 20001L));
- write.write(GenericRowData.of(2, 22, 202L));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
+ write.write(rowData(1, 11, 1001L));
+ write.write(rowData(2, 21, 20001L));
+ write.write(rowData(2, 22, 202L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
+ write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
commit.commit("2", write.prepareCommit(true));
write.close();
@@ -216,37 +222,28 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(1, 20, 200L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
commit.commit("0", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 30, 300L));
- write.write(GenericRowData.of(1, 40, 400L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowData(1, 40, 400L));
commit.commit("1", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 50, 500L));
- write.write(GenericRowData.of(1, 60, 600L));
+ write.write(rowData(1, 50, 500L));
+ write.write(rowData(1, 60, 600L));
commit.commit("2", write.prepareCommit(true));
- write.close();
-
PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = table.newScan().plan().splits;
- TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+ // push down key filter a = 30
+ TableRead read = table.newRead().withFilter(builder.equal(1, 30));
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
- Arrays.asList(
- "1|10|100",
- "1|20|200",
- "1|30|300",
- "1|40|400",
- "1|50|500",
- "1|60|600"));
-
- read = table.newRead().withFilter(builder.equal(1, 30));
- assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+ Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
+
+ write.close();
}
@Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 74b824db..19b7a836 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -65,12 +65,23 @@ public abstract class FileStoreTableTestBase {
new LogicalType[] {
DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
- DataTypes.BIGINT().getLogicalType()
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.BINARY(1).getLogicalType(),
+ DataTypes.VARBINARY(1).getLogicalType()
},
- new String[] {"pt", "a", "b"});
+ new String[] {"pt", "a", "b", "c", "d"});
protected static final int[] PROJECTION = new int[] {2, 1};
protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
- rowData -> rowData.getInt(0) + "|" + rowData.getInt(1) + "|" + rowData.getLong(2);
+ rowData ->
+ rowData.getInt(0)
+ + "|"
+ + rowData.getInt(1)
+ + "|"
+ + rowData.getLong(2)
+ + "|"
+ + new String(rowData.getBinary(3))
+ + "|"
+ + new String(rowData.getBinary(4));
protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
@@ -100,14 +111,14 @@ public abstract class FileStoreTableTestBase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(2, 20, 200L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
commit.commit("0", write.prepareCommit(true));
write.close();
write = table.newWrite().withOverwrite(true);
commit = table.newCommit("user");
- write.write(GenericRowData.of(2, 21, 201L));
+ write.write(rowData(2, 21, 201L));
Map<String, String> overwritePartition = new HashMap<>();
overwritePartition.put("pt", "2");
commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
@@ -116,9 +127,9 @@ public abstract class FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("1|10|100"));
+ .hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Collections.singletonList("2|21|201"));
+ .hasSameElementsAs(Collections.singletonList("2|21|201|binary|varbinary"));
}
@Test
@@ -131,11 +142,11 @@ public abstract class FileStoreTableTestBase {
});
TableWrite write = table.newWrite();
- write.write(GenericRowData.of(1, 1, 2L));
- write.write(GenericRowData.of(1, 3, 4L));
- write.write(GenericRowData.of(1, 5, 6L));
- write.write(GenericRowData.of(1, 7, 8L));
- write.write(GenericRowData.of(1, 9, 10L));
+ write.write(rowData(1, 1, 2L));
+ write.write(rowData(1, 3, 4L));
+ write.write(rowData(1, 5, 6L));
+ write.write(rowData(1, 7, 8L));
+ write.write(rowData(1, 9, 10L));
table.newCommit("user").commit("0", write.prepareCommit(true));
write.close();
@@ -155,16 +166,16 @@ public abstract class FileStoreTableTestBase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10, 100L));
- write.write(GenericRowData.of(1, 20, 200L));
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
commit.commit("0", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 30, 300L));
- write.write(GenericRowData.of(1, 40, 400L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowData(1, 40, 400L));
commit.commit("1", write.prepareCommit(true));
- write.write(GenericRowData.of(1, 50, 500L));
- write.write(GenericRowData.of(1, 60, 600L));
+ write.write(rowData(1, 50, 500L));
+ write.write(rowData(1, 60, 600L));
commit.commit("2", write.prepareCommit(true));
write.close();
@@ -173,7 +184,8 @@ public abstract class FileStoreTableTestBase {
List<Split> splits = table.newScan().plan().splits;
TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
- .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
+ .hasSameElementsAs(
+ Arrays.asList("1|30|300|binary|varbinary", "1|40|400|binary|varbinary"));
}
@Test
@@ -185,21 +197,21 @@ public abstract class FileStoreTableTestBase {
for (int i = 0; i < 4; i++) {
// write lots of records, let compaction be slower
for (int j = 0; j < 1000; j++) {
- write.write(GenericRowData.of(1, 10 * i * j, 100L * i * j));
+ write.write(rowData(1, 10 * i * j, 100L * i * j));
}
commit.commit(String.valueOf(i), write.prepareCommit(false));
}
- write.write(GenericRowData.of(1, 40, 400L));
+ write.write(rowData(1, 40, 400L));
List<FileCommittable> commit4 = write.prepareCommit(false);
// trigger compaction, but not wait it.
- write.write(GenericRowData.of(2, 20, 200L));
+ write.write(rowData(2, 20, 200L));
List<FileCommittable> commit5 = write.prepareCommit(true);
// wait compaction finish
// commit5 should be a compaction commit
- write.write(GenericRowData.of(1, 60, 600L));
+ write.write(rowData(1, 60, 600L));
List<FileCommittable> commit6 = write.prepareCommit(true);
// if remove writer too fast, will see old files, do another compaction
// then will be conflicts
@@ -251,6 +263,21 @@ public abstract class FileStoreTableTestBase {
return b;
}
+ /**
+  * Builds a test row from the first three entries of {@code values}, followed by two fixed
+  * byte-array fields ({@code "binary"} and {@code "varbinary"}).
+  *
+  * @param values field values; only indexes 0, 1 and 2 are read
+  * @return a five-field {@link GenericRowData}
+  */
+ protected GenericRowData rowData(Object... values) {
+     // Constant payloads for the two trailing binary fields.
+     byte[] binaryField = "binary".getBytes();
+     byte[] varBinaryField = "varbinary".getBytes();
+     return GenericRowData.of(values[0], values[1], values[2], binaryField, varBinaryField);
+ }
+
+ /**
+  * Builds a test row of the given kind from the first three entries of {@code values}, followed
+  * by two fixed byte-array fields ({@code "binary"} and {@code "varbinary"}).
+  *
+  * @param rowKind the change kind to attach to the row
+  * @param values field values; only indexes 0, 1 and 2 are read
+  * @return a five-field {@link GenericRowData} carrying {@code rowKind}
+  */
+ protected GenericRowData rowDataWithKind(RowKind rowKind, Object... values) {
+     // Constant payloads for the two trailing binary fields.
+     byte[] binaryField = "binary".getBytes();
+     byte[] varBinaryField = "varbinary".getBytes();
+     return GenericRowData.ofKind(
+             rowKind, values[0], values[1], values[2], binaryField, varBinaryField);
+ }
+
protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 7ddb76dd..353ac24e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -68,8 +68,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
Random random = new Random();
List<String> expected = new ArrayList<>();
for (int i = 0; i < 10_000; i++) {
- GenericRowData row =
- GenericRowData.of(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
+ GenericRowData row = rowData(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
write.write(row);
expected.add(BATCH_ROW_TO_STRING.apply(row));
}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 3192cf56..5d07840a 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -29,12 +29,15 @@ import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.orc.OrcFile;
@@ -44,6 +47,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
+import java.util.stream.Collectors;
import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
@@ -90,14 +94,18 @@ public class OrcFileFormat extends FileFormat {
}
return OrcInputFormatFactory.create(
- readerConf, type, Projection.of(projection).toTopLevelIndexes(), orcPredicates);
+ readerConf,
+ (RowType) refineLogicalType(type),
+ Projection.of(projection).toTopLevelIndexes(),
+ orcPredicates);
}
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
- TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(type);
+ TypeDescription typeDescription =
+ OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
Vectorizer<RowData> vectorizer =
new RowDataVectorizer(typeDescription.toString(), orcTypes);
OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(orcProperties, writerConf);
@@ -113,4 +121,37 @@ public class OrcFileFormat extends FileFormat {
properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
return orcProperties;
}
+
+ /**
+  * Rewrites {@code type} into a form that can be handed to
+  * {@code OrcSplitReaderUtil#logicalTypeToOrcType()}.
+  *
+  * <p>BINARY and VARBINARY are replaced by the {@code DataTypes.BYTES()} logical type. ARRAY, MAP
+  * and ROW are rebuilt with their nested types refined recursively, keeping the container's own
+  * nullability (and, for ROW, the field names and descriptions). Any other type is returned
+  * unchanged.
+  *
+  * @param type the logical type to refine
+  * @return the refined logical type
+  */
+ private static LogicalType refineLogicalType(LogicalType type) {
+     switch (type.getTypeRoot()) {
+         case BINARY:
+         case VARBINARY:
+             // OrcSplitReaderUtil#logicalTypeToOrcType() only supports the DataTypes.BYTES()
+             // logical type for BINARY and VARBINARY. The stock BYTES type is returned as is;
+             // the length and nullability of the input are not carried over.
+             // NOTE(review): presumably the ORC util matches on that exact type - confirm
+             // before preserving nullability here.
+             return DataTypes.BYTES().getLogicalType();
+         case ARRAY:
+             ArrayType arrayType = (ArrayType) type;
+             return new ArrayType(
+                     arrayType.isNullable(), refineLogicalType(arrayType.getElementType()));
+         case MAP:
+             MapType mapType = (MapType) type;
+             // Keep the map's own nullability, consistent with the ARRAY and ROW branches.
+             return new MapType(
+                     mapType.isNullable(),
+                     refineLogicalType(mapType.getKeyType()),
+                     refineLogicalType(mapType.getValueType()));
+         case ROW:
+             RowType rowType = (RowType) type;
+             return new RowType(
+                     rowType.isNullable(),
+                     rowType.getFields().stream()
+                             .map(
+                                     f ->
+                                             new RowType.RowField(
+                                                     f.getName(),
+                                                     refineLogicalType(f.getType()),
+                                                     f.getDescription().orElse(null)))
+                             .collect(Collectors.toList()));
+         default:
+             // Nothing to refine for the remaining types.
+             return type;
+     }
+ }
}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index eaac4b29..86aba5c4 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
@@ -34,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
/** Tests for {@link OrcFileStatsExtractor}. */
@@ -50,6 +52,8 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
new CharType(8),
new VarCharType(8),
new BooleanType(),
+ new BinaryType(8),
+ new VarBinaryType(8),
new TinyIntType(),
new SmallIntType(),
new IntType(),