You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/03 09:07:38 UTC

[flink-table-store] branch master updated: [hotfix] Rename ambiguous row type to value type

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 02b9892  [hotfix] Rename ambiguous row type to value type
02b9892 is described below

commit 02b9892cb3b40cbc917bf02215659b3ee0fd2aa5
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Mar 3 17:07:32 2022 +0800

    [hotfix] Rename ambiguous row type to value type
    
    This closes #34
---
 .../store/connector/sink/FileCommittableSerializer.java    |  4 ++--
 .../apache/flink/table/store/connector/sink/StoreSink.java | 12 ++++++------
 .../table/store/connector/source/FileStoreSource.java      | 14 +++++++-------
 .../connector/source/FileStoreSourceSplitSerializer.java   |  5 +++--
 .../store/file/manifest/ManifestCommittableSerializer.java |  5 +++--
 .../flink/table/store/file/manifest/ManifestEntry.java     |  4 ++--
 .../table/store/file/manifest/ManifestEntrySerializer.java |  6 +++---
 .../flink/table/store/file/manifest/ManifestFile.java      | 10 +++++-----
 .../flink/table/store/file/mergetree/sst/SstFileMeta.java  |  4 ++--
 .../store/file/mergetree/sst/SstFileMetaSerializer.java    |  6 +++---
 10 files changed, 36 insertions(+), 34 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
index 69bb44b..cb1c412 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
@@ -35,9 +35,9 @@ public class FileCommittableSerializer implements SimpleVersionedSerializer<File
     private final BinaryRowDataSerializer partSerializer;
     private final SstFileMetaSerializer sstSerializer;
 
-    public FileCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+    public FileCommittableSerializer(RowType partitionType, RowType keyType, RowType valueType) {
         this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
-        this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+        this.sstSerializer = new SstFileMetaSerializer(keyType, valueType);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 1b980c6..fdd3cbe 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -56,7 +56,7 @@ public class StoreSink<WriterStateT, LogCommT>
 
     private final int[] partitions;
 
-    private final int[] keys;
+    private final int[] primaryKeys;
 
     private final int numBucket;
 
@@ -69,7 +69,7 @@ public class StoreSink<WriterStateT, LogCommT>
             FileStore fileStore,
             RowType rowType,
             int[] partitions,
-            int[] keys,
+            int[] primaryKeys,
             int numBucket,
             @Nullable CatalogLock.Factory lockFactory,
             @Nullable Map<String, String> overwritePartition) {
@@ -77,7 +77,7 @@ public class StoreSink<WriterStateT, LogCommT>
         this.fileStore = fileStore;
         this.rowType = rowType;
         this.partitions = partitions;
-        this.keys = keys;
+        this.primaryKeys = primaryKeys;
         this.numBucket = numBucket;
         this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
@@ -93,7 +93,7 @@ public class StoreSink<WriterStateT, LogCommT>
             InitContext initContext, Collection<WriterStateT> states) throws IOException {
         return new StoreSinkWriter<>(
                 fileStore.newWrite(),
-                new SinkRecordConverter(numBucket, rowType, partitions, keys),
+                new SinkRecordConverter(numBucket, rowType, partitions, primaryKeys),
                 fileCommitSerializer(),
                 overwritePartition != null);
     }
@@ -136,14 +136,14 @@ public class StoreSink<WriterStateT, LogCommT>
     public GlobalCommittableSerializer<LogCommT> getGlobalCommittableSerializer() {
         ManifestCommittableSerializer fileCommSerializer =
                 new ManifestCommittableSerializer(
-                        project(rowType, partitions), project(rowType, keys), rowType);
+                        project(rowType, partitions), project(rowType, primaryKeys), rowType);
         SimpleVersionedSerializer<LogCommT> logCommitSerializer = new NoOutputSerializer<>();
         return new GlobalCommittableSerializer<>(logCommitSerializer, fileCommSerializer);
     }
 
     private FileCommittableSerializer fileCommitSerializer() {
         return new FileCommittableSerializer(
-                project(rowType, partitions), project(rowType, keys), rowType);
+                project(rowType, partitions), project(rowType, primaryKeys), rowType);
     }
 
     private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index a9f1497..062a49f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -48,7 +48,7 @@ public class FileStoreSource
 
     private final int[] partitions;
 
-    private final int[] keys;
+    private final int[] primaryKeys;
 
     @Nullable private final int[][] projectedFields;
 
@@ -60,14 +60,14 @@ public class FileStoreSource
             FileStore fileStore,
             RowType rowType,
             int[] partitions,
-            int[] keys,
+            int[] primaryKeys,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
             @Nullable Predicate fieldsPredicate) {
         this.fileStore = fileStore;
         this.rowType = rowType;
         this.partitions = partitions;
-        this.keys = keys;
+        this.primaryKeys = primaryKeys;
         this.projectedFields = projectedFields;
         this.partitionPredicate = partitionPredicate;
         this.fieldsPredicate = fieldsPredicate;
@@ -83,13 +83,13 @@ public class FileStoreSource
     public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
         FileStoreRead read = fileStore.newRead();
         if (projectedFields != null) {
-            if (keys.length == 0) {
+            if (primaryKeys.length == 0) {
                 read.withKeyProjection(projectedFields);
             } else {
                 read.withValueProjection(projectedFields);
             }
         }
-        return new FileStoreSourceReader(context, read, keys.length == 0);
+        return new FileStoreSourceReader(context, read, primaryKeys.length == 0);
     }
 
     @Override
@@ -100,7 +100,7 @@ public class FileStoreSource
             scan.withPartitionFilter(partitionPredicate);
         }
         if (fieldsPredicate != null) {
-            if (keys.length == 0) {
+            if (primaryKeys.length == 0) {
                 scan.withKeyFilter(fieldsPredicate);
             } else {
                 scan.withValueFilter(fieldsPredicate);
@@ -120,7 +120,7 @@ public class FileStoreSource
     @Override
     public FileStoreSourceSplitSerializer getSplitSerializer() {
         return new FileStoreSourceSplitSerializer(
-                project(rowType, partitions), project(rowType, keys), rowType);
+                project(rowType, partitions), project(rowType, primaryKeys), rowType);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 239facc..c91f0ee 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -35,9 +35,10 @@ public class FileStoreSourceSplitSerializer
     private final BinaryRowDataSerializer partSerializer;
     private final SstFileMetaSerializer sstSerializer;
 
-    public FileStoreSourceSplitSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+    public FileStoreSourceSplitSerializer(
+            RowType partitionType, RowType keyType, RowType valueType) {
         this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
-        this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+        this.sstSerializer = new SstFileMetaSerializer(keyType, valueType);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index 7ceb607..0cce230 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -41,9 +41,10 @@ public class ManifestCommittableSerializer
     private final BinaryRowDataSerializer partSerializer;
     private final SstFileMetaSerializer sstSerializer;
 
-    public ManifestCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+    public ManifestCommittableSerializer(
+            RowType partitionType, RowType keyType, RowType valueType) {
         this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
-        this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+        this.sstSerializer = new SstFileMetaSerializer(keyType, valueType);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
index 170b024..75b7e28 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
@@ -76,13 +76,13 @@ public class ManifestEntry {
         return new Identifier(partition, bucket, file.level(), file.fileName());
     }
 
-    public static RowType schema(RowType partitionType, RowType keyType, RowType rowType) {
+    public static RowType schema(RowType partitionType, RowType keyType, RowType valueType) {
         List<RowType.RowField> fields = new ArrayList<>();
         fields.add(new RowType.RowField("_KIND", new TinyIntType(false)));
         fields.add(new RowType.RowField("_PARTITION", partitionType));
         fields.add(new RowType.RowField("_BUCKET", new IntType(false)));
         fields.add(new RowType.RowField("_TOTAL_BUCKETS", new IntType(false)));
-        fields.add(new RowType.RowField("_FILE", SstFileMeta.schema(keyType, rowType)));
+        fields.add(new RowType.RowField("_FILE", SstFileMeta.schema(keyType, valueType)));
         return new RowType(fields);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
index 3ab74fd..51736f6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
@@ -34,10 +34,10 @@ public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
     private final RowDataSerializer partitionSerializer;
     private final SstFileMetaSerializer sstFileMetaSerializer;
 
-    public ManifestEntrySerializer(RowType partitionType, RowType keyType, RowType rowType) {
-        super(ManifestEntry.schema(partitionType, keyType, rowType));
+    public ManifestEntrySerializer(RowType partitionType, RowType keyType, RowType valueType) {
+        super(ManifestEntry.schema(partitionType, keyType, valueType));
         this.partitionSerializer = new RowDataSerializer(partitionType);
-        this.sstFileMetaSerializer = new SstFileMetaSerializer(keyType, rowType);
+        this.sstFileMetaSerializer = new SstFileMetaSerializer(keyType, valueType);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 57ec343..cbc1571 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -172,7 +172,7 @@ public class ManifestFile {
 
         private final RowType partitionType;
         private final RowType keyType;
-        private final RowType rowType;
+        private final RowType valueType;
         private final BulkFormat<RowData, FileSourceSplit> readerFactory;
         private final BulkWriter.Factory<RowData> writerFactory;
         private final FileStorePathFactory pathFactory;
@@ -181,14 +181,14 @@ public class ManifestFile {
         public Factory(
                 RowType partitionType,
                 RowType keyType,
-                RowType rowType,
+                RowType valueType,
                 FileFormat fileFormat,
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize) {
             this.partitionType = partitionType;
             this.keyType = keyType;
-            this.rowType = rowType;
-            RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
+            this.valueType = valueType;
+            RowType entryType = ManifestEntry.schema(partitionType, keyType, valueType);
             this.readerFactory = fileFormat.createReaderFactory(entryType);
             this.writerFactory = fileFormat.createWriterFactory(entryType);
             this.pathFactory = pathFactory;
@@ -198,7 +198,7 @@ public class ManifestFile {
         public ManifestFile create() {
             return new ManifestFile(
                     partitionType,
-                    new ManifestEntrySerializer(partitionType, keyType, rowType),
+                    new ManifestEntrySerializer(partitionType, keyType, valueType),
                     readerFactory,
                     writerFactory,
                     pathFactory,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
index 5a7e99a..da419e8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -170,14 +170,14 @@ public class SstFileMeta {
                 level);
     }
 
-    public static RowType schema(RowType keyType, RowType rowType) {
+    public static RowType schema(RowType keyType, RowType valueType) {
         List<RowType.RowField> fields = new ArrayList<>();
         fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
         fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
         fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
         fields.add(new RowType.RowField("_MIN_KEY", keyType));
         fields.add(new RowType.RowField("_MAX_KEY", keyType));
-        fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(rowType)));
+        fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(valueType)));
         fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
         fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
         fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
index ba79774..d03a71e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -34,10 +34,10 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
     private final RowDataSerializer keySerializer;
     private final FieldStatsArraySerializer statsArraySerializer;
 
-    public SstFileMetaSerializer(RowType keyType, RowType rowType) {
-        super(SstFileMeta.schema(keyType, rowType));
+    public SstFileMetaSerializer(RowType keyType, RowType valueType) {
+        super(SstFileMeta.schema(keyType, valueType));
         this.keySerializer = new RowDataSerializer(keyType);
-        this.statsArraySerializer = new FieldStatsArraySerializer(rowType);
+        this.statsArraySerializer = new FieldStatsArraySerializer(valueType);
     }
 
     @Override