You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/07 07:53:15 UTC

[GitHub] [flink-table-store] LadyForest opened a new pull request, #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

LadyForest opened a new pull request, #111:
URL: https://github.com/apache/flink-table-store/pull/111

   Let `FileStoreSource` skip planning if manifest entries and snapshot id are specified.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767378


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -69,7 +92,10 @@ public FileStoreSource(
             boolean latestContinuous,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
-            @Nullable Predicate fieldPredicate) {
+            @Nullable Predicate fieldPredicate,
+            @Nullable Long specifiedSnapshotId,
+            @Nullable
+                    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries) {

Review Comment:
   > What's the case that we will set those two fields with non-null values ? I see all of them will be set to be `null` in the `buildFileStore`.
   
   For `ALTER TABLE ... COMPACT`, the `buildFileStore` will be modified in the following PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868885469


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +216,65 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            new PartitionedDataManifestSerializer(
+                            fileStore.partitionType().getFieldCount(),
+                            fileStore.keyType(),
+                            fileStore.valueType())
+                    .serialize(specifiedManifestEntries, out);
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            specifiedManifestEntries =
+                    new PartitionedDataManifestSerializer(
+                                    fileStore.partitionType().getFieldCount(),
+                                    fileStore.keyType(),
+                                    fileStore.valueType())
+                            .deserialize(in);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   We dont need to add `equals` and `hashCode` for this class.
   The source is functional, not a pojo, and I don't think we need to implement these interfaces



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767988


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();
+            while (partitionCtr > 0) {
+                BinaryRowData partition = partSerializer.deserialize(view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+                int bucketCtr = view.readInt();
+                while (bucketCtr > 0) {
+                    int bucket = view.readInt();
+                    int entryCtr = view.readInt();
+                    if (entryCtr == 0) {
+                        bucketEntry.put(bucket, Collections.emptyList());
+                    } else {
+                        List<DataFileMeta> metas = new ArrayList<>();
+                        while (entryCtr > 0) {
+                            metas.add(metaSerializer.deserialize(view));
+                            entryCtr--;
+                        }
+                        bucketEntry.put(bucket, metas);
+                    }
+                    bucketCtr--;
+                }
+                specifiedManifestEntries.put(partition, bucketEntry);
+                partitionCtr--;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FileStoreSource)) {
+            return false;
+        }
+        FileStoreSource that = (FileStoreSource) o;
+        return valueCountMode == that.valueCountMode
+                && isContinuous == that.isContinuous
+                && discoveryInterval == that.discoveryInterval
+                && latestContinuous == that.latestContinuous
+                && fileStore.equals(that.fileStore)
+                && Arrays.equals(projectedFields, that.projectedFields)

Review Comment:
   Thanks for the reporting. This code snippet is auto-generated by Intellij. I should check more carefully. Blame on me, and I'll fix it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r867464932


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +66,13 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
+    @Nullable private final Long specifiedSnapshotId;
+
+    /** The manifest entries collected at planning phase when manual compaction is triggered. */
+    @Nullable
+    private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries;

Review Comment:
   `DataFileMeta` is not serializable, but `FileStoreSource ` is.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +66,13 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
+    @Nullable private final Long specifiedSnapshotId;
+
+    /** The manifest entries collected at planning phase when manual compaction is triggered. */
+    @Nullable
+    private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries;

Review Comment:
   `DataFileMeta` is not serializable, but `FileStoreSource` is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767106


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");

Review Comment:
   > The error message looks quite strange for me because I'm just curious : Is it the only case that we will set `specifiedSnapshotId` and `specifiedManifestEntries` for manual compaction ? Will be other case that we will set the two fields ?
   
   So far, only manually invoked compaction (triggered by `ALTER TABLE ... COMPACT`) will specify the snapshot id and manifest entries. Other conditions will perform a scan during the runtime.
   
   The reason is that `ALTER TABLE ... COMPACT` will pre-scan the latest snapshot during the planning phase, serialize the info as a string, and put it back to enriched options. Therefore, at the runtime, the source does not need to perform a scan anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r869075055


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +132,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;

Review Comment:
   revert these two fields?
   They should be in below.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +63,12 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /**
+     * The partitioned manifest meta collected at planning phase when manual compaction is
+     * triggered.
+     */
+    @Nullable private final PartitionedManifestMeta specifiedPartitionedManifestMeta;

Review Comment:
   just a shorter field name? `specifiedPartManifests`?



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.PartitionedManifestMeta;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FileStoreSource}. */
+public class FileStoreSourceTest {
+
+    private static final RowType RECORD_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        new IntType(), new VarCharType(), new DoubleType(), new VarCharType()
+                    },
+                    new String[] {"k0", "k1", "v0", "v1"});
+
+    @MethodSource("parameters")
+    @ParameterizedTest
+    public void testSerDe(boolean hasPk, boolean partitioned, boolean specified)
+            throws ClassNotFoundException, IOException {
+        FileStore fileStore = buildFileStore(hasPk, partitioned);
+        Long specifiedSnapshotId = specified ? 1L : null;
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries =
+                specified ? buildManifestEntries(hasPk, partitioned) : null;
+        PartitionedManifestMeta partitionedManifestMeta =
+                specified
+                        ? new PartitionedManifestMeta(
+                                specifiedSnapshotId,
+                                specifiedManifestEntries,
+                                getPartitionType(partitioned).getFieldCount(),
+                                getKeyType(hasPk),
+                                getValueType(hasPk))
+                        : null;
+        FileStoreSource source =
+                new FileStoreSource(
+                        fileStore,
+                        !hasPk,
+                        true,
+                        Duration.ofSeconds(1).toMillis(),
+                        true,
+                        null,
+                        null,
+                        null,
+                        partitionedManifestMeta);
+        Object object = readObject(writeObject(source));
+        assertThat(object).isInstanceOf(FileStoreSource.class);
+        FileStoreSource deserialized = (FileStoreSource) object;
+        assertThat(deserialized.getBoundedness()).isEqualTo(source.getBoundedness());
+        if (specified) {
+            assertThat(deserialized.getSpecifiedPartitionedManifestMeta())
+                    .isEqualTo(source.getSpecifiedPartitionedManifestMeta());
+        } else {
+            assertThat(deserialized.getSpecifiedPartitionedManifestMeta()).isNull();
+        }
+    }
+
+    private byte[] writeObject(FileStoreSource source) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(source);
+        oos.close();
+        return baos.toByteArray();
+    }
+
+    private Object readObject(byte[] bytes) throws IOException, ClassNotFoundException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        Object object = ois.readObject();
+        ois.close();
+        return object;
+    }
+
+    public static Stream<Arguments> parameters() {
+        // hasPk, partitioned, specified
+        return Stream.of(
+                Arguments.of(true, true, false),
+                Arguments.of(true, false, false),
+                Arguments.of(false, false, false),
+                Arguments.of(false, true, false),
+                Arguments.of(true, true, true),
+                Arguments.of(true, false, true),
+                Arguments.of(false, false, true),
+                Arguments.of(false, true, true));
+    }
+
+    private static FileStore buildFileStore(boolean hasPk, boolean partitioned) {
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat", "db", "tbl");
+        String user = "user";
+        RowType partitionType = getPartitionType(partitioned);
+        RowType keyType = getKeyType(hasPk);
+        RowType valueType = getValueType(hasPk);
+        MergeFunction mergeFunction =
+                hasPk ? new DeduplicateMergeFunction() : new ValueCountMergeFunction();
+        return new FileStoreImpl(
+                tableIdentifier,
+                new Configuration(),
+                user,
+                partitionType,
+                keyType,
+                valueType,
+                mergeFunction);
+    }
+
+    private static Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> buildManifestEntries(
+            boolean hasPk, boolean partitioned) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries = new HashMap<>();
+        Map<Integer, List<DataFileMeta>> bucketEntries = new HashMap<>();
+        int totalBuckets = new Random().nextInt(10) + 1;
+        IntStream.range(0, totalBuckets)
+                .forEach(
+                        bucket ->
+                                bucketEntries.put(
+                                        bucket,
+                                        genMinMax(hasPk).stream()
+                                                .map(
+                                                        tuple -> {

Review Comment:
   This format looks difficult, can we switch to using foreach?
   It is not recommended to use too complex logic in lambda.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Manifest entries per partitioned with the corresponding snapshot id. */
+public class PartitionedManifestMeta implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
+    private final Long snapshotId;
+
+    /** The manifest entries collected at planning phase when manual compaction is triggered. */
+    private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries;
+
+    private final int partFieldCount;
+    private final RowType keyType;
+    private final RowType valueType;
+
+    public PartitionedManifestMeta(
+            Long snapshotId,
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries,
+            int partFieldCount,
+            RowType keyType,
+            RowType valueType) {
+        Preconditions.checkNotNull(snapshotId, "Specified snapshot should not be null.");
+        Preconditions.checkNotNull(
+                specifiedManifestEntries, "Specified manifest entries should not be null.");
+        this.snapshotId = snapshotId;
+        this.manifestEntries = specifiedManifestEntries;
+        this.partFieldCount = partFieldCount;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    public Long getSnapshotId() {
+        return snapshotId;
+    }
+
+    public Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> getManifestEntries() {
+        return manifestEntries;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+
+        BinaryRowDataSerializer partSerializer = new BinaryRowDataSerializer(partFieldCount);
+        DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer(keyType, valueType);
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        view.writeInt(manifestEntries.size());
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                manifestEntries.entrySet()) {
+            partSerializer.serialize(partEntry.getKey(), view);
+            Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+            view.writeInt(bucketEntry.size());
+            for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                view.writeInt(entry.getKey());
+                view.writeInt(entry.getValue().size());
+                for (DataFileMeta meta : entry.getValue()) {
+                    metaSerializer.serialize(meta, view);
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+        in.defaultReadObject();
+
+        manifestEntries = new HashMap<>();
+        BinaryRowDataSerializer partSerializer = new BinaryRowDataSerializer(partFieldCount);
+        DataFileMetaSerializer metaSerializer = new DataFileMetaSerializer(keyType, valueType);
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+        int partitionNum = view.readInt();
+        while (partitionNum > 0) {

Review Comment:
   Rather than maintaining a `partitionNum` variable, I find the `for (int i = 0....)` is more straightforward



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] openinx commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
openinx commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868754349


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -69,7 +92,10 @@ public FileStoreSource(
             boolean latestContinuous,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
-            @Nullable Predicate fieldPredicate) {
+            @Nullable Predicate fieldPredicate,
+            @Nullable Long specifiedSnapshotId,
+            @Nullable
+                    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries) {

Review Comment:
   What's the case that we will set those two fields with non-null values ?  I see all of them will be set to be `null` in the `buildFileStore`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");

Review Comment:
   The error message looks quite strange for me because I'm just curious :  Is it the only case that we will set `specifiedSnapshotId` and `specifiedManifestEntries` for manual compaction ?  Will be other case that we will set the two fields ? 



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();

Review Comment:
   What's the meaning of suffix `ctr`  ?  partition number or partition count ? So why not just use partitionNum ?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();
+            while (partitionCtr > 0) {
+                BinaryRowData partition = partSerializer.deserialize(view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+                int bucketCtr = view.readInt();
+                while (bucketCtr > 0) {
+                    int bucket = view.readInt();
+                    int entryCtr = view.readInt();
+                    if (entryCtr == 0) {
+                        bucketEntry.put(bucket, Collections.emptyList());
+                    } else {
+                        List<DataFileMeta> metas = new ArrayList<>();
+                        while (entryCtr > 0) {
+                            metas.add(metaSerializer.deserialize(view));
+                            entryCtr--;
+                        }
+                        bucketEntry.put(bucket, metas);
+                    }
+                    bucketCtr--;
+                }
+                specifiedManifestEntries.put(partition, bucketEntry);
+                partitionCtr--;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FileStoreSource)) {
+            return false;
+        }
+        FileStoreSource that = (FileStoreSource) o;
+        return valueCountMode == that.valueCountMode
+                && isContinuous == that.isContinuous
+                && discoveryInterval == that.discoveryInterval
+                && latestContinuous == that.latestContinuous
+                && fileStore.equals(that.fileStore)
+                && Arrays.equals(projectedFields, that.projectedFields)
+                && Objects.equals(partitionPredicate, that.partitionPredicate)
+                && Objects.equals(fieldPredicate, that.fieldPredicate)
+                && Objects.equals(specifiedSnapshotId, that.specifiedSnapshotId)
+                && Objects.equals(specifiedManifestEntries, that.specifiedManifestEntries);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(
+                        fileStore,
+                        valueCountMode,
+                        isContinuous,
+                        discoveryInterval,
+                        latestContinuous,
+                        partitionPredicate,
+                        fieldPredicate,
+                        specifiedSnapshotId,
+                        specifiedManifestEntries);
+        result = 31 * result + Arrays.hashCode(projectedFields);

Review Comment:
   It's recommended to use `Arrays.deepHashCode` for multiple dimension array, which is similar to the above comment.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");
+            return new StaticFileStoreSplitEnumerator(
+                    context,
+                    scan.snapshot(specifiedSnapshotId),
+                    new FileStoreSourceSplitGenerator().createSplits(specifiedManifestEntries));
+        }

Review Comment:
   Nit: Please left an empty line for separating the different code blocks,  that makes code more clear for the reviewer.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {

Review Comment:
   Could we just introduce two separate class named `PartMeta` and `BucketMeta` to manage all the `DataFileMetaData` for a given partition and a given bucket, and moving this serializer & deserializer logic into those two classes ?  



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();
+            while (partitionCtr > 0) {
+                BinaryRowData partition = partSerializer.deserialize(view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+                int bucketCtr = view.readInt();
+                while (bucketCtr > 0) {
+                    int bucket = view.readInt();
+                    int entryCtr = view.readInt();
+                    if (entryCtr == 0) {
+                        bucketEntry.put(bucket, Collections.emptyList());
+                    } else {
+                        List<DataFileMeta> metas = new ArrayList<>();
+                        while (entryCtr > 0) {
+                            metas.add(metaSerializer.deserialize(view));
+                            entryCtr--;
+                        }
+                        bucketEntry.put(bucket, metas);
+                    }
+                    bucketCtr--;
+                }
+                specifiedManifestEntries.put(partition, bucketEntry);
+                partitionCtr--;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FileStoreSource)) {
+            return false;
+        }
+        FileStoreSource that = (FileStoreSource) o;
+        return valueCountMode == that.valueCountMode
+                && isContinuous == that.isContinuous
+                && discoveryInterval == that.discoveryInterval
+                && latestContinuous == that.latestContinuous
+                && fileStore.equals(that.fileStore)
+                && Arrays.equals(projectedFields, that.projectedFields)

Review Comment:
   It's recommended to use `deepEquals` for the multiple dimension arrays because the `Arrays.equals` will only check the object quality by using `Objects.equals()` for internal element, which does not work for an element whose type is a `int[]`. 
   
   
   ```java
   public static boolean equals(Object[] a, Object[] a2) {
           if (a==a2)
               return true;
           if (a==null || a2==null)
               return false;
   
           int length = a.length;
           if (a2.length != length)
               return false;
   
           for (int i=0; i<length; i++) {
               if (!Objects.equals(a[i], a2[i]))
                   return false;
           }
   
           return true;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868777111


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();

Review Comment:
   > What's the meaning of suffix ctr ? partition number or partition count ? So why not just use partitionNum ?
   
   `ctr` is abbr. for the counter, stands for the size of the `Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>` .
   (<partition, <bucket, meta-list>>). Put `ctr` as a suffix is due to this value is mutable, while `partitionNum` should be a kind of immutable concept I think. Do you feel this is better?
   ```java
   int partitionNum = view.readInt();
   int partitionCtr = partitionNum;
   while(partitionCtr > 0) {
       ...
       partitionCtr--;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868886347


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java:
##########
@@ -60,4 +62,21 @@ private long count(RowData value) {
         checkArgument(!value.isNullAt(0), "Value count should not be null.");
         return value.getLong(0);
     }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   ditto, see above comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868771446


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");

Review Comment:
   Let me give a detailed explanation about 
   > `ALTER TABLE ... COMPACT` will pre-scan the latest snapshot during the planning phase
   
   `ALTER TABLE ... COMPACT` will be first converted to a `SinkModifyOperation` with `SourceQueryOperation` as the only child (on the Flink side). And then the `ManagedTableFactory#onCompactTable` is invoked, the impl(back to the TableStore) will perform a scan, collect manifest entries and the corresponding snapshot id, and serialize as a string. It will be put back to enriched options. So at the runtime, when the source is initialized, the splits can be directly generated from the options. This PR aims to pave the way for skipping runtime scans for this condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868883566


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +72,13 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
+    @Nullable private final Long specifiedSnapshotId;
+
+    /** The manifest entries collected at planning phase when manual compaction is triggered. */
+    @Nullable
+    private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries;

Review Comment:
   Can we just create a `SpecificDataFiles` class which implements `Serializable`.
   And
   - `Preconditions.checkNotNull(specifiedManifestEntries)` should in it.
   - We dont need to have a `PartitionedDataManifestSerializer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868906444


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +72,13 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
+    @Nullable private final Long specifiedSnapshotId;
+
+    /** The manifest entries collected at planning phase when manual compaction is triggered. */
+    @Nullable
+    private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries;

Review Comment:
   > Can we just create a `SpecificDataFiles` class which implements `Serializable`. And
   > 
   > * `Preconditions.checkNotNull(specifiedManifestEntries)` should in it.
   > * We dont need to have a `PartitionedDataManifestSerializer`.
   
   Do you mean creating a new serializable class like
   ```java
   public class SpecificDataFiles implements Serializable {
       Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> meta;
   }
   ```
   Correct me if I'm wrong, but `BinaryRowData` and `DataFileMeta` are still not serializable, anyway we need to implement the ser/deser logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868771446


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +150,17 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (specifiedSnapshotId != null) {
+            Preconditions.checkNotNull(
+                    specifiedManifestEntries,
+                    "The manifest entries cannot be null for manual compaction.");

Review Comment:
   Let me give a detailed explanation about 
   > `ALTER TABLE ... COMPACT` will pre-scan the latest snapshot during the planning phase
   
   `ALTER TABLE ... COMPACT` will be first converted to a `SinkModifyOperation` with `SourceQueryOperation` as the only child (on the Flink side). And then the `ManagedTableFactory#onCompactTable` is invoked, the impl(back to the TableStore) will perform a scan, collected manifest entries and the corresponding snapshot id, and serialized as a string. It will be put back to enriched options. So at the runtime, when the source is initialized, the splits can be directly generated from the options. This PR aims to pave the way for skipping runtime scans for this condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868906444


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +72,13 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /** The latest snapshot id seen at planning phase when manual compaction is triggered. */
+    @Nullable private final Long specifiedSnapshotId;
+
+    /** The manifest entries collected at planning phase when manual compaction is triggered. */
+    @Nullable
+    private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> specifiedManifestEntries;

Review Comment:
   > Can we just create a `SpecificDataFiles` class which implements `Serializable`. And
   > 
   > * `Preconditions.checkNotNull(specifiedManifestEntries)` should in it.
   > * We dont need to have a `PartitionedDataManifestSerializer`.
   
   Do you mean creating a new serializable class like
   ```java
   public class SpecificDataFiles implements Serializable {
       Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
   }
   ```
   Correct me if I'm wrong, but `BinaryRowData` and `DataFileMeta` are still not serializable, anyway we need to implement the ser/deser logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi merged pull request #111: [FLINK-27540] Let FileStoreSource accept pre-planned manifest entries

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #111:
URL: https://github.com/apache/flink-table-store/pull/111


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org