Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/10 02:45:19 UTC

[GitHub] [flink-table-store] openinx commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

openinx commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868754349


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -69,7 +92,10 @@ public FileStoreSource(
             boolean latestContinuous,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
-            @Nullable Predicate fieldPredicate) {
+            @Nullable Predicate fieldPredicate,
+            @Nullable Long specifiedSnapshotId,
+            @Nullable
+                    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries) {

Review Comment:
   In which case will these two fields be set to non-null values? As far as I can see, both are always set to `null` in `buildFileStore`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");

Review Comment:
   The error message looks strange to me. Is manual compaction the only case in which we set `specifiedSnapshotId` and `specifiedManifestEntries`? Or will there be other cases that set these two fields?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();

Review Comment:
   What does the suffix `ctr` mean: partition number or partition count? Why not just use `partitionNum`?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();
+            while (partitionCtr > 0) {
+                BinaryRowData partition = partSerializer.deserialize(view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+                int bucketCtr = view.readInt();
+                while (bucketCtr > 0) {
+                    int bucket = view.readInt();
+                    int entryCtr = view.readInt();
+                    if (entryCtr == 0) {
+                        bucketEntry.put(bucket, Collections.emptyList());
+                    } else {
+                        List<DataFileMeta> metas = new ArrayList<>();
+                        while (entryCtr > 0) {
+                            metas.add(metaSerializer.deserialize(view));
+                            entryCtr--;
+                        }
+                        bucketEntry.put(bucket, metas);
+                    }
+                    bucketCtr--;
+                }
+                specifiedManifestEntries.put(partition, bucketEntry);
+                partitionCtr--;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FileStoreSource)) {
+            return false;
+        }
+        FileStoreSource that = (FileStoreSource) o;
+        return valueCountMode == that.valueCountMode
+                && isContinuous == that.isContinuous
+                && discoveryInterval == that.discoveryInterval
+                && latestContinuous == that.latestContinuous
+                && fileStore.equals(that.fileStore)
+                && Arrays.equals(projectedFields, that.projectedFields)
+                && Objects.equals(partitionPredicate, that.partitionPredicate)
+                && Objects.equals(fieldPredicate, that.fieldPredicate)
+                && Objects.equals(specifiedSnapshotId, that.specifiedSnapshotId)
+                && Objects.equals(specifiedManifestEntries, that.specifiedManifestEntries);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(
+                        fileStore,
+                        valueCountMode,
+                        isContinuous,
+                        discoveryInterval,
+                        latestContinuous,
+                        partitionPredicate,
+                        fieldPredicate,
+                        specifiedSnapshotId,
+                        specifiedManifestEntries);
+        result = 31 * result + Arrays.hashCode(projectedFields);

Review Comment:
   It's recommended to use `Arrays.deepHashCode` for multi-dimensional arrays, for the same reason given in the `Arrays.deepEquals` comment on `equals()`.
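
A minimal sketch of the point above, using only `java.util.Arrays`. The class name `DeepHashCodeDemo` and its helper methods are hypothetical and are not part of the PR. `Arrays.hashCode(Object[])` hashes each row with the row's own `hashCode()`, which for an `int[]` is identity-based, while `Arrays.deepHashCode` hashes the row contents.

```java
import java.util.Arrays;

// Hypothetical demo class; not part of the PR under review.
class DeepHashCodeDemo {

    // Hashes each row via Object.hashCode(), which is identity-based for int[].
    static int shallowHash(int[][] fields) {
        return Arrays.hashCode(fields);
    }

    // Recurses into nested arrays and hashes their contents.
    static int deepHash(int[][] fields) {
        return Arrays.deepHashCode(fields);
    }

    public static void main(String[] args) {
        int[][] left = {{0, 1}, {2}};
        int[][] right = {{0, 1}, {2}};

        // Content-equal arrays always get the same deep hash.
        System.out.println(deepHash(left) == deepHash(right));

        // The shallow hashes depend on object identity, so they usually differ.
        System.out.println(shallowHash(left) == shallowHash(right));
    }
}
```

If `equals` is switched to `Arrays.deepEquals`, `hashCode` needs the matching deep variant so that equal sources still produce equal hash codes.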



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");
+            return new StaticFileStoreSplitEnumerator(
+                    context,
+                    scan.snapshot(specifiedSnapshotId),
+                    new FileStoreSourceSplitGenerator().createSplits(specifiedManifestEntries));
+        }

Review Comment:
   Nit: Please leave an empty line to separate the different code blocks; that makes the code clearer for the reviewer.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {

Review Comment:
   Could we introduce two separate classes named `PartMeta` and `BucketMeta` to manage all the `DataFileMeta` instances for a given partition and a given bucket, and move this serialization and deserialization logic into those two classes?
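
A rough sketch of what such a class could look like, under loud assumptions: `BucketMetaSketch` is a hypothetical name, plain `String` file names stand in for `DataFileMeta`, and `java.io.DataOutput`/`DataInput` stand in for Flink's `DataOutputView`/`DataInputView`. It only illustrates keeping the count-prefixed write and read logic next to the data it describes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the suggested BucketMeta; Strings replace DataFileMeta.
class BucketMetaSketch {
    final int bucket;
    final List<String> files;

    BucketMetaSketch(int bucket, List<String> files) {
        this.bucket = bucket;
        this.files = files;
    }

    // Writes the bucket id, then the number of files, then each file.
    void serialize(DataOutput out) throws IOException {
        out.writeInt(bucket);
        out.writeInt(files.size());
        for (String file : files) {
            out.writeUTF(file);
        }
    }

    // Reads back exactly what serialize() wrote, in the same order.
    static BucketMetaSketch deserialize(DataInput in) throws IOException {
        int bucket = in.readInt();
        int fileNum = in.readInt();
        List<String> files = new ArrayList<>(fileNum);
        for (int i = 0; i < fileNum; i++) {
            files.add(in.readUTF());
        }
        return new BucketMetaSketch(bucket, files);
    }

    // Helper for the usage example: serialize to bytes and read them back.
    static BucketMetaSketch roundTrip(BucketMetaSketch meta) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            meta.serialize(new DataOutputStream(bytes));
            return deserialize(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A `PartMeta` counterpart would presumably write the partition row followed by a count-prefixed list of such bucket entries, leaving `FileStoreSource.writeObject` to delegate to it.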



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();
+            while (partitionCtr > 0) {
+                BinaryRowData partition = partSerializer.deserialize(view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+                int bucketCtr = view.readInt();
+                while (bucketCtr > 0) {
+                    int bucket = view.readInt();
+                    int entryCtr = view.readInt();
+                    if (entryCtr == 0) {
+                        bucketEntry.put(bucket, Collections.emptyList());
+                    } else {
+                        List<DataFileMeta> metas = new ArrayList<>();
+                        while (entryCtr > 0) {
+                            metas.add(metaSerializer.deserialize(view));
+                            entryCtr--;
+                        }
+                        bucketEntry.put(bucket, metas);
+                    }
+                    bucketCtr--;
+                }
+                specifiedManifestEntries.put(partition, bucketEntry);
+                partitionCtr--;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FileStoreSource)) {
+            return false;
+        }
+        FileStoreSource that = (FileStoreSource) o;
+        return valueCountMode == that.valueCountMode
+                && isContinuous == that.isContinuous
+                && discoveryInterval == that.discoveryInterval
+                && latestContinuous == that.latestContinuous
+                && fileStore.equals(that.fileStore)
+                && Arrays.equals(projectedFields, that.projectedFields)

Review Comment:
   It's recommended to use `Arrays.deepEquals` for multi-dimensional arrays, because `Arrays.equals` only checks element equality by calling `Objects.equals()` on each element. That does not work when an element is itself an `int[]`, since arrays inherit reference-based `equals` from `Object`.
   
   
   ```java
   public static boolean equals(Object[] a, Object[] a2) {
       if (a==a2)
           return true;
       if (a==null || a2==null)
           return false;

       int length = a.length;
       if (a2.length != length)
           return false;

       for (int i=0; i<length; i++) {
           if (!Objects.equals(a[i], a2[i]))
               return false;
       }

       return true;
   }
   ```
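
To make the difference concrete, here is a small self-contained sketch. The class `DeepEqualsDemo` and its helpers are hypothetical names for illustration only; the two `Arrays` methods are standard JDK APIs. The `int[][]` arguments mirror the type of `projectedFields`.

```java
import java.util.Arrays;

// Hypothetical demo class; not part of the PR under review.
class DeepEqualsDemo {

    // Resolves to Arrays.equals(Object[], Object[]): rows are compared with
    // Objects.equals, which falls back to reference equality for int[].
    static boolean shallowEquals(int[][] a, int[][] b) {
        return Arrays.equals(a, b);
    }

    // Recurses into nested arrays and compares their contents.
    static boolean deepEquals(int[][] a, int[][] b) {
        return Arrays.deepEquals(a, b);
    }

    public static void main(String[] args) {
        int[][] left = {{0, 1}, {2}};
        int[][] right = {{0, 1}, {2}};

        System.out.println(shallowEquals(left, right)); // false: rows are distinct objects
        System.out.println(deepEquals(left, right));    // true: row contents match
    }
}
```

With the shallow check, two sources built from identical projections would compare as unequal unless they happen to share the same row objects.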


