Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/23 12:34:30 UTC
[flink-table-store] branch master updated: [FLINK-26791] Finish projection push down for table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 8f223c2 [FLINK-26791] Finish projection push down for table store
8f223c2 is described below
commit 8f223c20ddd2d9293a3c64271a8451c1db265635
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Mar 23 20:34:26 2022 +0800
[FLINK-26791] Finish projection push down for table store
This closes #57
---
.../flink/table/store/connector/TableStore.java | 12 +++-
.../store/connector/source/FileStoreSource.java | 13 +++--
.../connector/source/FileStoreSourceReader.java | 9 ++-
.../source/FileStoreSourceSplitReader.java | 27 ++++++++-
.../store/connector/source/TableStoreSource.java | 3 +-
.../table/store/connector/FileStoreITCase.java | 57 ++++++++++++++++++-
.../store/connector/sink/LogStoreSinkITCase.java | 2 +-
.../source/FileStoreSourceReaderTest.java | 2 +-
.../table/store/log/LogStoreTableFactory.java | 5 +-
...ema.java => KafkaLogDeserializationSchema.java} | 66 ++++++++++++++++++----
.../table/store/kafka/KafkaLogSourceProvider.java | 16 ++++--
.../table/store/kafka/KafkaLogStoreFactory.java | 7 ++-
.../flink/table/store/kafka/KafkaLogITCase.java | 57 +++++++++++++------
.../store/kafka/KafkaLogSerializationTest.java | 4 +-
14 files changed, 230 insertions(+), 50 deletions(-)
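For orientation before the diff: throughout this commit a projection travels as an int[][] of nested field indexes, where each outer entry addresses a top-level column (inner entries would descend into nested rows). A minimal sketch of the convention using Flink's Projection utility; the three-column row type is invented for illustration:

    import org.apache.flink.table.connector.Projection;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class ProjectionConvention {
        public static void main(String[] args) {
            // invented schema: 0 = id INT, 1 = name STRING, 2 = cnt INT
            RowType type =
                    RowType.of(new IntType(), new VarCharType(), new IntType());

            // keep top-level columns 1 and 2
            Projection projection = Projection.of(new int[] {1, 2});

            // nested form used throughout this commit: [[1], [2]]
            int[][] projectedFields = projection.toNestedIndexes();

            // the two-field row type the source must now declare,
            // exactly what TableStore.SourceBuilder#build computes below
            System.out.println(Projection.of(projectedFields).project(type));
        }
    }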
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a21ebfe..051fd0e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
@@ -52,6 +53,7 @@ import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
@@ -60,6 +62,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -286,8 +289,6 @@ public class TableStore {
return buildFileSource(true);
}
- // TODO project log source
-
if (isHybrid) {
return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
buildFileSource(false))
@@ -304,11 +305,16 @@ public class TableStore {
}
public DataStreamSource<RowData> build(StreamExecutionEnvironment env) {
+ LogicalType produceType =
+ Optional.ofNullable(projectedFields)
+ .map(Projection::of)
+ .map(p -> p.project(type))
+ .orElse(type);
return env.fromSource(
build(),
WatermarkStrategy.noWatermarks(),
tableIdentifier.asSummaryString(),
- InternalTypeInfo.of(type));
+ InternalTypeInfo.of(produceType));
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index ba38ae3..93faca2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -93,17 +93,22 @@ public class FileStoreSource
read.withDropDelete(false);
}
+ int[][] valueCountModeProjects = null;
if (projectedFields != null) {
if (valueCountMode) {
- // TODO when isContinuous is false, don't project keys, and add key projection to
- // split reader
- read.withKeyProjection(projectedFields);
+ // under continuous reads, push the projection down to the file store
+ // for better performance, because the merge cannot be performed anyway
+ if (isContinuous) {
+ read.withKeyProjection(projectedFields);
+ } else {
+ valueCountModeProjects = projectedFields;
+ }
} else {
read.withValueProjection(projectedFields);
}
}
- return new FileStoreSourceReader(context, read, valueCountMode);
+ return new FileStoreSourceReader(context, read, valueCountMode, valueCountModeProjects);
}
@Override
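Reading the branch above: in value-count mode the file store's key is the whole row and the value is an occurrence count. A batch read must merge entries by key, so the key has to stay complete until after the merge and the projection moves into the split reader; a continuous read never merges, so the projection can be pushed into the store. A hedged restatement of that rule as a standalone predicate (names are illustrative, not from the codebase):

    // Sketch only: where this commit applies the projection.
    // In value-count mode the key is the full row, the value a count.
    final class ProjectionPlacement {
        // true  -> push the projection into the file store read
        // false -> keep full rows and project in the split reader
        static boolean pushToStore(boolean valueCountMode, boolean isContinuous) {
            // non-value-count tables always push (a value projection);
            // value-count tables push (a key projection) only when no
            // merge is needed, i.e. under continuous reads
            return !valueCountMode || isContinuous;
        }
    }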
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
index 66ebc46..6a5a263 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
@@ -25,6 +25,8 @@ import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.s
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.operation.FileStoreRead;
+import javax.annotation.Nullable;
+
import java.util.Map;
/** A {@link SourceReader} that reads records from {@link FileStoreSourceSplit}. */
@@ -38,9 +40,12 @@ public final class FileStoreSourceReader
public FileStoreSourceReader(
SourceReaderContext readerContext,
FileStoreRead fileStoreRead,
- boolean valueCountMode) {
+ boolean valueCountMode,
+ @Nullable int[][] valueCountModeProjects) {
super(
- () -> new FileStoreSourceSplitReader(fileStoreRead, valueCountMode),
+ () ->
+ new FileStoreSourceSplitReader(
+ fileStoreRead, valueCountMode, valueCountModeProjects),
(element, output, splitState) -> {
output.collect(element.getRecord());
splitState.setPosition(element);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 728e1a0..82931d3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.connector.source;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.src.impl.FileRecords;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
@@ -28,6 +29,7 @@ import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.s
import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.operation.FileStoreRead;
@@ -38,6 +40,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.LinkedList;
+import java.util.Optional;
import java.util.Queue;
/** The {@link SplitReader} implementation for the file store source. */
@@ -46,6 +49,7 @@ public class FileStoreSourceSplitReader
private final FileStoreRead fileStoreRead;
private final boolean valueCountMode;
+ @Nullable private final int[][] valueCountModeProjects;
private final Queue<FileStoreSourceSplit> splits;
@@ -56,9 +60,18 @@ public class FileStoreSourceSplitReader
private long currentNumRead;
private RecordReader.RecordIterator currentFirstBatch;
+ @VisibleForTesting
public FileStoreSourceSplitReader(FileStoreRead fileStoreRead, boolean valueCountMode) {
+ this(fileStoreRead, valueCountMode, null);
+ }
+
+ public FileStoreSourceSplitReader(
+ FileStoreRead fileStoreRead,
+ boolean valueCountMode,
+ @Nullable int[][] valueCountModeProjects) {
this.fileStoreRead = fileStoreRead;
this.valueCountMode = valueCountMode;
+ this.valueCountModeProjects = valueCountModeProjects;
this.splits = new LinkedList<>();
this.pool = new Pool<>(1);
this.pool.add(
@@ -221,6 +234,12 @@ public class FileStoreSourceSplitReader
private long count = 0;
@Nullable
+ private final ProjectedRowData projectedRow =
+ Optional.ofNullable(valueCountModeProjects)
+ .map(ProjectedRowData::from)
+ .orElse(null);
+
+ @Nullable
@Override
public RecordAndPosition<RowData> next() {
try {
@@ -240,8 +259,9 @@ public class FileStoreSourceSplitReader
if (value < 0) {
row.setRowKind(RowKind.DELETE);
}
- recordAndPosition.setNext(row);
+ setNext(row);
} else {
+ // emit the same record again, advancing only recordSkipCount
recordAndPosition.setNext(recordAndPosition.getRecord());
}
count--;
@@ -251,5 +271,10 @@ public class FileStoreSourceSplitReader
throw new RuntimeException(e);
}
}
+
+ private void setNext(RowData row) {
+ row = projectedRow == null ? row : projectedRow.replaceRow(row);
+ recordAndPosition.setNext(row);
+ }
}
}
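ProjectedRowData, used above, is a mutable view rather than a copy: replaceRow swaps the backing row and returns the same instance, so the iterator projects every record without per-record allocation. A self-contained sketch of that reuse pattern (the data is made up; the API is Flink's):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.utils.ProjectedRowData;

    public class ProjectedRowSketch {
        public static void main(String[] args) {
            // one reusable view selecting columns (2, 0)
            ProjectedRowData view = ProjectedRowData.from(new int[][] {{2}, {0}});

            RowData[] batch = {
                GenericRowData.of(1, StringData.fromString("a"), 10),
                GenericRowData.of(2, StringData.fromString("b"), 20),
            };
            for (RowData row : batch) {
                RowData projected = view.replaceRow(row); // same instance each time
                System.out.println(projected.getInt(0) + ", " + projected.getInt(1));
                // prints: 10, 1   then   20, 2
            }
        }
    }

The flip side of the reuse is that each projected row is only valid until the next replaceRow call, which is why the reader's setNext hands it downstream immediately rather than buffering it.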
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 171ccb5..11da93e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -128,7 +128,8 @@ public class TableStoreSource
return scanContext.createDataStructureConverter(
producedDataType);
}
- });
+ },
+ projectFields);
}
TableStore.SourceBuilder builder =
tableStore
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 95e1f9b..ed4c493 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -213,6 +214,54 @@ public class FileStoreITCase extends AbstractTestBase {
}
@Test
+ public void testKeyedProjection() throws Exception {
+ testProjection();
+ }
+
+ @Test
+ public void testNonKeyedProjection() throws Exception {
+ store.withPrimaryKeys(new int[0]);
+ testProjection();
+ }
+
+ private void testProjection() throws Exception {
+ // write
+ store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+ env.execute();
+
+ // read
+ Projection projection = Projection.of(new int[] {1, 2});
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ DataStructureConverter<RowData, Row> converter =
+ (DataStructureConverter)
+ DataStructureConverters.getConverter(
+ TypeConversions.fromLogicalToDataType(
+ projection.project(TABLE_TYPE)));
+ List<Row> results =
+ executeAndCollect(
+ store.sourceBuilder()
+ .withProjection(projection.toNestedIndexes())
+ .build(env),
+ converter);
+
+ // assert
+ Row[] expected = new Row[] {Row.of("p2", 1), Row.of("p1", 2), Row.of("p2", 5)};
+ if (store.primaryKeys().isEmpty()) {
+ // in streaming mode, expect the original data twice (FiniteTestSource emits it two times)
+ Stream<RowData> expectedStream =
+ isBatch
+ ? SOURCE_DATA.stream()
+ : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
+ expected =
+ expectedStream
+ .map(CONVERTER::toExternal)
+ .map(r -> Row.of(r.getField(1), r.getField(2)))
+ .toArray(Row[]::new);
+ }
+ assertThat(results).containsExactlyInAnyOrder(expected);
+ }
+
+ @Test
public void testContinuous() throws Exception {
store.withPrimaryKeys(new int[] {2});
innerTestContinuous();
@@ -330,10 +379,16 @@ public class FileStoreITCase extends AbstractTestBase {
}
public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
+ return executeAndCollect(source, CONVERTER);
+ }
+
+ public static List<Row> executeAndCollect(
+ DataStreamSource<RowData> source, DataStructureConverter<RowData, Row> converter)
+ throws Exception {
CloseableIterator<RowData> iterator = source.executeAndCollect();
List<Row> results = new ArrayList<>();
while (iterator.hasNext()) {
- results.add(CONVERTER.toExternal(iterator.next()));
+ results.add(converter.toExternal(iterator.next()));
}
iterator.close();
return results;
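A note on the test plumbing above: the external converter must be derived from the projected type, not the table's full type, or field positions will not line up. Condensed, the pattern looks like this (TABLE_TYPE and internalRow are stand-ins; the converter may need open() before use, per its contract):

    // condensed from testProjection above
    DataType projectedType =
            TypeConversions.fromLogicalToDataType(
                    Projection.of(new int[] {1, 2}).project(TABLE_TYPE));
    @SuppressWarnings({"unchecked", "rawtypes"})
    DataStructureConverter<RowData, Row> converter =
            (DataStructureConverter)
                    DataStructureConverters.getConverter(projectedType);
    Row external = converter.toExternal(internalRow); // two-field Row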
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index d12848f..15e7be3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -121,7 +121,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
KafkaLogStoreFactory factory = discoverKafkaLogFactory();
KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
KafkaLogSourceProvider sourceProvider =
- factory.createSourceProvider(context, SOURCE_CONTEXT);
+ factory.createSourceProvider(context, SOURCE_CONTEXT, null);
factory.onCreateTable(context, 3, true);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index d33afe1..d795ade 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -58,7 +58,7 @@ public class FileStoreSourceReaderTest {
private FileStoreSourceReader createReader(TestingReaderContext context) {
return new FileStoreSourceReader(
- context, new TestDataReadWrite(tempDir.toString(), null).createRead(), false);
+ context, new TestDataReadWrite(tempDir.toString(), null).createRead(), false, null);
}
private static FileStoreSourceSplit createTestFileSplit() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 2300fcf..5f28a3d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.types.RowKind;
+import javax.annotation.Nullable;
+
/**
* Base interface for configuring a default log table connector. The log table is used by the
* managed table factory.
@@ -55,7 +57,8 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
* Creates a {@link LogSourceProvider} instance from a {@link CatalogTable} and additional
* context information.
*/
- LogSourceProvider createSourceProvider(Context context, SourceContext sourceContext);
+ LogSourceProvider createSourceProvider(
+ Context context, SourceContext sourceContext, @Nullable int[][] projectFields);
/**
* Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
similarity index 62%
rename from flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
rename to flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
index 50485c6..e17699f 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.store.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
@@ -30,30 +32,41 @@ import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import javax.annotation.Nullable;
+
import java.util.stream.IntStream;
/** A {@link KafkaDeserializationSchema} for the table with primary key in log store. */
-public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
private final TypeInformation<RowData> producedType;
private final int fieldCount;
private final int[] primaryKey;
- private final DeserializationSchema<RowData> keyDeserializer;
+ @Nullable private final DeserializationSchema<RowData> keyDeserializer;
private final DeserializationSchema<RowData> valueDeserializer;
private final RowData.FieldGetter[] keyFieldGetters;
+ @Nullable private final int[][] projectFields;
+
+ private transient ProjectCollector projectCollector;
- public KafkaLogKeyedDeserializationSchema(
+ public KafkaLogDeserializationSchema(
DataType physicalType,
int[] primaryKey,
- DeserializationSchema<RowData> keyDeserializer,
- DeserializationSchema<RowData> valueDeserializer) {
+ @Nullable DeserializationSchema<RowData> keyDeserializer,
+ DeserializationSchema<RowData> valueDeserializer,
+ @Nullable int[][] projectFields) {
this.primaryKey = primaryKey;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
- this.producedType = InternalTypeInfo.of(physicalType.getLogicalType());
+ DataType projectedType =
+ projectFields == null
+ ? physicalType
+ : Projection.of(projectFields).project(physicalType);
+ this.producedType = InternalTypeInfo.of(projectedType.getLogicalType());
this.fieldCount = physicalType.getChildren().size();
+ this.projectFields = projectFields;
this.keyFieldGetters =
IntStream.range(0, primaryKey.length)
.mapToObj(
@@ -69,8 +82,11 @@ public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationS
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
- keyDeserializer.open(context);
+ if (keyDeserializer != null) {
+ keyDeserializer.open(context);
+ }
valueDeserializer.open(context);
+ projectCollector = new ProjectCollector();
}
@Override
@@ -85,17 +101,20 @@ public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationS
}
@Override
- public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> out)
+ public void deserialize(
+ ConsumerRecord<byte[], byte[]> record, Collector<RowData> underCollector)
throws Exception {
- if (record.value() == null) {
+ Collector<RowData> collector = projectCollector.project(underCollector);
+
+ if (primaryKey.length > 0 && record.value() == null) {
RowData key = keyDeserializer.deserialize(record.key());
GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount);
for (int i = 0; i < primaryKey.length; i++) {
value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key));
}
- out.collect(value);
+ collector.collect(value);
} else {
- valueDeserializer.deserialize(record.value(), out);
+ valueDeserializer.deserialize(record.value(), collector);
}
}
@@ -103,4 +122,29 @@ public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationS
public TypeInformation<RowData> getProducedType() {
return producedType;
}
+
+ private class ProjectCollector implements Collector<RowData> {
+
+ private final ProjectedRowData projectedRow =
+ projectFields == null ? null : ProjectedRowData.from(projectFields);
+
+ private Collector<RowData> underCollector;
+
+ private Collector<RowData> project(Collector<RowData> underCollector) {
+ if (projectedRow == null) {
+ return underCollector;
+ }
+
+ this.underCollector = underCollector;
+ return this;
+ }
+
+ @Override
+ public void collect(RowData rowData) {
+ underCollector.collect(projectedRow.replaceRow(rowData));
+ }
+
+ @Override
+ public void close() {}
+ }
}
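The ProjectCollector above applies the same view-based projection on the Kafka path: deserialize wraps the framework-supplied Collector once, and when no projection is configured the original collector is returned untouched, so the non-projected path pays nothing. The same idiom as a standalone sketch (Collector is org.apache.flink.util.Collector; the class name is illustrative):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.utils.ProjectedRowData;
    import org.apache.flink.util.Collector;

    final class ProjectingCollector implements Collector<RowData> {
        private final ProjectedRowData view;
        private final Collector<RowData> out;

        ProjectingCollector(int[][] projectFields, Collector<RowData> out) {
            this.view = ProjectedRowData.from(projectFields);
            this.out = out;
        }

        @Override
        public void collect(RowData row) {
            // project each row in place before handing it on
            out.collect(view.replaceRow(row));
        }

        @Override
        public void close() {
            // lifecycle of the wrapped collector is owned by the framework
        }
    }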
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index 68dbe63..d504802 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -56,6 +56,8 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
private final DeserializationSchema<RowData> valueDeserializer;
+ @Nullable private final int[][] projectFields;
+
private final LogConsistency consistency;
private final LogStartupMode scanMode;
@@ -69,6 +71,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
int[] primaryKey,
@Nullable DeserializationSchema<RowData> keyDeserializer,
DeserializationSchema<RowData> valueDeserializer,
+ @Nullable int[][] projectFields,
LogConsistency consistency,
LogStartupMode scanMode,
@Nullable Long timestampMills) {
@@ -78,6 +81,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
this.primaryKey = primaryKey;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
+ this.projectFields = projectFields;
this.consistency = consistency;
this.scanMode = scanMode;
this.timestampMills = timestampMills;
@@ -109,11 +113,13 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
@VisibleForTesting
KafkaRecordDeserializationSchema<RowData> createDeserializationSchema() {
- return primaryKey.length > 0
- ? KafkaRecordDeserializationSchema.of(
- new KafkaLogKeyedDeserializationSchema(
- physicalType, primaryKey, keyDeserializer, valueDeserializer))
- : KafkaRecordDeserializationSchema.valueOnly(valueDeserializer);
+ return KafkaRecordDeserializationSchema.of(
+ new KafkaLogDeserializationSchema(
+ physicalType,
+ primaryKey,
+ keyDeserializer,
+ valueDeserializer,
+ projectFields));
}
private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer, Long> bucketOffsets) {
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 943d0b6..58386fb 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -159,7 +161,9 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
@Override
public KafkaLogSourceProvider createSourceProvider(
- DynamicTableFactory.Context context, SourceContext sourceContext) {
+ DynamicTableFactory.Context context,
+ SourceContext sourceContext,
+ @Nullable int[][] projectFields) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
@@ -181,6 +185,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
primaryKey,
keyDeserializer,
valueDeserializer,
+ projectFields,
helper.getOptions().get(CONSISTENCY),
helper.getOptions().get(SCAN),
helper.getOptions().get(SCAN_TIMESTAMP_MILLS));
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index bb18a11..ab05fa7 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.kafka;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
@@ -138,8 +139,6 @@ public class KafkaLogITCase extends KafkaTableTestBase {
testContext(name, getBootstrapServers(), changelogMode, consistency, keyed);
KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
- KafkaLogSourceProvider sourceProvider =
- factory.createSourceProvider(context, SOURCE_CONTEXT);
factory.onCreateTable(context, 3, true);
try {
@@ -158,12 +157,10 @@ public class KafkaLogITCase extends KafkaTableTestBase {
// 1.2 read
List<RowData> records =
- env.fromSource(
- sourceProvider.createSource(null),
- WatermarkStrategy.noWatermarks(),
- "source")
- .executeAndCollect(4);
- records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+ collect(
+ factory.createSourceProvider(context, SOURCE_CONTEXT, null)
+ .createSource(null),
+ 4);
// delete, upsert mode
if (changelogMode == LogChangelogMode.UPSERT) {
@@ -171,12 +168,30 @@ public class KafkaLogITCase extends KafkaTableTestBase {
} else {
assertRow(records.get(0), RowKind.DELETE, 1, 2);
}
-
// inserts
assertRow(records.get(1), RowKind.INSERT, 3, 4);
assertRow(records.get(2), RowKind.INSERT, 5, 6);
assertRow(records.get(3), RowKind.INSERT, 7, 8);
+ // 1.3 read with projection
+ records =
+ collect(
+ factory.createSourceProvider(
+ context, SOURCE_CONTEXT, new int[][] {new int[] {1}})
+ .createSource(null),
+ 4);
+
+ // delete, upsert mode
+ if (changelogMode == LogChangelogMode.UPSERT) {
+ assertValue(records.get(0), RowKind.DELETE, null);
+ } else {
+ assertValue(records.get(0), RowKind.DELETE, 2);
+ }
+ // inserts
+ assertValue(records.get(1), RowKind.INSERT, 4);
+ assertValue(records.get(2), RowKind.INSERT, 6);
+ assertValue(records.get(3), RowKind.INSERT, 8);
+
// 2.1 sink
env.fromElements(
testRecord(true, 0, 9, 10, RowKind.INSERT),
@@ -187,13 +202,10 @@ public class KafkaLogITCase extends KafkaTableTestBase {
// 2.2 read from offsets
records =
- env.fromSource(
- sourceProvider.createSource(
- TestOffsetsLogSink.drainOffsets(uuid)),
- WatermarkStrategy.noWatermarks(),
- "source")
- .executeAndCollect(3);
- records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+ collect(
+ factory.createSourceProvider(context, SOURCE_CONTEXT, null)
+ .createSource(TestOffsetsLogSink.drainOffsets(uuid)),
+ 3);
assertRow(records.get(0), RowKind.INSERT, 9, 10);
assertRow(records.get(1), RowKind.INSERT, 11, 12);
assertRow(records.get(2), RowKind.INSERT, 13, 14);
@@ -202,6 +214,14 @@ public class KafkaLogITCase extends KafkaTableTestBase {
}
}
+ private List<RowData> collect(KafkaSource<RowData> source, int numRecord) throws Exception {
+ List<RowData> records =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ .executeAndCollect(numRecord);
+ records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+ return records;
+ }
+
private void enableCheckpoint() {
Configuration configuration = new Configuration();
configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
@@ -214,4 +234,9 @@ public class KafkaLogITCase extends KafkaTableTestBase {
Assert.assertEquals(k, row.isNullAt(0) ? null : row.getInt(0));
Assert.assertEquals(v, row.isNullAt(1) ? null : row.getInt(1));
}
+
+ private void assertValue(RowData row, RowKind rowKind, Integer v) {
+ Assert.assertEquals(rowKind, row.getRowKind());
+ Assert.assertEquals(v, row.isNullAt(0) ? null : row.getInt(0));
+ }
}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index c8461a0..1a48f77 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -38,7 +38,7 @@ import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testContext;
import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testRecord;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link KafkaLogSerializationSchema} and {@link KafkaLogKeyedDeserializationSchema}. */
+/** Test for {@link KafkaLogSerializationSchema} and {@link KafkaLogDeserializationSchema}. */
public class KafkaLogSerializationTest {
private static final String TOPIC = "my_topic";
@@ -140,7 +140,7 @@ public class KafkaLogSerializationTest {
private static KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema(
DynamicTableFactory.Context context) {
return discoverKafkaLogFactory()
- .createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT)
+ .createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT, null)
.createDeserializationSchema();
}
}