Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/03 00:11:05 UTC
[flink-table-store] branch master updated: [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 1012ea20 [FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData
1012ea20 is described below
commit 1012ea20cecbd6237acefc6427769bec164d5076
Author: tsreaper <ts...@gmail.com>
AuthorDate: Fri Jun 3 08:11:00 2022 +0800
[FLINK-27875] Introduce TableScan and TableRead as an abstraction layer above FileStore for reading RowData
This closes #145
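For orientation, a minimal sketch of how the pieces introduced below fit together (all class and method names are taken from this diff; the setup of conf, user and predicate is assumed):

    // Sketch only: open a table, plan input splits, then read RowData per split.
    FileStoreTable table = FileStoreTableFactory.create(conf, user);
    TableScan.Plan plan = table.newScan(false).withFilter(predicate).plan();
    TableRead read = table.newRead(false);
    for (Split split : plan.splits) {
        RecordReader<RowData> reader =
                read.createReader(split.partition(), split.bucket(), split.files());
        // consume the reader (see TableRead below), then close it
    }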
---
.../flink/table/store/connector/TableStore.java | 15 +-
.../store/connector/TableStoreFactoryOptions.java | 16 --
.../store/connector/TableStoreManagedFactory.java | 2 +-
.../connector/source/FileStoreSourceTest.java | 1 -
.../flink/table/store/file/FileStoreImpl.java | 12 +-
.../flink/table/store/file/FileStoreOptions.java | 12 ++
.../store/file/predicate/PredicateBuilder.java | 18 ++
.../flink/table/store/file/schema/Schema.java | 22 +++
.../store/table/AppendOnlyFileStoreTable.java | 89 ++++++++++
.../table/ChangelogValueCountFileStoreTable.java | 105 ++++++++++++
.../table/ChangelogWithKeyFileStoreTable.java | 130 +++++++++++++++
.../flink/table/store/table/FileStoreTable.java | 39 +++++
.../table/store/table/FileStoreTableFactory.java | 53 ++++++
.../store/table/source/DefaultSplitGenerator.java | 60 +++++++
.../flink/table/store/table/source/Split.java | 59 +++++++
.../table/store/table/source/SplitGenerator.java | 34 ++++
.../flink/table/store/table/source/TableRead.java | 72 ++++++++
.../flink/table/store/table/source/TableScan.java | 119 ++++++++++++++
.../source/ValueContentRowDataRecordIterator.java | 64 +++++++
.../source/ValueCountRowDataRecordIterator.java | 86 ++++++++++
.../flink/table/store/file/TestFileStore.java | 1 -
.../store/file/predicate/PredicateBuilderTest.java | 53 ++++++
.../table/store/file/utils/ReusingTestData.java | 2 +-
.../source/RowDataRecordIteratorTestBase.java | 63 +++++++
.../ValueContentRowDataRecordIteratorTest.java | 51 ++++++
.../ValueCountRowDataRecordIteratorTest.java | 86 ++++++++++
.../flink/table/store/TableStoreJobConf.java | 1 +
.../table/store/mapred/TableStoreInputFormat.java | 183 ++++-----------------
.../table/store/mapred/TableStoreInputSplit.java | 8 +-
.../table/store/mapred/TableStoreRecordReader.java | 15 +-
.../flink/table/store/FileStoreTestHelper.java | 75 ++++-----
.../hive/TableStoreHiveStorageHandlerITCase.java | 40 ++---
.../store/mapred/TableStoreRecordReaderTest.java | 28 ++--
33 files changed, 1316 insertions(+), 298 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 3060323f..3e9b1f7c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -180,7 +180,6 @@ public class TableStore {
FileStoreOptions fileStoreOptions = new FileStoreOptions(options);
return FileStoreImpl.createWithAppendOnly(
- fileStoreOptions.path().toString(),
schema.id(),
fileStoreOptions,
user,
@@ -195,15 +194,9 @@ public class TableStore {
if (trimmedPrimaryKeys.length == 0) {
return FileStoreImpl.createWithValueCount(
- fileStoreOptions.path().toString(),
- schema.id(),
- fileStoreOptions,
- user,
- partitionType,
- type);
+ schema.id(), fileStoreOptions, user, partitionType, type);
} else {
return FileStoreImpl.createWithPrimaryKey(
- fileStoreOptions.path().toString(),
schema.id(),
fileStoreOptions,
user,
@@ -215,7 +208,7 @@ public class TableStore {
}
private FileStore buildFileStore() {
- WriteMode writeMode = options.get(TableStoreFactoryOptions.WRITE_MODE);
+ WriteMode writeMode = options.get(FileStoreOptions.WRITE_MODE);
switch (writeMode) {
case CHANGE_LOG:
@@ -320,7 +313,7 @@ public class TableStore {
}
private Source<RowData, ?, ?> buildSource() {
- WriteMode writeMode = options.get(TableStoreFactoryOptions.WRITE_MODE);
+ WriteMode writeMode = options.get(FileStoreOptions.WRITE_MODE);
if (isContinuous) {
if (schema.primaryKeys().size() > 0 && mergeEngine() == PARTIAL_UPDATE) {
throw new ValidationException(
@@ -411,7 +404,7 @@ public class TableStore {
public DataStreamSink<?> build() {
FileStore fileStore = buildFileStore();
int numBucket = options.get(BUCKET);
- WriteMode writeMode = options.get(TableStoreFactoryOptions.WRITE_MODE);
+ WriteMode writeMode = options.get(FileStoreOptions.WRITE_MODE);
BucketStreamPartitioner partitioner =
new BucketStreamPartitioner(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 2f429cce..b0aa9302 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -21,15 +21,11 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.description.Description;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.WriteMode;
import java.util.HashSet;
import java.util.Set;
-import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-
/** Options for {@link TableStoreManagedFactory}. */
public class TableStoreFactoryOptions {
@@ -67,18 +63,6 @@ public class TableStoreFactoryOptions {
.noDefaultValue()
.withDescription("The log system used to keep changes of the table.");
- public static final ConfigOption<WriteMode> WRITE_MODE =
- ConfigOptions.key("write-mode")
- .enumType(WriteMode.class)
- .defaultValue(WriteMode.CHANGE_LOG)
- .withDescription(
- Description.builder()
- .text("Specify the write mode for table.")
- .linebreak()
- .list(formatEnumOption(WriteMode.APPEND_ONLY))
- .list(formatEnumOption(WriteMode.CHANGE_LOG))
- .build());
-
public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
public static final ConfigOption<Integer> SCAN_PARALLELISM =
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 60547536..e858ca5e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -40,10 +40,10 @@ import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.WRITE_MODE;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.file.FileStoreOptions.WRITE_MODE;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
/** Default implementation of {@link ManagedTableFactory}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
index 037c09e1..a33816a9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java
@@ -146,7 +146,6 @@ public class FileStoreSourceTest {
MergeFunction mergeFunction =
hasPk ? new DeduplicateMergeFunction() : new ValueCountMergeFunction();
return new FileStoreImpl(
- "/fake/path",
0,
new FileStoreOptions(new Configuration()),
WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index f27808b1..6c10899b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
@@ -49,7 +48,6 @@ import java.util.stream.Collectors;
/** File store implementation. */
public class FileStoreImpl implements FileStore {
- private final String tablePath;
private final long schemaId;
private final WriteMode writeMode;
private final FileStoreOptions options;
@@ -61,7 +59,6 @@ public class FileStoreImpl implements FileStore {
private final GeneratedRecordComparator genRecordComparator;
public FileStoreImpl(
- String tablePath,
long schemaId,
FileStoreOptions options,
WriteMode writeMode,
@@ -70,7 +67,6 @@ public class FileStoreImpl implements FileStore {
RowType keyType,
RowType valueType,
@Nullable MergeFunction mergeFunction) {
- this.tablePath = tablePath;
this.schemaId = schemaId;
this.options = options;
this.writeMode = writeMode;
@@ -86,7 +82,7 @@ public class FileStoreImpl implements FileStore {
public FileStorePathFactory pathFactory() {
return new FileStorePathFactory(
- new Path(tablePath), partitionType, options.partitionDefaultName());
+ options.path(), partitionType, options.partitionDefaultName());
}
@VisibleForTesting
@@ -187,14 +183,12 @@ public class FileStoreImpl implements FileStore {
}
public static FileStoreImpl createWithAppendOnly(
- String tablePath,
long schemaId,
FileStoreOptions options,
String user,
RowType partitionType,
RowType rowType) {
return new FileStoreImpl(
- tablePath,
schemaId,
options,
WriteMode.APPEND_ONLY,
@@ -206,7 +200,6 @@ public class FileStoreImpl implements FileStore {
}
public static FileStoreImpl createWithPrimaryKey(
- String tablePath,
long schemaId,
FileStoreOptions options,
String user,
@@ -244,7 +237,6 @@ public class FileStoreImpl implements FileStore {
}
return new FileStoreImpl(
- tablePath,
schemaId,
options,
WriteMode.CHANGE_LOG,
@@ -256,7 +248,6 @@ public class FileStoreImpl implements FileStore {
}
public static FileStoreImpl createWithValueCount(
- String tablePath,
long schemaId,
FileStoreOptions options,
String user,
@@ -267,7 +258,6 @@ public class FileStoreImpl implements FileStore {
new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
MergeFunction mergeFunction = new ValueCountMergeFunction();
return new FileStoreImpl(
- tablePath,
schemaId,
options,
WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 06dd7285..569ecc4b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -129,6 +129,18 @@ public class FileStoreOptions implements Serializable {
formatEnumOption(MergeEngine.PARTIAL_UPDATE))
.build());
+ public static final ConfigOption<WriteMode> WRITE_MODE =
+ ConfigOptions.key("write-mode")
+ .enumType(WriteMode.class)
+ .defaultValue(WriteMode.CHANGE_LOG)
+ .withDescription(
+ Description.builder()
+ .text("Specify the write mode for table.")
+ .linebreak()
+ .list(formatEnumOption(WriteMode.APPEND_ONLY))
+ .list(formatEnumOption(WriteMode.CHANGE_LOG))
+ .build());
+
private final Configuration options;
public static Set<ConfigOption<?>> allOptions() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index b431d81e..03285ee0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.predicate;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -106,4 +107,21 @@ public class PredicateBuilder {
greaterOrEqual(idx, includedLowerBound),
lessOrEqual(idx, includedUpperBound)));
}
+
+ public static List<Predicate> splitAnd(Predicate predicate) {
+ List<Predicate> result = new ArrayList<>();
+ splitAnd(predicate, result);
+ return result;
+ }
+
+ private static void splitAnd(Predicate predicate, List<Predicate> result) {
+ if (predicate instanceof CompoundPredicate
+ && ((CompoundPredicate) predicate).function().equals(And.INSTANCE)) {
+ for (Predicate child : ((CompoundPredicate) predicate).children()) {
+ splitAnd(child, result);
+ }
+ } else {
+ result.add(predicate);
+ }
+ }
}
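The new splitAnd helper flattens nested AND predicates into their conjuncts while leaving non-AND subtrees (such as OR) intact. A sketch mirroring the PredicateBuilderTest added below:

    Predicate orPart = PredicateBuilder.or(
            PredicateBuilder.isNull(0), PredicateBuilder.isNull(1), PredicateBuilder.isNull(2));
    Predicate andPart = PredicateBuilder.and(
            PredicateBuilder.isNull(3), PredicateBuilder.isNull(4), PredicateBuilder.isNull(5));
    List<Predicate> conjuncts = PredicateBuilder.splitAnd(
            PredicateBuilder.and(orPart, andPart, PredicateBuilder.isNull(6)));
    // conjuncts == [orPart, isNull(3), isNull(4), isNull(5), isNull(6)]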
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
index 0dbc08c0..438f4dac 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
@@ -84,6 +84,10 @@ public class Schema {
return fields;
}
+ public List<String> fieldNames() {
+ return fields.stream().map(DataField::name).collect(Collectors.toList());
+ }
+
public int highestFieldId() {
return highestFieldId;
}
@@ -132,6 +136,24 @@ public class Schema {
return (RowType) new RowDataType(fields).logicalType;
}
+ public RowType logicalPartitionType() {
+ return projectedLogicalRowType(partitionKeys);
+ }
+
+ public RowType logicalPrimaryKeysType() {
+ return projectedLogicalRowType(primaryKeys);
+ }
+
+ private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
+ List<String> fieldNames = fieldNames();
+ return (RowType)
+ new RowDataType(
+ projectedFieldNames.stream()
+ .map(k -> fields.get(fieldNames.indexOf(k)))
+ .collect(Collectors.toList()))
+ .logicalType;
+ }
+
@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
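As a hypothetical example of the new projection helpers: for a schema with fields (dt STRING, k INT, v BIGINT), partition keys [dt] and primary keys [k], logicalPartitionType() yields ROW&lt;dt STRING&gt; and logicalPrimaryKeysType() yields ROW&lt;k INT&gt;.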
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
new file mode 100644
index 00000000..53255ed7
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
+public class AppendOnlyFileStoreTable implements FileStoreTable {
+
+ private final Schema schema;
+ private final FileStoreImpl store;
+
+ AppendOnlyFileStoreTable(Schema schema, Configuration conf, String user) {
+ this.schema = schema;
+ this.store =
+ new FileStoreImpl(
+ schema.id(),
+ new FileStoreOptions(conf),
+ WriteMode.APPEND_ONLY,
+ user,
+ schema.logicalPartitionType(),
+ RowType.of(),
+ schema.logicalRowType(),
+ null);
+ }
+
+ @Override
+ public TableScan newScan(boolean incremental) {
+ FileStoreScan scan = store.newScan().withIncremental(incremental);
+ return new TableScan(scan, schema, store.pathFactory()) {
+ @Override
+ protected void withNonPartitionFilter(Predicate predicate) {
+ scan.withValueFilter(predicate);
+ }
+ };
+ }
+
+ @Override
+ public TableRead newRead(boolean incremental) {
+ return new TableRead(store.newRead()) {
+ @Override
+ public TableRead withProjection(int[][] projection) {
+ read.withValueProjection(projection);
+ return this;
+ }
+
+ @Override
+ protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+ RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
+ return new ValueContentRowDataRecordIterator(kvRecordIterator);
+ }
+ };
+ }
+
+ @Override
+ public FileStore fileStore() {
+ return store;
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
new file mode 100644
index 00000000..feacd088
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
+public class ChangelogValueCountFileStoreTable implements FileStoreTable {
+
+ private final Schema schema;
+ private final FileStoreImpl store;
+
+ ChangelogValueCountFileStoreTable(Schema schema, Configuration conf, String user) {
+ this.schema = schema;
+ RowType countType =
+ RowType.of(
+ new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
+ MergeFunction mergeFunction = new ValueCountMergeFunction();
+ this.store =
+ new FileStoreImpl(
+ schema.id(),
+ new FileStoreOptions(conf),
+ WriteMode.CHANGE_LOG,
+ user,
+ schema.logicalPartitionType(),
+ schema.logicalRowType(),
+ countType,
+ mergeFunction);
+ }
+
+ @Override
+ public TableScan newScan(boolean incremental) {
+ FileStoreScan scan = store.newScan().withIncremental(incremental);
+ return new TableScan(scan, schema, store.pathFactory()) {
+ @Override
+ protected void withNonPartitionFilter(Predicate predicate) {
+ scan.withKeyFilter(predicate);
+ }
+ };
+ }
+
+ @Override
+ public TableRead newRead(boolean incremental) {
+ FileStoreRead read = store.newRead().withDropDelete(!incremental);
+ return new TableRead(read) {
+ private int[][] projection = null;
+
+ @Override
+ public TableRead withProjection(int[][] projection) {
+ if (incremental) {
+ read.withKeyProjection(projection);
+ } else {
+ this.projection = projection;
+ }
+ return this;
+ }
+
+ @Override
+ protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+ RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
+ return new ValueCountRowDataRecordIterator(kvRecordIterator, projection);
+ }
+ };
+ }
+
+ @Override
+ public FileStore fileStore() {
+ return store;
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
new file mode 100644
index 00000000..fd887f5d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with primary keys. */
+public class ChangelogWithKeyFileStoreTable implements FileStoreTable {
+
+ private final Schema schema;
+ private final FileStoreImpl store;
+
+ ChangelogWithKeyFileStoreTable(Schema schema, Configuration conf, String user) {
+ this.schema = schema;
+ RowType rowType = schema.logicalRowType();
+
+ // add _KEY_ prefix to avoid conflicts with value field names
+ RowType keyType =
+ new RowType(
+ schema.logicalPrimaryKeysType().getFields().stream()
+ .map(
+ f ->
+ new RowType.RowField(
+ "_KEY_" + f.getName(),
+ f.getType(),
+ f.getDescription().orElse(null)))
+ .collect(Collectors.toList()));
+
+ FileStoreOptions.MergeEngine mergeEngine = conf.get(FileStoreOptions.MERGE_ENGINE);
+ MergeFunction mergeFunction;
+ switch (mergeEngine) {
+ case DEDUPLICATE:
+ mergeFunction = new DeduplicateMergeFunction();
+ break;
+ case PARTIAL_UPDATE:
+ List<LogicalType> fieldTypes = rowType.getChildren();
+ RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+ }
+ mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
+ }
+
+ this.store =
+ new FileStoreImpl(
+ schema.id(),
+ new FileStoreOptions(conf),
+ WriteMode.CHANGE_LOG,
+ user,
+ schema.logicalPartitionType(),
+ keyType,
+ rowType,
+ mergeFunction);
+ }
+
+ @Override
+ public TableScan newScan(boolean incremental) {
+ FileStoreScan scan = store.newScan().withIncremental(incremental);
+ return new TableScan(scan, schema, store.pathFactory()) {
+ @Override
+ protected void withNonPartitionFilter(Predicate predicate) {
+ scan.withValueFilter(predicate);
+ }
+ };
+ }
+
+ @Override
+ public TableRead newRead(boolean incremental) {
+ FileStoreRead read = store.newRead().withDropDelete(!incremental);
+ return new TableRead(read) {
+ @Override
+ public TableRead withProjection(int[][] projection) {
+ read.withValueProjection(projection);
+ return this;
+ }
+
+ @Override
+ protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+ RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
+ return new ValueContentRowDataRecordIterator(kvRecordIterator);
+ }
+ };
+ }
+
+ @Override
+ public FileStore fileStore() {
+ return store;
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
new file mode 100644
index 00000000..b2542ce1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.TableScan;
+
+/**
+ * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
+ * and writing of {@link org.apache.flink.table.data.RowData}.
+ */
+public interface FileStoreTable {
+
+ TableScan newScan(boolean incremental);
+
+ TableRead newRead(boolean incremental);
+
+ // TODO remove this once TableWrite is introduced
+ @VisibleForTesting
+ FileStore fileStore();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
new file mode 100644
index 00000000..9f9f1d08
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+
+/** Factory to create {@link FileStoreTable}. */
+public class FileStoreTableFactory {
+
+ public static FileStoreTable create(Configuration conf, String user) {
+ Path tablePath = FileStoreOptions.path(conf);
+ Schema schema =
+ new SchemaManager(tablePath)
+ .latest()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Schema file not found in location "
+ + tablePath
+ + ". Please create table first."));
+
+ if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
+ return new AppendOnlyFileStoreTable(schema, conf, user);
+ } else {
+ if (schema.primaryKeys().isEmpty()) {
+ return new ChangelogValueCountFileStoreTable(schema, conf, user);
+ } else {
+ return new ChangelogWithKeyFileStoreTable(schema, conf, user);
+ }
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
new file mode 100644
index 00000000..01107bb8
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Default implementation of {@link SplitGenerator}. Each whole bucket becomes one split. */
+public class DefaultSplitGenerator implements SplitGenerator {
+
+ private final FileStorePathFactory pathFactory;
+
+ public DefaultSplitGenerator(FileStorePathFactory pathFactory) {
+ this.pathFactory = pathFactory;
+ }
+
+ @Override
+ public List<Split> generate(
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFileMetas) {
+ List<Split> splits = new ArrayList<>();
+ for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
+ groupedDataFileMetas.entrySet()) {
+ BinaryRowData partition = entryWithPartition.getKey();
+ for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
+ entryWithPartition.getValue().entrySet()) {
+ int bucket = entryWithBucket.getKey();
+ splits.add(
+ new Split(
+ partition,
+ bucket,
+ entryWithBucket.getValue(),
+ pathFactory
+ .createDataFilePathFactory(partition, bucket)
+ .bucketPath()));
+ }
+ }
+ return splits;
+ }
+}
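To make the grouping concrete, a hypothetical input and output for generate (partitions p1, p2 and files f1..f4 are made-up names):

    // input:  {p1: {0: [f1, f2], 1: [f3]}, p2: {0: [f4]}}
    // output, one Split per (partition, bucket) pair:
    //   Split(p1, 0, [f1, f2], bucketPath(p1, 0))
    //   Split(p1, 1, [f3],     bucketPath(p1, 1))
    //   Split(p2, 0, [f4],     bucketPath(p2, 0))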
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
new file mode 100644
index 00000000..fcb12739
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.List;
+
+/** Input splits. Needed by most batch computation engines. */
+public class Split {
+
+ private final BinaryRowData partition;
+ private final int bucket;
+ private final List<DataFileMeta> files;
+
+ private final Path bucketPath;
+
+ public Split(BinaryRowData partition, int bucket, List<DataFileMeta> files, Path bucketPath) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.files = files;
+
+ this.bucketPath = bucketPath;
+ }
+
+ public BinaryRowData partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public List<DataFileMeta> files() {
+ return files;
+ }
+
+ public Path bucketPath() {
+ return bucketPath;
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
new file mode 100644
index 00000000..fb0ce382
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate {@link Split}s from a map with partition and bucket as keys and {@link DataFileMeta}s as
+ * values.
+ */
+public interface SplitGenerator {
+
+ List<Split> generate(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFileMetas);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
new file mode 100644
index 00000000..c5989aa4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
+public abstract class TableRead {
+
+ protected final FileStoreRead read;
+
+ protected TableRead(FileStoreRead read) {
+ this.read = read;
+ }
+
+ public abstract TableRead withProjection(int[][] projection);
+
+ public RecordReader<RowData> createReader(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+ return new RowDataRecordReader(read.createReader(partition, bucket, files));
+ }
+
+ protected abstract RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
+ RecordReader.RecordIterator<KeyValue> kvRecordIterator);
+
+ private class RowDataRecordReader implements RecordReader<RowData> {
+
+ private final RecordReader<KeyValue> wrapped;
+
+ private RowDataRecordReader(RecordReader<KeyValue> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<RowData> readBatch() throws IOException {
+ RecordIterator<KeyValue> batch = wrapped.readBatch();
+ return batch == null ? null : rowDataRecordIteratorFromKv(batch);
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrapped.close();
+ }
+ }
+}
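The reader returned by createReader follows the usual batch-oriented pattern (compare RowDataRecordIteratorTestBase below). A consumption sketch, assuming partition, bucket and files come from a Split:

    RecordReader<RowData> reader = read.createReader(partition, bucket, files);
    RecordReader.RecordIterator<RowData> batch;
    while ((batch = reader.readBatch()) != null) { // null: no more batches
        RowData row;
        while ((row = batch.next()) != null) {     // null: batch exhausted
            // row objects may be reused between calls; copy before buffering
        }
        batch.releaseBatch();
    }
    reader.close();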
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
new file mode 100644
index 00000000..efd6759e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.CompoundPredicate;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
+public abstract class TableScan {
+
+ protected final FileStoreScan scan;
+ private final Schema schema;
+ private final FileStorePathFactory pathFactory;
+
+ protected TableScan(FileStoreScan scan, Schema schema, FileStorePathFactory pathFactory) {
+ this.scan = scan;
+ this.schema = schema;
+ this.pathFactory = pathFactory;
+ }
+
+ public TableScan withSnapshot(long snapshotId) {
+ scan.withSnapshot(snapshotId);
+ return this;
+ }
+
+ public TableScan withFilter(Predicate predicate) {
+ List<String> partitionKeys = schema.partitionKeys();
+ int[] fieldIdxToPartitionIdx =
+ schema.fields().stream().mapToInt(f -> partitionKeys.indexOf(f.name())).toArray();
+
+ List<Predicate> partitionFilters = new ArrayList<>();
+ List<Predicate> nonPartitionFilters = new ArrayList<>();
+ for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+ Optional<Predicate> mapped = mapToPartitionFilter(p, fieldIdxToPartitionIdx);
+ if (mapped.isPresent()) {
+ partitionFilters.add(mapped.get());
+ } else {
+ nonPartitionFilters.add(p);
+ }
+ }
+
+ scan.withPartitionFilter(new CompoundPredicate(And.INSTANCE, partitionFilters));
+ withNonPartitionFilter(new CompoundPredicate(And.INSTANCE, nonPartitionFilters));
+ return this;
+ }
+
+ public Plan plan() {
+ FileStoreScan.Plan plan = scan.plan();
+ return new Plan(
+ plan.snapshotId(),
+ new DefaultSplitGenerator(pathFactory).generate(plan.groupByPartFiles()));
+ }
+
+ protected abstract void withNonPartitionFilter(Predicate predicate);
+
+ private Optional<Predicate> mapToPartitionFilter(
+ Predicate predicate, int[] fieldIdxToPartitionIdx) {
+ if (predicate instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+ List<Predicate> children = new ArrayList<>();
+ for (Predicate child : compoundPredicate.children()) {
+ Optional<Predicate> mapped = mapToPartitionFilter(child, fieldIdxToPartitionIdx);
+ if (mapped.isPresent()) {
+ children.add(mapped.get());
+ } else {
+ return Optional.empty();
+ }
+ }
+ return Optional.of(new CompoundPredicate(compoundPredicate.function(), children));
+ } else {
+ LeafPredicate leafPredicate = (LeafPredicate) predicate;
+ int mapped = fieldIdxToPartitionIdx[leafPredicate.index()];
+ if (mapped >= 0) {
+ return Optional.of(
+ new LeafPredicate(
+ leafPredicate.function(), mapped, leafPredicate.literal()));
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+
+ /** Scanning plan containing snapshot ID and input splits. */
+ public static class Plan {
+ public final long snapshotId;
+ public final List<Split> splits;
+
+ private Plan(long snapshotId, List<Split> splits) {
+ this.snapshotId = snapshotId;
+ this.splits = splits;
+ }
+ }
+}
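A hypothetical example of the filter splitting in withFilter: for a table partitioned by dt (field index 0) with a value field v, the filter dt = X AND v > 100 is split into two conjuncts; the one touching only dt is remapped to partition-row index 0 and pushed to scan.withPartitionFilter, while v > 100 goes to withNonPartitionFilter, which subclasses route to a key or value filter.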
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
new file mode 100644
index 00000000..3f8b7d81
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+
+/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
+public class ValueContentRowDataRecordIterator implements RecordReader.RecordIterator<RowData> {
+
+ private final RecordReader.RecordIterator<KeyValue> kvIterator;
+
+ public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> kvIterator) {
+ this.kvIterator = kvIterator;
+ }
+
+ @Override
+ public RowData next() throws IOException {
+ KeyValue kv = kvIterator.next();
+ if (kv == null) {
+ return null;
+ }
+
+ RowData rowData = kv.value();
+ // kv.value() is reused, so we need to set row kind each time
+ switch (kv.valueKind()) {
+ case ADD:
+ rowData.setRowKind(RowKind.INSERT);
+ break;
+ case DELETE:
+ rowData.setRowKind(RowKind.DELETE);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + kv.valueKind().name());
+ }
+ return rowData;
+ }
+
+ @Override
+ public void releaseBatch() {
+ kvIterator.releaseBatch();
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
new file mode 100644
index 00000000..1bb2f2a5
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to several identical {@link
+ * RowData}s built from its key. The number of rows equals the absolute count stored in the value
+ * of the {@link KeyValue}.
+ */
+public class ValueCountRowDataRecordIterator implements RecordReader.RecordIterator<RowData> {
+
+ private final RecordReader.RecordIterator<KeyValue> kvIterator;
+ private final @Nullable ProjectedRowData projectedRowData;
+
+ private RowData rowData;
+ private long count;
+
+ public ValueCountRowDataRecordIterator(
+ RecordReader.RecordIterator<KeyValue> kvIterator, @Nullable int[][] projection) {
+ this.kvIterator = kvIterator;
+ this.projectedRowData = projection == null ? null : ProjectedRowData.from(projection);
+
+ this.rowData = null;
+ this.count = 0;
+ }
+
+ @Override
+ public RowData next() throws IOException {
+ while (true) {
+ if (count > 0) {
+ count--;
+ return rowData;
+ } else {
+ KeyValue kv = kvIterator.next();
+ if (kv == null) {
+ return null;
+ }
+
+ if (projectedRowData == null) {
+ rowData = kv.key();
+ } else {
+ rowData = projectedRowData.replaceRow(kv.key());
+ }
+ long value = kv.value().getLong(0);
+ // kv.key() is reused, so we need to set the row kind each time
+ if (value > 0) {
+ rowData.setRowKind(RowKind.INSERT);
+ } else {
+ rowData.setRowKind(RowKind.DELETE);
+ }
+ count = Math.abs(value);
+ }
+ }
+ }
+
+ @Override
+ public void releaseBatch() {
+ kvIterator.releaseBatch();
+ }
+}
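To make the expansion concrete, a hypothetical trace of next() (key rows and counts are made up):

    // key = (1, "a"), _VALUE_COUNT = +3  ->  (1, "a") emitted 3 times with RowKind.INSERT
    // key = (2, "b"), _VALUE_COUNT = -2  ->  (2, "b") emitted 2 times with RowKind.DELETE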
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 03a81bdd..e1688f2b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -111,7 +111,6 @@ public class TestFileStore extends FileStoreImpl {
RowType valueType,
MergeFunction mergeFunction) {
super(
- options.path().toString(),
0L,
options,
WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
new file mode 100644
index 00000000..dd190998
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PredicateBuilder}. */
+public class PredicateBuilderTest {
+
+ @Test
+ public void testSplitAnd() {
+ Predicate child1 =
+ PredicateBuilder.or(
+ PredicateBuilder.isNull(0),
+ PredicateBuilder.isNull(1),
+ PredicateBuilder.isNull(2));
+ Predicate child2 =
+ PredicateBuilder.and(
+ PredicateBuilder.isNull(3),
+ PredicateBuilder.isNull(4),
+ PredicateBuilder.isNull(5));
+ Predicate child3 = PredicateBuilder.isNull(6);
+
+ assertThat(PredicateBuilder.splitAnd(PredicateBuilder.and(child1, child2, child3)))
+ .isEqualTo(
+ Arrays.asList(
+ child1,
+ PredicateBuilder.isNull(3),
+ PredicateBuilder.isNull(4),
+ PredicateBuilder.isNull(5),
+ child3));
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
index 3e6a53ac..25581938 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
@@ -33,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* A simple test data structure which is mainly used for testing if other components handle reuse of
- * {@link KeyValue} correctly. Use along with {@link ReusingTestData}.
+ * {@link KeyValue} correctly. Use along with {@link ReusingKeyValue}.
*/
public class ReusingTestData implements Comparable<ReusingTestData> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/RowDataRecordIteratorTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/RowDataRecordIteratorTestBase.java
new file mode 100644
index 00000000..e422514d
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/RowDataRecordIteratorTestBase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** Base class for testing {@link RecordReader.RecordIterator}s that produce {@link RowData}. */
+public abstract class RowDataRecordIteratorTestBase {
+
+ protected void testIterator(
+ List<ReusingTestData> input,
+ Function<RecordReader.RecordIterator<KeyValue>, RecordReader.RecordIterator<RowData>>
+ rowDataIteratorSupplier,
+ BiConsumer<RowData, Integer> resultChecker)
+ throws Exception {
+ int cnt = 0;
+ TestReusingRecordReader recordReader = new TestReusingRecordReader(input);
+ while (true) {
+ RecordReader.RecordIterator<KeyValue> kvIterator = recordReader.readBatch();
+ if (kvIterator == null) {
+ break;
+ }
+ RecordReader.RecordIterator<RowData> rowDataIterator =
+ rowDataIteratorSupplier.apply(kvIterator);
+ RowData rowData;
+ while (true) {
+ rowData = rowDataIterator.next();
+ if (rowData == null) {
+ break;
+ }
+ resultChecker.accept(rowData, cnt);
+ cnt++;
+ }
+ rowDataIterator.releaseBatch();
+ }
+ recordReader.close();
+ recordReader.assertCleanUp();
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIteratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIteratorTest.java
new file mode 100644
index 00000000..601059f2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIteratorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ValueContentRowDataRecordIterator}. */
+public class ValueContentRowDataRecordIteratorTest extends RowDataRecordIteratorTestBase {
+
+ @Test
+ public void testIterator() throws Exception {
+ List<ReusingTestData> input =
+ ReusingTestData.parse("1, 1, +, 100 | 2, 2, +, 200 | 1, 3, -, 100 | 2, 4, +, 300");
+ List<Long> expectedValues = Arrays.asList(100L, 200L, 100L, 300L);
+ List<RowKind> expectedRowKinds =
+ Arrays.asList(RowKind.INSERT, RowKind.INSERT, RowKind.DELETE, RowKind.INSERT);
+
+ testIterator(
+ input,
+ ValueContentRowDataRecordIterator::new,
+ (rowData, idx) -> {
+ assertThat(rowData.getArity()).isEqualTo(1);
+ assertThat(rowData.getLong(0)).isEqualTo(expectedValues.get(idx));
+ assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
+ });
+ }
+}
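
The assertions encode the value-content contract: each emitted row is the
KeyValue's value row unchanged (arity stays 1 here), with only the row kind
stamped from the value kind. A rough sketch under those assumptions; the
accessors kv.value() and kv.valueKind() follow naming conventions visible
elsewhere in this patch and are not a confirmed API:

    // Hedged sketch; the committed ValueContentRowDataRecordIterator may differ.
    public class ValueContentSketch implements RecordReader.RecordIterator<RowData> {

        private final RecordReader.RecordIterator<KeyValue> kvIterator;

        public ValueContentSketch(RecordReader.RecordIterator<KeyValue> kvIterator) {
            this.kvIterator = kvIterator;
        }

        @Override
        public RowData next() throws IOException {
            KeyValue kv = kvIterator.next();
            if (kv == null) {
                return null; // underlying batch exhausted
            }
            RowData value = kv.value();
            // '+' records become INSERT and '-' records DELETE, per the test data.
            value.setRowKind(
                    kv.valueKind() == ValueKind.ADD ? RowKind.INSERT : RowKind.DELETE);
            return value;
        }

        @Override
        public void releaseBatch() {
            kvIterator.releaseBatch();
        }
    }
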
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
new file mode 100644
index 00000000..0f6dd6f5
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ValueCountRowDataRecordIterator}. */
+public class ValueCountRowDataRecordIteratorTest extends RowDataRecordIteratorTestBase {
+
+ @Test
+ public void testIteratorWithoutProjection() throws Exception {
+ List<ReusingTestData> input =
+ ReusingTestData.parse("1, 1, +, 3 | 2, 2, +, 1 | 1, 3, +, -2 | 2, 4, +, -1");
+ List<Integer> expectedValues = Arrays.asList(1, 1, 1, 2, 1, 1, 2);
+ List<RowKind> expectedRowKinds =
+ Arrays.asList(
+ RowKind.INSERT,
+ RowKind.INSERT,
+ RowKind.INSERT,
+ RowKind.INSERT,
+ RowKind.DELETE,
+ RowKind.DELETE,
+ RowKind.DELETE);
+
+ testIterator(
+ input,
+ kvIterator -> new ValueCountRowDataRecordIterator(kvIterator, null),
+ (rowData, idx) -> {
+ assertThat(rowData.getArity()).isEqualTo(1);
+ assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
+ assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
+ });
+ }
+
+ @Test
+ public void testIteratorWithProjection() throws Exception {
+ List<ReusingTestData> input =
+ ReusingTestData.parse("1, 1, +, 3 | 2, 2, +, 1 | 1, 3, +, -2 | 2, 4, +, -1");
+ List<Integer> expectedValues = Arrays.asList(1, 1, 1, 2, 1, 1, 2);
+ List<RowKind> expectedRowKinds =
+ Arrays.asList(
+ RowKind.INSERT,
+ RowKind.INSERT,
+ RowKind.INSERT,
+ RowKind.INSERT,
+ RowKind.DELETE,
+ RowKind.DELETE,
+ RowKind.DELETE);
+
+ testIterator(
+ input,
+ kvIterator ->
+ new ValueCountRowDataRecordIterator(
+ kvIterator, new int[][] {new int[] {0}, new int[] {0}}),
+ (rowData, idx) -> {
+ assertThat(rowData.getArity()).isEqualTo(2);
+ assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
+ assertThat(rowData.getInt(1)).isEqualTo(expectedValues.get(idx));
+ assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
+ });
+ }
+}
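
These two tests encode the value-count expansion: the key is the user row, the
value is a single BIGINT count, and a count of n yields |n| copies of the key,
INSERT when n is positive and DELETE when n is negative. The int[][] argument
is a nested projection over the key; {{0}, {0}} duplicates field 0, hence the
expected arity of 2. A sketch of the expansion loop, with the projection
omitted and accessor names assumed as in the previous sketch:

    // Hedged sketch; not the committed ValueCountRowDataRecordIterator.
    public class ValueCountSketch implements RecordReader.RecordIterator<RowData> {

        private final RecordReader.RecordIterator<KeyValue> kvIterator;

        private RowData current; // key row currently being repeated
        private long remaining;  // repetitions left for the current key

        public ValueCountSketch(RecordReader.RecordIterator<KeyValue> kvIterator) {
            this.kvIterator = kvIterator;
        }

        @Override
        public RowData next() throws IOException {
            while (remaining == 0) {
                KeyValue kv = kvIterator.next();
                if (kv == null) {
                    return null; // underlying batch exhausted
                }
                long count = kv.value().getLong(0); // the single count column
                remaining = Math.abs(count);
                current = kv.key();
                current.setRowKind(count > 0 ? RowKind.INSERT : RowKind.DELETE);
            }
            remaining--;
            return current;
        }

        @Override
        public void releaseBatch() {
            kvIterator.releaseBatch();
        }
    }
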
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index d65a857f..9af0648f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -136,6 +136,7 @@ public class TableStoreJobConf {
}
public void updateFileStoreOptions(Configuration fileStoreOptions) {
+ fileStoreOptions.set(FileStoreOptions.PATH, getLocation());
for (Map.Entry<String, String> entry :
jobConf.getPropsWithPrefix(INTERNAL_TBLPROPERTIES_PREFIX).entrySet()) {
fileStoreOptions.setString(entry.getKey(), entry.getValue());
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 76352c90..25f9dcda 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -19,21 +19,13 @@
package org.apache.flink.table.store.mapred;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
import org.apache.flink.table.store.TableStoreJobConf;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableScan;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -46,15 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import javax.annotation.Nullable;
-
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
/**
* {@link InputFormat} for table store. It divides all files into {@link InputSplit}s (one split per
@@ -64,155 +49,45 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
- FileStoreWrapper wrapper = new FileStoreWrapper(jobConf);
- FileStoreScan scan = wrapper.newScan();
- List<TableStoreInputSplit> result = new ArrayList<>();
- for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> pe :
- scan.plan().groupByPartFiles().entrySet()) {
- for (Map.Entry<Integer, List<DataFileMeta>> be : pe.getValue().entrySet()) {
- BinaryRowData partition = pe.getKey();
- int bucket = be.getKey();
- String bucketPath =
- wrapper.store
- .pathFactory()
- .createDataFilePathFactory(partition, bucket)
- .bucketPath()
- .toString();
- TableStoreInputSplit split =
- new TableStoreInputSplit(partition, bucket, be.getValue(), bucketPath);
- result.add(split);
- }
- }
- return result.toArray(new TableStoreInputSplit[0]);
+ FileStoreTable table = createFileStoreTable(jobConf);
+ TableScan scan = table.newScan(false);
+ createPredicate(jobConf).ifPresent(scan::withFilter);
+ return scan.plan().splits.stream()
+ .map(TableStoreInputSplit::create)
+ .toArray(TableStoreInputSplit[]::new);
}
@Override
public RecordReader<Void, RowDataContainer> getRecordReader(
InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
- FileStoreWrapper wrapper = new FileStoreWrapper(jobConf);
- FileStoreRead read = wrapper.store.newRead();
+ FileStoreTable table = createFileStoreTable(jobConf);
TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
- org.apache.flink.table.store.file.utils.RecordReader<KeyValue> wrapped =
- read.withDropDelete(true)
- .createReader(split.partition(), split.bucket(), split.files());
long splitLength = split.getLength();
return new TableStoreRecordReader(
- wrapped,
- !new TableStoreJobConf(jobConf).getPrimaryKeyNames().isPresent(),
+ table.newRead(false).createReader(split.partition(), split.bucket(), split.files()),
splitLength);
}
- private static class FileStoreWrapper {
-
- private List<String> columnNames;
- private List<LogicalType> columnTypes;
-
- private FileStoreImpl store;
- private boolean valueCountMode;
- @Nullable private Predicate predicate;
-
- private FileStoreWrapper(JobConf jobConf) {
- createFileStore(jobConf);
- createPredicate(jobConf);
- }
-
- private void createFileStore(JobConf jobConf) {
- TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
-
- String dbName = wrapper.getDbName();
- String tableName = wrapper.getTableName();
-
- Configuration options = new Configuration();
- String tableLocation = wrapper.getLocation();
- wrapper.updateFileStoreOptions(options);
-
- String user = wrapper.getFileStoreUser();
-
- columnNames = wrapper.getColumnNames();
- columnTypes = wrapper.getColumnTypes();
-
- List<String> partitionColumnNames = wrapper.getPartitionColumnNames();
-
- RowType rowType =
- RowType.of(
- columnTypes.toArray(new LogicalType[0]),
- columnNames.toArray(new String[0]));
- LogicalType[] partitionLogicalTypes =
- partitionColumnNames.stream()
- .map(s -> columnTypes.get(columnNames.indexOf(s)))
- .toArray(LogicalType[]::new);
- RowType partitionType =
- RowType.of(partitionLogicalTypes, partitionColumnNames.toArray(new String[0]));
-
- Optional<List<String>> optionalPrimaryKeyNames = wrapper.getPrimaryKeyNames();
- if (optionalPrimaryKeyNames.isPresent()) {
- Function<String, RowType.RowField> rowFieldMapper =
- s -> {
- int idx = columnNames.indexOf(s);
- Preconditions.checkState(
- idx >= 0,
- "Primary key column "
- + s
- + " not found in table "
- + dbName
- + "."
- + tableName);
- return new RowType.RowField(s, columnTypes.get(idx));
- };
- RowType primaryKeyType =
- new RowType(
- optionalPrimaryKeyNames.get().stream()
- .map(rowFieldMapper)
- .collect(Collectors.toList()));
- store =
- FileStoreImpl.createWithPrimaryKey(
- tableLocation,
- 0, // TODO
- new FileStoreOptions(options),
- user,
- partitionType,
- primaryKeyType,
- rowType,
- options.get(FileStoreOptions.MERGE_ENGINE));
- valueCountMode = false;
- } else {
- store =
- FileStoreImpl.createWithValueCount(
- tableLocation,
- 0, // TODO
- new FileStoreOptions(options),
- user,
- partitionType,
- rowType);
- valueCountMode = true;
- }
- }
-
- private void createPredicate(JobConf jobConf) {
- String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
- if (hiveFilter == null) {
- return;
- }
+ private FileStoreTable createFileStoreTable(JobConf jobConf) {
+ TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
+ Configuration conf = new Configuration();
+ wrapper.updateFileStoreOptions(conf);
+ return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
+ }
- ExprNodeGenericFuncDesc exprNodeDesc =
- SerializationUtilities.deserializeObject(
- hiveFilter, ExprNodeGenericFuncDesc.class);
- SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
- SearchArgumentToPredicateConverter converter =
- new SearchArgumentToPredicateConverter(sarg, columnNames, columnTypes);
- predicate = converter.convert().orElse(null);
+ private Optional<Predicate> createPredicate(JobConf jobConf) {
+ String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ if (hiveFilter == null) {
+ return Optional.empty();
}
- private FileStoreScanImpl newScan() {
- FileStoreScanImpl scan = store.newScan();
- if (predicate != null) {
- if (valueCountMode) {
- scan.withKeyFilter(predicate);
- } else {
- scan.withValueFilter(predicate);
- }
- }
- return scan;
- }
+ TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
+ ExprNodeGenericFuncDesc exprNodeDesc =
+ SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
+ SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
+ SearchArgumentToPredicateConverter converter =
+ new SearchArgumentToPredicateConverter(
+ sarg, wrapper.getColumnNames(), wrapper.getColumnTypes());
+ return converter.convert();
}
}
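
The rewritten input format no longer reconstructs a FileStoreImpl from column
names and types; planning and reading are delegated to the new table layer.
Condensed from the code above, the read path is (the boolean flag is passed
through exactly as in this patch, without asserting its semantics):

    FileStoreTable table = FileStoreTableFactory.create(conf, user);

    // Planning: one Split per (partition, bucket), optionally filtered.
    TableScan scan = table.newScan(false);
    createPredicate(jobConf).ifPresent(scan::withFilter);

    // Reading: each split is turned back into a reader over RowData.
    for (Split split : scan.plan().splits) {
        RecordReader<RowData> reader =
                table.newRead(false)
                        .createReader(split.partition(), split.bucket(), split.files());
        // drain the reader ...
    }
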
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index 924167b8..4bbef384 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.SerializationUtils;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -48,7 +49,6 @@ public class TableStoreInputSplit extends FileSplit {
private BinaryRowData partition;
private int bucket;
private List<DataFileMeta> files;
-
private String bucketPath;
// public no-argument constructor for deserialization
@@ -59,10 +59,14 @@ public class TableStoreInputSplit extends FileSplit {
this.partition = partition;
this.bucket = bucket;
this.files = files;
-
this.bucketPath = bucketPath;
}
+ public static TableStoreInputSplit create(Split split) {
+ return new TableStoreInputSplit(
+ split.partition(), split.bucket(), split.files(), split.bucketPath().toString());
+ }
+
public BinaryRowData partition() {
return partition;
}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
index c5dbb2cb..bf1f493c 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
@@ -21,14 +21,11 @@ package org.apache.flink.table.store.mapred;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.utils.PrimaryKeyRowDataSupplier;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.ValueCountRowDataSupplier;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
-import java.util.function.Supplier;
/**
* Base {@link RecordReader} for table store. Reads {@link KeyValue}s from data files and picks out
@@ -36,28 +33,22 @@ import java.util.function.Supplier;
*/
public class TableStoreRecordReader implements RecordReader<Void, RowDataContainer> {
- private final RecordReaderIterator<KeyValue> iterator;
- private final Supplier<RowData> rowDataSupplier;
+ private final RecordReaderIterator<RowData> iterator;
private final long splitLength;
private float progress;
public TableStoreRecordReader(
- org.apache.flink.table.store.file.utils.RecordReader<KeyValue> wrapped,
- boolean valueCountMode,
+ org.apache.flink.table.store.file.utils.RecordReader<RowData> wrapped,
long splitLength) {
this.iterator = new RecordReaderIterator<>(wrapped);
- this.rowDataSupplier =
- valueCountMode
- ? new ValueCountRowDataSupplier(iterator::next)
- : new PrimaryKeyRowDataSupplier(iterator::next);
this.splitLength = splitLength;
this.progress = 0;
}
@Override
public boolean next(Void key, RowDataContainer value) throws IOException {
- RowData rowData = rowDataSupplier.get();
+ RowData rowData = iterator.next();
if (rowData == null) {
progress = 1;
return false;
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
index c0b83c7c..13b007fc 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
@@ -20,23 +20,25 @@ package org.apache.flink.table.store;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
-import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
-import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
-import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
@@ -48,12 +50,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.stream.Collectors;
/** Helper class to write and read {@link RowData} with {@link FileStoreImpl}. */
public class FileStoreTestHelper {
- private final FileStoreImpl store;
+ private final FileStoreTable table;
+ private final FileStore store;
private final BiFunction<RowData, RowData, BinaryRowData> partitionCalculator;
private final Function<RowData, Integer> bucketCalculator;
private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
@@ -61,24 +63,18 @@ public class FileStoreTestHelper {
public FileStoreTestHelper(
Configuration conf,
- RowType partitionType,
- RowType keyType,
- RowType valueType,
- MergeFunction mergeFunction,
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
BiFunction<RowData, RowData, BinaryRowData> partitionCalculator,
- Function<RowData, Integer> bucketCalculator) {
- FileStoreOptions options = new FileStoreOptions(conf);
- this.store =
- new FileStoreImpl(
- options.path().toString(),
- 0,
- options,
- WriteMode.CHANGE_LOG,
- UUID.randomUUID().toString(),
- partitionType,
- keyType,
- valueType,
- mergeFunction);
+ Function<RowData, Integer> bucketCalculator)
+ throws Exception {
+ Path tablePath = FileStoreOptions.path(conf);
+ new SchemaManager(tablePath)
+ .commitNewVersion(
+ new UpdateSchema(rowType, partitionKeys, primaryKeys, new HashMap<>(), ""));
+ this.table = FileStoreTableFactory.create(conf, "user");
+ this.store = table.fileStore();
this.partitionCalculator = partitionCalculator;
this.bucketCalculator = bucketCalculator;
this.writers = new HashMap<>();
@@ -94,7 +90,7 @@ public class FileStoreTestHelper {
bucket,
(b, w) -> {
if (w == null) {
- FileStoreWriteImpl write = store.newWrite();
+ FileStoreWrite write = store.newWrite();
return write.createWriter(
partition, bucket, compactExecutor);
} else {
@@ -119,21 +115,20 @@ public class FileStoreTestHelper {
}
}
writers.clear();
- FileStoreCommitImpl commit = store.newCommit();
+ FileStoreCommit commit = store.newCommit();
commit.commit(committable, Collections.emptyMap());
}
- public Tuple2<RecordReader<KeyValue>, Long> read(BinaryRowData partition, int bucket)
+ public Tuple2<RecordReader<RowData>, Long> read(BinaryRowData partition, int bucket)
throws Exception {
- FileStoreScanImpl scan = store.newScan();
- scan.withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket);
- List<ManifestEntry> files = scan.plan().files();
- FileStoreReadImpl read = store.newRead();
- RecordReader<KeyValue> wrapped =
- read.createReader(
- partition,
- bucket,
- files.stream().map(ManifestEntry::file).collect(Collectors.toList()));
- return Tuple2.of(wrapped, files.stream().mapToLong(e -> e.file().fileSize()).sum());
+ for (Split split : table.newScan(false).plan().splits) {
+ if (split.partition().equals(partition) && split.bucket() == bucket) {
+ return Tuple2.of(
+ table.newRead(false).createReader(partition, bucket, split.files()),
+ split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
+ }
+ }
+ throw new IllegalArgumentException(
+ "Input split not found for partition " + partition + " and bucket " + bucket);
}
}
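
The helper now follows the schema-first bootstrap used throughout this patch:
commit an UpdateSchema under the table path, then obtain the table from
FileStoreTableFactory. Reduced to a standalone sketch (the path and the "f0"
primary key are illustrative; RowType.of without names defaults to f0, f1):

    Configuration conf = new Configuration();
    conf.set(FileStoreOptions.PATH, "/tmp/table-store-sketch"); // hypothetical path
    Path tablePath = FileStoreOptions.path(conf);

    // Register the schema once; partition and primary key lists may be empty.
    new SchemaManager(tablePath)
            .commitNewVersion(
                    new UpdateSchema(
                            RowType.of(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)),
                            Collections.emptyList(),         // partition keys
                            Collections.singletonList("f0"), // primary key
                            new HashMap<>(),                 // table options
                            ""));                            // table comment

    FileStoreTable table = FileStoreTableFactory.create(conf, "user");
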
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 5f33a37e..2ff8d7a3 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -28,8 +28,6 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.FileStoreTestHelper;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -90,13 +88,6 @@ public class TableStoreHiveStorageHandlerITCase {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
- RowType.of(
- new LogicalType[] {
- DataTypes.INT().getLogicalType(),
- DataTypes.BIGINT().getLogicalType()
- },
- new String[] {"_KEY_a", "_KEY_b"}),
RowType.of(
new LogicalType[] {
DataTypes.INT().getLogicalType(),
@@ -104,7 +95,8 @@ public class TableStoreHiveStorageHandlerITCase {
DataTypes.STRING().getLogicalType()
},
new String[] {"a", "b", "c"}),
- new DeduplicateMergeFunction(),
+ Collections.emptyList(),
+ Arrays.asList("a", "b"),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> k.getInt(0) % 2);
@@ -166,7 +158,6 @@ public class TableStoreHiveStorageHandlerITCase {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
RowType.of(
new LogicalType[] {
DataTypes.INT().getLogicalType(),
@@ -174,10 +165,8 @@ public class TableStoreHiveStorageHandlerITCase {
DataTypes.STRING().getLogicalType()
},
new String[] {"a", "b", "c"}),
- RowType.of(
- new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
- new String[] {"_VALUE_COUNT"}),
- new ValueCountMergeFunction(),
+ Collections.emptyList(),
+ Collections.emptyList(),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> k.getInt(0) % 2);
@@ -238,10 +227,6 @@ public class TableStoreHiveStorageHandlerITCase {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
- RowType.of(
- new LogicalType[] {DataTypes.INT().getLogicalType()},
- new String[] {"_KEY_f_int"}),
RowType.of(
RandomGenericRowDataGenerator.TYPE_INFOS.stream()
.map(
@@ -250,7 +235,8 @@ public class TableStoreHiveStorageHandlerITCase {
.getLogicalType())
.toArray(LogicalType[]::new),
RandomGenericRowDataGenerator.FIELD_NAMES.toArray(new String[0])),
- new DeduplicateMergeFunction(),
+ Collections.emptyList(),
+ Collections.singletonList("f_int"),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> 0);
@@ -382,14 +368,11 @@ public class TableStoreHiveStorageHandlerITCase {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
RowType.of(
new LogicalType[] {DataTypes.INT().getLogicalType()},
new String[] {"a"}),
- RowType.of(
- new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
- new String[] {"_VALUE_COUNT"}),
- new ValueCountMergeFunction(),
+ Collections.emptyList(),
+ Collections.emptyList(),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> 0);
@@ -474,17 +457,14 @@ public class TableStoreHiveStorageHandlerITCase {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
RowType.of(
new LogicalType[] {
DataTypes.DATE().getLogicalType(),
DataTypes.TIMESTAMP(3).getLogicalType()
},
new String[] {"dt", "ts"}),
- RowType.of(
- new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
- new String[] {"_VALUE_COUNT"}),
- new ValueCountMergeFunction(),
+ Collections.emptyList(),
+ Collections.emptyList(),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> 0);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 4fc6fa53..0edb2548 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -28,10 +28,7 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.FileStoreTestHelper;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +36,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -59,17 +57,14 @@ public class TableStoreRecordReaderTest {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
- RowType.of(
- new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
- new String[] {"_KEY_a"}),
RowType.of(
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
new String[] {"a", "b"}),
- new DeduplicateMergeFunction(),
+ Collections.emptyList(),
+ Collections.singletonList("a"),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> 0);
@@ -95,8 +90,8 @@ public class TableStoreRecordReaderTest {
GenericRowData.of(2L, StringData.fromString("Hello")));
helper.commit();
- Tuple2<RecordReader<KeyValue>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
- TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, false, tuple.f1);
+ Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+ TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
RowDataContainer container = reader.createValue();
Set<String> actual = new HashSet<>();
while (reader.next(null, container)) {
@@ -119,17 +114,14 @@ public class TableStoreRecordReaderTest {
FileStoreTestHelper helper =
new FileStoreTestHelper(
conf,
- RowType.of(),
RowType.of(
new LogicalType[] {
DataTypes.INT().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
- new String[] {"_KEY_a", "_KEY_b"}),
- RowType.of(
- new LogicalType[] {DataTypes.BIGINT().getLogicalType()},
- new String[] {"_VALUE_COUNT"}),
- new ValueCountMergeFunction(),
+ new String[] {"a", "b"}),
+ Collections.emptyList(),
+ Collections.emptyList(),
(k, v) -> BinaryRowDataUtil.EMPTY_ROW,
k -> 0);
@@ -159,8 +151,8 @@ public class TableStoreRecordReaderTest {
GenericRowData.of(1L));
helper.commit();
- Tuple2<RecordReader<KeyValue>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
- TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, true, tuple.f1);
+ Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+ TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
RowDataContainer container = reader.createValue();
Map<String, Integer> actual = new HashMap<>();
while (reader.next(null, container)) {