You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/03 09:07:38 UTC
[flink-table-store] branch master updated: [hotfix] Rename ambiguous row type to value type
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 02b9892 [hotfix] Rename ambiguous row type to value type
02b9892 is described below
commit 02b9892cb3b40cbc917bf02215659b3ee0fd2aa5
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Mar 3 17:07:32 2022 +0800
[hotfix] Rename ambiguous row type to value type
This closes #34
---
.../store/connector/sink/FileCommittableSerializer.java | 4 ++--
.../apache/flink/table/store/connector/sink/StoreSink.java | 12 ++++++------
.../table/store/connector/source/FileStoreSource.java | 14 +++++++-------
.../connector/source/FileStoreSourceSplitSerializer.java | 5 +++--
.../store/file/manifest/ManifestCommittableSerializer.java | 5 +++--
.../flink/table/store/file/manifest/ManifestEntry.java | 4 ++--
.../table/store/file/manifest/ManifestEntrySerializer.java | 6 +++---
.../flink/table/store/file/manifest/ManifestFile.java | 10 +++++-----
.../flink/table/store/file/mergetree/sst/SstFileMeta.java | 4 ++--
.../store/file/mergetree/sst/SstFileMetaSerializer.java | 6 +++---
10 files changed, 36 insertions(+), 34 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
index 69bb44b..cb1c412 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
@@ -35,9 +35,9 @@ public class FileCommittableSerializer implements SimpleVersionedSerializer<File
private final BinaryRowDataSerializer partSerializer;
private final SstFileMetaSerializer sstSerializer;
- public FileCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+ public FileCommittableSerializer(RowType partitionType, RowType keyType, RowType valueType) {
this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
- this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+ this.sstSerializer = new SstFileMetaSerializer(keyType, valueType);
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 1b980c6..fdd3cbe 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -56,7 +56,7 @@ public class StoreSink<WriterStateT, LogCommT>
private final int[] partitions;
- private final int[] keys;
+ private final int[] primaryKeys;
private final int numBucket;
@@ -69,7 +69,7 @@ public class StoreSink<WriterStateT, LogCommT>
FileStore fileStore,
RowType rowType,
int[] partitions,
- int[] keys,
+ int[] primaryKeys,
int numBucket,
@Nullable CatalogLock.Factory lockFactory,
@Nullable Map<String, String> overwritePartition) {
@@ -77,7 +77,7 @@ public class StoreSink<WriterStateT, LogCommT>
this.fileStore = fileStore;
this.rowType = rowType;
this.partitions = partitions;
- this.keys = keys;
+ this.primaryKeys = primaryKeys;
this.numBucket = numBucket;
this.lockFactory = lockFactory;
this.overwritePartition = overwritePartition;
@@ -93,7 +93,7 @@ public class StoreSink<WriterStateT, LogCommT>
InitContext initContext, Collection<WriterStateT> states) throws IOException {
return new StoreSinkWriter<>(
fileStore.newWrite(),
- new SinkRecordConverter(numBucket, rowType, partitions, keys),
+ new SinkRecordConverter(numBucket, rowType, partitions, primaryKeys),
fileCommitSerializer(),
overwritePartition != null);
}
@@ -136,14 +136,14 @@ public class StoreSink<WriterStateT, LogCommT>
public GlobalCommittableSerializer<LogCommT> getGlobalCommittableSerializer() {
ManifestCommittableSerializer fileCommSerializer =
new ManifestCommittableSerializer(
- project(rowType, partitions), project(rowType, keys), rowType);
+ project(rowType, partitions), project(rowType, primaryKeys), rowType);
SimpleVersionedSerializer<LogCommT> logCommitSerializer = new NoOutputSerializer<>();
return new GlobalCommittableSerializer<>(logCommitSerializer, fileCommSerializer);
}
private FileCommittableSerializer fileCommitSerializer() {
return new FileCommittableSerializer(
- project(rowType, partitions), project(rowType, keys), rowType);
+ project(rowType, partitions), project(rowType, primaryKeys), rowType);
}
private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index a9f1497..062a49f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -48,7 +48,7 @@ public class FileStoreSource
private final int[] partitions;
- private final int[] keys;
+ private final int[] primaryKeys;
@Nullable private final int[][] projectedFields;
@@ -60,14 +60,14 @@ public class FileStoreSource
FileStore fileStore,
RowType rowType,
int[] partitions,
- int[] keys,
+ int[] primaryKeys,
@Nullable int[][] projectedFields,
@Nullable Predicate partitionPredicate,
@Nullable Predicate fieldsPredicate) {
this.fileStore = fileStore;
this.rowType = rowType;
this.partitions = partitions;
- this.keys = keys;
+ this.primaryKeys = primaryKeys;
this.projectedFields = projectedFields;
this.partitionPredicate = partitionPredicate;
this.fieldsPredicate = fieldsPredicate;
@@ -83,13 +83,13 @@ public class FileStoreSource
public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
FileStoreRead read = fileStore.newRead();
if (projectedFields != null) {
- if (keys.length == 0) {
+ if (primaryKeys.length == 0) {
read.withKeyProjection(projectedFields);
} else {
read.withValueProjection(projectedFields);
}
}
- return new FileStoreSourceReader(context, read, keys.length == 0);
+ return new FileStoreSourceReader(context, read, primaryKeys.length == 0);
}
@Override
@@ -100,7 +100,7 @@ public class FileStoreSource
scan.withPartitionFilter(partitionPredicate);
}
if (fieldsPredicate != null) {
- if (keys.length == 0) {
+ if (primaryKeys.length == 0) {
scan.withKeyFilter(fieldsPredicate);
} else {
scan.withValueFilter(fieldsPredicate);
@@ -120,7 +120,7 @@ public class FileStoreSource
@Override
public FileStoreSourceSplitSerializer getSplitSerializer() {
return new FileStoreSourceSplitSerializer(
- project(rowType, partitions), project(rowType, keys), rowType);
+ project(rowType, partitions), project(rowType, primaryKeys), rowType);
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 239facc..c91f0ee 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -35,9 +35,10 @@ public class FileStoreSourceSplitSerializer
private final BinaryRowDataSerializer partSerializer;
private final SstFileMetaSerializer sstSerializer;
- public FileStoreSourceSplitSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+ public FileStoreSourceSplitSerializer(
+ RowType partitionType, RowType keyType, RowType valueType) {
this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
- this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+ this.sstSerializer = new SstFileMetaSerializer(keyType, valueType);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index 7ceb607..0cce230 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -41,9 +41,10 @@ public class ManifestCommittableSerializer
private final BinaryRowDataSerializer partSerializer;
private final SstFileMetaSerializer sstSerializer;
- public ManifestCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+ public ManifestCommittableSerializer(
+ RowType partitionType, RowType keyType, RowType valueType) {
this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
- this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+ this.sstSerializer = new SstFileMetaSerializer(keyType, valueType);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
index 170b024..75b7e28 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
@@ -76,13 +76,13 @@ public class ManifestEntry {
return new Identifier(partition, bucket, file.level(), file.fileName());
}
- public static RowType schema(RowType partitionType, RowType keyType, RowType rowType) {
+ public static RowType schema(RowType partitionType, RowType keyType, RowType valueType) {
List<RowType.RowField> fields = new ArrayList<>();
fields.add(new RowType.RowField("_KIND", new TinyIntType(false)));
fields.add(new RowType.RowField("_PARTITION", partitionType));
fields.add(new RowType.RowField("_BUCKET", new IntType(false)));
fields.add(new RowType.RowField("_TOTAL_BUCKETS", new IntType(false)));
- fields.add(new RowType.RowField("_FILE", SstFileMeta.schema(keyType, rowType)));
+ fields.add(new RowType.RowField("_FILE", SstFileMeta.schema(keyType, valueType)));
return new RowType(fields);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
index 3ab74fd..51736f6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
@@ -34,10 +34,10 @@ public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
private final RowDataSerializer partitionSerializer;
private final SstFileMetaSerializer sstFileMetaSerializer;
- public ManifestEntrySerializer(RowType partitionType, RowType keyType, RowType rowType) {
- super(ManifestEntry.schema(partitionType, keyType, rowType));
+ public ManifestEntrySerializer(RowType partitionType, RowType keyType, RowType valueType) {
+ super(ManifestEntry.schema(partitionType, keyType, valueType));
this.partitionSerializer = new RowDataSerializer(partitionType);
- this.sstFileMetaSerializer = new SstFileMetaSerializer(keyType, rowType);
+ this.sstFileMetaSerializer = new SstFileMetaSerializer(keyType, valueType);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 57ec343..cbc1571 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -172,7 +172,7 @@ public class ManifestFile {
private final RowType partitionType;
private final RowType keyType;
- private final RowType rowType;
+ private final RowType valueType;
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
@@ -181,14 +181,14 @@ public class ManifestFile {
public Factory(
RowType partitionType,
RowType keyType,
- RowType rowType,
+ RowType valueType,
FileFormat fileFormat,
FileStorePathFactory pathFactory,
long suggestedFileSize) {
this.partitionType = partitionType;
this.keyType = keyType;
- this.rowType = rowType;
- RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
+ this.valueType = valueType;
+ RowType entryType = ManifestEntry.schema(partitionType, keyType, valueType);
this.readerFactory = fileFormat.createReaderFactory(entryType);
this.writerFactory = fileFormat.createWriterFactory(entryType);
this.pathFactory = pathFactory;
@@ -198,7 +198,7 @@ public class ManifestFile {
public ManifestFile create() {
return new ManifestFile(
partitionType,
- new ManifestEntrySerializer(partitionType, keyType, rowType),
+ new ManifestEntrySerializer(partitionType, keyType, valueType),
readerFactory,
writerFactory,
pathFactory,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
index 5a7e99a..da419e8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -170,14 +170,14 @@ public class SstFileMeta {
level);
}
- public static RowType schema(RowType keyType, RowType rowType) {
+ public static RowType schema(RowType keyType, RowType valueType) {
List<RowType.RowField> fields = new ArrayList<>();
fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
fields.add(new RowType.RowField("_MIN_KEY", keyType));
fields.add(new RowType.RowField("_MAX_KEY", keyType));
- fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(rowType)));
+ fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(valueType)));
fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
index ba79774..d03a71e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -34,10 +34,10 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
private final RowDataSerializer keySerializer;
private final FieldStatsArraySerializer statsArraySerializer;
- public SstFileMetaSerializer(RowType keyType, RowType rowType) {
- super(SstFileMeta.schema(keyType, rowType));
+ public SstFileMetaSerializer(RowType keyType, RowType valueType) {
+ super(SstFileMeta.schema(keyType, valueType));
this.keySerializer = new RowDataSerializer(keyType);
- this.statsArraySerializer = new FieldStatsArraySerializer(rowType);
+ this.statsArraySerializer = new FieldStatsArraySerializer(valueType);
}
@Override