You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/01 11:28:47 UTC
[flink-table-store] branch master updated: [FLINK-26031] Support projection pushdown on keys and values
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 6b7efeb [FLINK-26031] Support projection pushdown on keys and values
6b7efeb is described below
commit 6b7efeb22f0501c6e3859059684b219d69e7d4e6
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 1 19:28:43 2022 +0800
[FLINK-26031] Support projection pushdown on keys and values
This closes #18
---
.../apache/flink/table/store/file/FileFormat.java | 17 +-
.../flink/table/store/file/FileFormatImpl.java | 23 ++-
.../apache/flink/table/store/file/KeyValue.java | 27 +++
.../store/file/mergetree/sst/SstFileReader.java | 49 ++++-
.../table/store/file/operation/FileStoreRead.java | 13 +-
.../store/file/operation/FileStoreReadImpl.java | 38 +++-
.../store/file/mergetree/sst/SstFileTest.java | 211 ++++++++++++-------
.../store/file/operation/FileStoreReadTest.java | 230 +++++++++++++++++++++
.../table/store/file/utils/FlushingAvroFormat.java | 4 +-
9 files changed, 515 insertions(+), 97 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
index bec0775..4309284 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
@@ -37,18 +37,29 @@ import java.util.List;
public interface FileFormat {
/**
- * Create a {@link BulkFormat} from the type.
+ * Create a {@link BulkFormat} from the type, with projection pushed down.
*
+ * @param type Type without projection.
+ * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
* @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
*/
BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, List<ResolvedExpression> filters);
+ RowType type, int[][] projection, List<ResolvedExpression> filters);
/** Create a {@link BulkWriter.Factory} from the type. */
BulkWriter.Factory<RowData> createWriterFactory(RowType type);
default BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
- return createReaderFactory(rowType, new ArrayList<>());
+ int[][] projection = new int[rowType.getFieldCount()][];
+ for (int i = 0; i < projection.length; i++) {
+ projection[i] = new int[] {i};
+ }
+ return createReaderFactory(rowType, projection);
+ }
+
+ default BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+ RowType rowType, int[][] projection) {
+ return createReaderFactory(rowType, projection, new ArrayList<>());
}
/** Create a {@link FileFormatImpl} from format identifier and format options. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
index 2008ee2..cdcdd84 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
@@ -35,6 +36,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import java.util.List;
@@ -54,15 +56,24 @@ public class FileFormatImpl implements FileFormat {
this.formatOptions = formatOptions;
}
+ @SuppressWarnings("unchecked")
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType rowType, List<ResolvedExpression> filters) {
- BulkDecodingFormat<RowData> decodingFormat =
- FactoryUtil.discoverFactory(
- classLoader, BulkReaderFormatFactory.class, formatIdentifier)
- .createDecodingFormat(null, formatOptions); // context is useless
+ RowType rowType, int[][] projection, List<ResolvedExpression> filters) {
+ BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
+ // TODO fall back to ProjectingBulkFormat if the format does not support projection push down
+ Preconditions.checkState(
+ decodingFormat instanceof ProjectableDecodingFormat,
+ "Format " + formatIdentifier + " does not support projection push down");
decodingFormat.applyFilters(filters);
- return decodingFormat.createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(rowType));
+ return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
+ .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(rowType), projection);
+ }
+
+ private BulkDecodingFormat<RowData> getDecodingFormat() {
+ return FactoryUtil.discoverFactory(
+ classLoader, BulkReaderFormatFactory.class, formatIdentifier)
+ .createDecodingFormat(null, formatOptions); // context is useless
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 8461d64..a470376 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -78,6 +78,33 @@ public class KeyValue {
return new RowType(fields);
}
+ public static int[][] project(
+ int[][] keyProjection, int[][] valueProjection, int numKeyFields) {
+ int[][] projection = new int[keyProjection.length + 2 + valueProjection.length][];
+
+ // key
+ for (int i = 0; i < keyProjection.length; i++) {
+ projection[i] = new int[keyProjection[i].length];
+ System.arraycopy(keyProjection[i], 0, projection[i], 0, keyProjection[i].length);
+ }
+
+ // seq
+ projection[keyProjection.length] = new int[] {numKeyFields};
+
+ // value kind
+ projection[keyProjection.length + 1] = new int[] {numKeyFields + 1};
+
+ // value
+ for (int i = 0; i < valueProjection.length; i++) {
+ int idx = keyProjection.length + 2 + i;
+ projection[idx] = new int[valueProjection[i].length];
+ System.arraycopy(valueProjection[i], 0, projection[idx], 0, valueProjection[i].length);
+ projection[idx][0] += numKeyFields + 2;
+ }
+
+ return projection;
+ }
+
@VisibleForTesting
public KeyValue copy(RowDataSerializer keySerializer, RowDataSerializer valueSerializer) {
return new KeyValue()
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
index fa54477..f9ce7f3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileFormat;
@@ -36,7 +37,12 @@ import javax.annotation.Nullable;
import java.io.IOException;
-/** Reads {@link KeyValue}s from sst files. */
+/**
+ * Reads {@link KeyValue}s from sst files.
+ *
+ * <p>NOTE: Sst files store records ordered by their full (unprojected) keys. If a key projection
+ * is applied, the produced iterator is no longer guaranteed to be ordered.
+ */
public class SstFileReader {
private final RowType keyType;
@@ -112,8 +118,14 @@ public class SstFileReader {
private final RowType keyType;
private final RowType valueType;
+ private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
- private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+
+ private int[][] keyProjection;
+ private int[][] valueProjection;
+ private RowType projectedKeyType;
+ private RowType projectedValueType;
+ private BulkFormat<RowData, FileSourceSplit> readerFactory;
public Factory(
RowType keyType,
@@ -122,17 +134,42 @@ public class SstFileReader {
FileStorePathFactory pathFactory) {
this.keyType = keyType;
this.valueType = valueType;
+ this.fileFormat = fileFormat;
this.pathFactory = pathFactory;
- RowType recordType = KeyValue.schema(keyType, valueType);
- this.readerFactory = fileFormat.createReaderFactory(recordType);
+
+ this.keyProjection = Projection.range(0, keyType.getFieldCount()).toNestedIndexes();
+ this.valueProjection = Projection.range(0, valueType.getFieldCount()).toNestedIndexes();
+ applyProjection();
+ }
+
+ public Factory withKeyProjection(int[][] projection) {
+ keyProjection = projection;
+ applyProjection();
+ return this;
+ }
+
+ public Factory withValueProjection(int[][] projection) {
+ valueProjection = projection;
+ applyProjection();
+ return this;
}
public SstFileReader create(BinaryRowData partition, int bucket) {
return new SstFileReader(
- keyType,
- valueType,
+ projectedKeyType,
+ projectedValueType,
readerFactory,
pathFactory.createSstPathFactory(partition, bucket));
}
+
+ private void applyProjection() {
+ projectedKeyType = (RowType) Projection.of(keyProjection).project(keyType);
+ projectedValueType = (RowType) Projection.of(valueProjection).project(valueType);
+
+ RowType recordType = KeyValue.schema(keyType, valueType);
+ int[][] projection =
+ KeyValue.project(keyProjection, valueProjection, keyType.getFieldCount());
+ readerFactory = fileFormat.createReaderFactory(recordType, projection);
+ }
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index 33a6233..cbf95e4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -34,7 +34,18 @@ public interface FileStoreRead {
/** With value nested projection. */
void withValueProjection(int[][] projectedFields);
- /** Create a {@link RecordReader} from partition and bucket and files. */
+ /**
+ * Create a {@link RecordReader} from partition and bucket and files.
+ *
+ * <p>The resulting reader has the following characteristics:
+ *
+ * <ul>
+ * <li>If {@link FileStoreRead#withKeyProjection} is called, key-values produced by this
+ * reader may be unordered and may contain duplicated keys.
+ * <li>If {@link FileStoreRead#withKeyProjection} is not called, key-values produced by this
+ * reader are guaranteed to be ordered by keys and do not contain duplicated keys.
+ * </ul>
+ */
RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
throws IOException;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index 6c5383b..0df5aa7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
@@ -31,6 +32,7 @@ import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -41,6 +43,8 @@ public class FileStoreReadImpl implements FileStoreRead {
private final Comparator<RowData> keyComparator;
private final Accumulator accumulator;
+ private boolean keyProjected;
+
public FileStoreReadImpl(
RowType keyType,
RowType valueType,
@@ -52,28 +56,42 @@ public class FileStoreReadImpl implements FileStoreRead {
new SstFileReader.Factory(keyType, valueType, fileFormat, pathFactory);
this.keyComparator = keyComparator;
this.accumulator = accumulator;
+
+ this.keyProjected = false;
}
@Override
public void withKeyProjection(int[][] projectedFields) {
- // TODO
- throw new UnsupportedOperationException();
+ sstFileReaderFactory.withKeyProjection(projectedFields);
+ keyProjected = true;
}
@Override
public void withValueProjection(int[][] projectedFields) {
- // TODO
- throw new UnsupportedOperationException();
+ sstFileReaderFactory.withValueProjection(projectedFields);
}
@Override
public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
throws IOException {
- return new MergeTreeReader(
- new IntervalPartition(files, keyComparator).partition(),
- true,
- sstFileReaderFactory.create(partition, bucket),
- keyComparator,
- accumulator.copy());
+ SstFileReader sstFileReader = sstFileReaderFactory.create(partition, bucket);
+ if (keyProjected) {
+ // key projection has been applied, so sst readers will not return key-values in order;
+ // we have to return the raw file contents without merging
+ List<ConcatRecordReader.ReaderSupplier> suppliers = new ArrayList<>();
+ for (SstFileMeta file : files) {
+ suppliers.add(() -> sstFileReader.read(file.fileName()));
+ }
+ return ConcatRecordReader.create(suppliers);
+ } else {
+ // key projection is not applied, so sst readers will return key-values in order;
+ // in this case the merge tree can merge records with the same key for us
+ return new MergeTreeReader(
+ new IntervalPartition(files, keyComparator).partition(),
+ true,
+ sstFileReader,
+ keyComparator,
+ accumulator.copy());
+ }
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index b83efb4..e5bde46 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -18,16 +18,12 @@
package org.apache.flink.table.store.file.mergetree.sst;
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializerTest;
@@ -35,17 +31,24 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
@@ -71,29 +74,15 @@ public class SstFileTest {
checkRollingFiles(data.meta, actualMetas, writer.suggestedFileSize());
- SstFileReader reader = createSstFileReader(tempDir.toString());
- Iterator<KeyValue> expectedIterator = data.content.iterator();
- for (SstFileMeta meta : actualMetas) {
- // check the contents of sst file
- CloseableIterator<KeyValue> actualKvsIterator =
- new RecordReaderIterator(reader.read(meta.fileName()));
- while (actualKvsIterator.hasNext()) {
- assertThat(expectedIterator.hasNext()).isTrue();
- KeyValue actualKv = actualKvsIterator.next();
- assertThat(
- KeyValueSerializerTest.equals(
- expectedIterator.next(),
- actualKv,
- TestKeyValueGenerator.KEY_SERIALIZER,
- TestKeyValueGenerator.ROW_SERIALIZER))
- .isTrue();
- }
- actualKvsIterator.close();
-
- // check that each sst file meta is serializable
- assertThat(serializer.fromRow(serializer.toRow(meta))).isEqualTo(meta);
- }
- assertThat(expectedIterator.hasNext()).isFalse();
+ SstFileReader reader = createSstFileReader(tempDir.toString(), null, null);
+ assertData(
+ data,
+ actualMetas,
+ TestKeyValueGenerator.KEY_SERIALIZER,
+ TestKeyValueGenerator.ROW_SERIALIZER,
+ serializer,
+ reader,
+ kv -> kv);
}
@RepeatedTest(10)
@@ -118,6 +107,88 @@ public class SstFileTest {
}
}
+ @Test
+ public void testKeyProjection() throws Exception {
+ SstTestDataGenerator.Data data = gen.next();
+ SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+ SstFileMetaSerializer serializer =
+ new SstFileMetaSerializer(
+ TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+ List<SstFileMeta> actualMetas =
+ sstFileWriter.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+ // projection: (shopId, orderId) -> (orderId)
+ SstFileReader sstFileReader =
+ createSstFileReader(tempDir.toString(), new int[][] {new int[] {1}}, null);
+ RowType projectedKeyType =
+ RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"key_orderId"});
+ RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
+ assertData(
+ data,
+ actualMetas,
+ projectedKeySerializer,
+ TestKeyValueGenerator.ROW_SERIALIZER,
+ serializer,
+ sstFileReader,
+ kv ->
+ new KeyValue()
+ .replace(
+ GenericRowData.of(kv.key().getLong(1)),
+ kv.sequenceNumber(),
+ kv.valueKind(),
+ kv.value()));
+ }
+
+ @Test
+ public void testValueProjection() throws Exception {
+ SstTestDataGenerator.Data data = gen.next();
+ SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+ SstFileMetaSerializer serializer =
+ new SstFileMetaSerializer(
+ TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+ List<SstFileMeta> actualMetas =
+ sstFileWriter.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+ // projection:
+ // (dt, hr, shopId, orderId, itemId, priceAmount, comment) ->
+ // (shopId, itemId, dt, hr)
+ SstFileReader sstFileReader =
+ createSstFileReader(
+ tempDir.toString(),
+ null,
+ new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}});
+ RowType projectedValueType =
+ RowType.of(
+ new LogicalType[] {
+ new IntType(false),
+ new BigIntType(),
+ new VarCharType(false, 8),
+ new IntType(false)
+ },
+ new String[] {"shopId", "itemId", "dt", "hr"});
+ RowDataSerializer projectedValueSerializer = new RowDataSerializer(projectedValueType);
+ assertData(
+ data,
+ actualMetas,
+ TestKeyValueGenerator.KEY_SERIALIZER,
+ projectedValueSerializer,
+ serializer,
+ sstFileReader,
+ kv ->
+ new KeyValue()
+ .replace(
+ kv.key(),
+ kv.sequenceNumber(),
+ kv.valueKind(),
+ GenericRowData.of(
+ kv.value().getInt(2),
+ kv.value().isNullAt(4)
+ ? null
+ : kv.value().getLong(4),
+ kv.value().getString(0),
+ kv.value().getInt(1))));
+ }
+
private SstFileWriter createSstFileWriter(String path) {
FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
@@ -133,7 +204,8 @@ public class SstFileTest {
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
}
- private SstFileReader createSstFileReader(String path) {
+ private SstFileReader createSstFileReader(
+ String path, int[][] keyProjection, int[][] valueProjection) {
FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
SstFileReader.Factory factory =
new SstFileReader.Factory(
@@ -141,9 +213,49 @@ public class SstFileTest {
TestKeyValueGenerator.ROW_TYPE,
flushingAvro,
pathFactory);
+ if (keyProjection != null) {
+ factory.withKeyProjection(keyProjection);
+ }
+ if (valueProjection != null) {
+ factory.withValueProjection(valueProjection);
+ }
return factory.create(BinaryRowDataUtil.EMPTY_ROW, 0);
}
+ private void assertData(
+ SstTestDataGenerator.Data data,
+ List<SstFileMeta> actualMetas,
+ RowDataSerializer keySerializer,
+ RowDataSerializer projectedValueSerializer,
+ SstFileMetaSerializer sstFileMetaSerializer,
+ SstFileReader sstFileReader,
+ Function<KeyValue, KeyValue> toExpectedKv)
+ throws Exception {
+ Iterator<KeyValue> expectedIterator = data.content.iterator();
+ for (SstFileMeta meta : actualMetas) {
+ // check the contents of sst file
+ CloseableIterator<KeyValue> actualKvsIterator =
+ new RecordReaderIterator(sstFileReader.read(meta.fileName()));
+ while (actualKvsIterator.hasNext()) {
+ assertThat(expectedIterator.hasNext()).isTrue();
+ KeyValue actualKv = actualKvsIterator.next();
+ assertThat(
+ KeyValueSerializerTest.equals(
+ toExpectedKv.apply(expectedIterator.next()),
+ actualKv,
+ keySerializer,
+ projectedValueSerializer))
+ .isTrue();
+ }
+ actualKvsIterator.close();
+
+ // check that each sst file meta is serializable
+ assertThat(sstFileMetaSerializer.fromRow(sstFileMetaSerializer.toRow(meta)))
+ .isEqualTo(meta);
+ }
+ assertThat(expectedIterator.hasNext()).isFalse();
+ }
+
private void checkRollingFiles(
SstFileMeta expected, List<SstFileMeta> actual, long suggestedFileSize) {
// all but last file should be no smaller than suggestedFileSize
@@ -216,43 +328,4 @@ public class SstFileTest {
assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
.isEqualTo(expected.nullCount());
}
-
- /** A special avro {@link FileFormat} which flushes for every added element. */
- public static class FlushingAvroFormat implements FileFormat {
-
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- SstFileTest.class.getClassLoader(), "avro", new Configuration());
-
- @Override
- public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, List<ResolvedExpression> filters) {
- return avro.createReaderFactory(type, filters);
- }
-
- @Override
- public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- return fsDataOutputStream -> {
- BulkWriter<RowData> wrapped =
- avro.createWriterFactory(type).create(fsDataOutputStream);
- return new BulkWriter<RowData>() {
- @Override
- public void addElement(RowData rowData) throws IOException {
- wrapped.addElement(rowData);
- wrapped.flush();
- }
-
- @Override
- public void flush() throws IOException {
- wrapped.flush();
- }
-
- @Override
- public void finish() throws IOException {
- wrapped.finish();
- }
- };
- };
- }
- }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
new file mode 100644
index 0000000..406a214
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestFileStore;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreReadImpl}. */
+public class FileStoreReadTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @BeforeEach
+ public void beforeEach() throws IOException {
+ Path root = new Path(tempDir.toString());
+ root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+ }
+
+ @Test
+ public void testKeyProjection() throws Exception {
+ // (a, b, c) -> (b, a), c is the partition, all integers are in range [0, 2]
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numRecords = random.nextInt(1000) + 1;
+ List<KeyValue> data = new ArrayList<>();
+ Map<Integer, Long> expected = new HashMap<>();
+ for (int i = 0; i < numRecords; i++) {
+ int a = random.nextInt(3);
+ int b = random.nextInt(3);
+ int c = random.nextInt(3);
+ long delta = random.nextLong(21) - 10;
+ expected.compute(b * 10 + a, (k, v) -> v == null ? delta : v + delta);
+ data.add(
+ new KeyValue()
+ .replace(
+ GenericRowData.of(a, b, c),
+ i,
+ ValueKind.ADD,
+ GenericRowData.of(delta)));
+ }
+
+ RowType partitionType =
+ RowType.of(new LogicalType[] {new IntType(false)}, new String[] {"c"});
+ RowDataSerializer partitionSerializer = new RowDataSerializer(partitionType);
+ RowType keyType =
+ RowType.of(
+ new LogicalType[] {
+ new IntType(false), new IntType(false), new IntType(false)
+ },
+ new String[] {"a", "b", "c"});
+ RowType projectedKeyType = RowType.of(new IntType(false), new IntType(false));
+ RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
+ RowType valueType =
+ RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"count"});
+ RowDataSerializer valueSerializer = new RowDataSerializer(valueType);
+
+ TestFileStore store =
+ createStore(partitionType, keyType, valueType, new ValueCountAccumulator());
+ List<KeyValue> readData =
+ writeThenRead(
+ data,
+ new int[][] {new int[] {1}, new int[] {0}},
+ null,
+ projectedKeySerializer,
+ valueSerializer,
+ store,
+ kv ->
+ partitionSerializer
+ .toBinaryRow(GenericRowData.of(kv.key().getInt(2)))
+ .copy());
+ Map<Integer, Long> actual = new HashMap<>();
+ for (KeyValue kv : readData) {
+ assertThat(kv.key().getArity()).isEqualTo(2);
+ int key = kv.key().getInt(0) * 10 + kv.key().getInt(1);
+ long delta = kv.value().getLong(0);
+ actual.compute(key, (k, v) -> v == null ? delta : v + delta);
+ }
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testValueProjection() throws Exception {
+ // (dt, hr, shopId, orderId, itemId, priceAmount, comment) -> (shopId, itemId, dt, hr)
+
+ TestKeyValueGenerator gen = new TestKeyValueGenerator();
+ int numRecords = ThreadLocalRandom.current().nextInt(1000) + 1;
+ List<KeyValue> data = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ data.add(gen.next());
+ }
+ TestFileStore store =
+ createStore(
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ new DeduplicateAccumulator());
+
+ RowDataSerializer projectedValueSerializer =
+ new RowDataSerializer(
+ new IntType(false),
+ new BigIntType(),
+ new VarCharType(false, 8),
+ new IntType(false));
+ Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(data);
+ expected.replaceAll(
+ (k, v) ->
+ projectedValueSerializer
+ .toBinaryRow(
+ GenericRowData.of(
+ v.getInt(2),
+ v.isNullAt(4) ? null : v.getLong(4),
+ v.getString(0),
+ v.getInt(1)))
+ .copy());
+
+ List<KeyValue> readData =
+ writeThenRead(
+ data,
+ null,
+ new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}},
+ TestKeyValueGenerator.KEY_SERIALIZER,
+ projectedValueSerializer,
+ store,
+ gen::getPartition);
+ for (KeyValue kv : readData) {
+ assertThat(kv.value().getArity()).isEqualTo(4);
+ BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key());
+ BinaryRowData value = projectedValueSerializer.toBinaryRow(kv.value());
+ assertThat(expected).containsKey(key);
+ assertThat(value).isEqualTo(expected.get(key));
+ }
+ }
+
+ private List<KeyValue> writeThenRead(
+ List<KeyValue> data,
+ int[][] keyProjection,
+ int[][] valueProjection,
+ RowDataSerializer projectedKeySerializer,
+ RowDataSerializer projectedValueSerializer,
+ TestFileStore store,
+ Function<KeyValue, BinaryRowData> partitionCalculator)
+ throws Exception {
+ store.commitData(data, partitionCalculator, kv -> 0);
+ FileStoreScan scan = store.newScan();
+ Map<BinaryRowData, List<ManifestEntry>> filesGroupedByPartition =
+ scan.withSnapshot(store.pathFactory().latestSnapshotId()).plan().files().stream()
+ .collect(Collectors.groupingBy(ManifestEntry::partition));
+ FileStoreRead read = store.newRead();
+ if (keyProjection != null) {
+ read.withKeyProjection(keyProjection);
+ }
+ if (valueProjection != null) {
+ read.withValueProjection(valueProjection);
+ }
+
+ List<KeyValue> result = new ArrayList<>();
+ for (Map.Entry<BinaryRowData, List<ManifestEntry>> entry :
+ filesGroupedByPartition.entrySet()) {
+ RecordReader reader =
+ read.createReader(
+ entry.getKey(),
+ 0,
+ entry.getValue().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList()));
+ RecordReaderIterator actualIterator = new RecordReaderIterator(reader);
+ while (actualIterator.hasNext()) {
+ result.add(
+ actualIterator
+ .next()
+ .copy(projectedKeySerializer, projectedValueSerializer));
+ }
+ }
+ return result;
+ }
+
+ private TestFileStore createStore(
+ RowType partitionType, RowType keyType, RowType valueType, Accumulator accumulator) {
+ return TestFileStore.create(
+ "avro", tempDir.toString(), 1, partitionType, keyType, valueType, accumulator);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
index 5279b37..a6ff06b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
@@ -40,8 +40,8 @@ public class FlushingAvroFormat implements FileFormat {
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
- RowType type, List<ResolvedExpression> filters) {
- return avro.createReaderFactory(type, filters);
+ RowType type, int[][] projection, List<ResolvedExpression> filters) {
+ return avro.createReaderFactory(type, projection, filters);
}
@Override