Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/08 22:19:09 UTC

[GitHub] [flink-table-store] LadyForest opened a new pull request, #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

LadyForest opened a new pull request, #138:
URL: https://github.com/apache/flink-table-store/pull/138

   This PR implements `onCompactTable` to scan the latest snapshot, pick the data files to compact (for the non-rescale-bucket case), and serialize the resulting metadata to a JSON string.
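
   A condensed sketch of that flow, mirroring the `TableStoreManagedFactory#onCompactTable` diffs discussed later in this thread (not the exact committed code; imports and error handling omitted):

```java
// Sketch only: scan the latest snapshot for the requested partition, group the
// data files by partition and bucket, and record the result in the enriched
// table options so the follow-up compaction job can pick it up.
@Override
public Map<String, String> onCompactTable(
        Context context, CatalogPartitionSpec catalogPartitionSpec) {
    Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
    FileStore fileStore = buildTableStore(context).buildFileStore();
    FileStoreScan.Plan plan =
            fileStore
                    .newScan()
                    .withPartitionFilter(
                            PredicateConverter.CONVERTER.fromMap(
                                    catalogPartitionSpec.getPartitionSpec(),
                                    fileStore.partitionType()))
                    .plan();
    // Group the scanned files by partition and bucket; for the non-rescale-bucket
    // case only a subset of these files is actually picked for compaction.
    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
    newOptions.put(
            COMPACTION_SCANNED_MANIFEST.key(),
            JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));
    return newOptions;
}
```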




[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893116440


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,

Review Comment:
   > And we can not new a separate thread for this bucket.
   
   Do you mean that different buckets share the same compaction thread?





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893131205


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,
+            DataFileWriter dataFileWriter) {
+        this.unit = unit;
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
+        this.compactor =
+                new CompactManager(
+                        compactExecutor,
+                        (numLevels, runs) ->
+                                Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
+                        keyComparator,
+                        targetFileSize,
+                        rewriter);
+        this.dataFileWriter = dataFileWriter;
+    }
+
+    @Override
+    public Increment prepareCommit() throws IOException, InterruptedException {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        if (compactor.isCompactionFinished()) {
+            compactor.submitCompaction(unit, true);
+        }
+        while (!compactor.isCompactionFinished()) {
+            try {
+                compactor
+                        .finishCompaction(true)
+                        .ifPresent(
+                                result -> {
+                                    compactBefore.addAll(result.before());
+                                    compactAfter.addAll(result.after());
+                                });
+            } catch (ExecutionException e) {
+                throw new IOException(e.getCause());
+            }
+        }
+        return Increment.forCompact(compactBefore, compactAfter);
+    }
+
+    @Override
+    public List<DataFileMeta> close() throws Exception {
+        compactExecutor.shutdownNow();
+        List<DataFileMeta> delete = new ArrayList<>();
+        try {
+            compactor.finishCompaction(true).ifPresent(result -> delete.addAll(result.after()));

Review Comment:
   > Do not block `close` here. Currently, we don't have a good way to remove compaction's temporary files. This requires `CompactManager` to keep track of the intermediate files, and you can just leave a TODO here for now.
   
   I found that orphan files are auto-cleaned by `RollingKvWriter#abort`, so I think `close` can stay as it is.





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893749689


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +186,11 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+        newOptions.put(
+                COMPACTION_PARTITION_SPEC.key(),
+                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));

Review Comment:
   > Compact without partition is not supported now?
   
   For a partitioned table, both `ALTER TABLE table_identifier COMPACT` and `ALTER TABLE table_identifier PARTITION (part_spec) COMPACT` are supported. If the user specifies a partition to compact, we need to keep track of that information in the options; otherwise, we cannot know which partition needs to be compacted.
   
   





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884390344


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java:
##########
@@ -61,20 +61,24 @@ public FileStorePathFactory(Path root, RowType partitionType, String defaultPart
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
 
+    public static RowDataPartitionComputer getPartitionComputer(

Review Comment:
   > Just for test?
   
   Maybe for logging too. Currently, the log does not show the partition in a readable form.





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882370652


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+        UniversalCompaction compaction =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunCompactionTrigger);
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                Levels levels =

Review Comment:
   > 1. the file is too small, smaller than the target file size
   > 2. there is overlap between the files (we can use IntervalPartition)
   
   I wonder why we use different strategies for manually invoked compaction and auto-compaction. Can we be sure that the proposed option can invoke universal compaction? I've checked the logic in universal compaction, and its pick strategy is far more complicated than this one.
   





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884388037


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   Here we cannot pick small files at random; otherwise the merged file's min and max keys can skip over the data in the middle.
   For example:
   inputs: File1(0-9) File2(10-89) File3(90-100)
   merged: File4(0-100) File2(10-89)
   This can lead to overlapping results.
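
   A tiny, self-contained illustration of the point above, with the files modelled as plain key ranges (toy code, not table-store classes):

```java
import java.util.List;

public class OverlapExample {
    // Each "file" is modelled as a closed key range [minKey, maxKey].
    record FileRange(String name, int minKey, int maxKey) {}

    static boolean overlaps(FileRange a, FileRange b) {
        return a.minKey() <= b.maxKey() && b.minKey() <= a.maxKey();
    }

    public static void main(String[] args) {
        FileRange file1 = new FileRange("File1", 0, 9);
        FileRange file2 = new FileRange("File2", 10, 89);
        FileRange file3 = new FileRange("File3", 90, 100);

        // Merging only the two "small" files produces a range spanning the untouched middle file.
        FileRange file4 = new FileRange("File4", file1.minKey(), file3.maxKey());

        // Prints true: the resulting files {File4, File2} have overlapping key ranges.
        System.out.println(overlaps(file4, file2));
        System.out.println(List.of(file4, file2));
    }
}
```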



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest closed pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest closed pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable
URL: https://github.com/apache/flink-table-store/pull/138




[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r892008041


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -79,6 +82,7 @@ public FileStoreSource(
             boolean isContinuous,
             long discoveryInterval,
             boolean latestContinuous,
+            boolean nonRescaleCompact,

Review Comment:
   > FromElementsSource
   
   `FromElementsSource` resides in the `flink-connector-test-utils` module; is it appropriate to use it here?





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r892147691


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +156,8 @@ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
                                 Map.Entry::getValue));
     }
 
-    static TableStore buildTableStore(DynamicTableFactory.Context context) {
+    @VisibleForTesting

Review Comment:
   revert it





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893493060


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +186,11 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+        newOptions.put(
+                COMPACTION_PARTITION_SPEC.key(),
+                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));

Review Comment:
   Compact without partition is not supported now?





[GitHub] [flink-table-store] JingsongLi merged pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #138:
URL: https://github.com/apache/flink-table-store/pull/138




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884375368


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +155,7 @@ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
                                 Map.Entry::getValue));
     }
 
-    static TableStore buildTableStore(DynamicTableFactory.Context context) {

Review Comment:
   Add documentation.





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884472377


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(

Review Comment:
   > You have picked files here, but how to make sure that writer will compact these files?
   
   As discussed offline, the main purpose of `ALTER TABLE ... COMPACT` is to squeeze files whose key ranges overlap or that are too small, which is not exactly what universal compaction does. As a result, once these files have been picked at the planning phase, the runtime should not pick them again. So `FileStoreWriteImpl` should create different compaction strategies for ① auto-compaction triggered by ordinary writes and ② manually triggered compaction. For the latter, the strategy should simply return all the files it receives.
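
   For reference, the `CompactWriter` diff earlier in this thread already wires in such a "pick everything" strategy. Shown in isolation below (the `CompactStrategy` type name is assumed here; in the diff the lambda is passed directly to the `CompactManager` constructor):

```java
// Manual compaction: put every sorted run the writer receives into a single
// compact unit at the highest level, instead of re-running the usual pick logic.
CompactStrategy pickAll =
        (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
```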







[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r892010987


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -79,6 +82,7 @@ public FileStoreSource(
             boolean isContinuous,
             long discoveryInterval,
             boolean latestContinuous,
+            boolean nonRescaleCompact,

Review Comment:
   > > FromElementsSource
   > 
   > `FromElementsSource` resides in package `flink-connector-test-utils`, is it proper to use?
   
   Or just implement a new source like `FromElementsSource`.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884388037


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   Here we cannot pick small files at random; otherwise the merged file's min and max keys can skip over the data in the middle.
   For example:
   inputs: File1(0-10) File2(10-90) File3(90-100)
   merged: File4(0-100) File2(10-90)
   This can lead to overlapping results.





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r892149226


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java:
##########
@@ -47,15 +47,20 @@ public class TableStoreFactoryOptions {
                                     + "of a partition/table.");
 
     @Internal
-    public static final ConfigOption<String> COMPACTION_SCANNED_MANIFEST =
-            ConfigOptions.key("compaction.scanned-manifest")
+    public static final ConfigOption<Boolean> COMPACTION_MANUAL_TRIGGERED =
+            ConfigOptions.key("compaction.manual-triggered")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "An internal flag to indicate a manual triggered non-rescale bucket compaction.");

Review Comment:
   An internal flag to indicate a manually triggered compaction job.







[GitHub] [flink-table-store] LadyForest commented on pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#issuecomment-1148279939

   **Update** 
   For non-rescale-bucket compaction, we don't perform a scan at the planning phase. Instead, we record a flag along with the partition spec to indicate that it is an ordinary manually triggered compaction.
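
   A condensed sketch of the updated `onCompactTable` for this case, mirroring the diff discussed earlier in the thread:

```java
// No scan at planning time for the non-rescale-bucket case: only record the
// manual-trigger flag and the requested partition spec in the enriched options.
Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
newOptions.put(
        COMPACTION_PARTITION_SPEC.key(),
        JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));
return newOptions;
```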




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882251162


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +155,7 @@ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
                                 Map.Entry::getValue));
     }
 
-    static TableStore buildTableStore(DynamicTableFactory.Context context) {

Review Comment:
   Why remove static?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));

Review Comment:
   Another choice is just `base64(InstantiationUtil.serializeObject(new PartitionedManifestMeta))`.
   This could be simpler. ManifestEntry is already unreadable anyway, so make it all unreadable?
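
   A hedged sketch of that suggestion, using Flink's `InstantiationUtil` and `java.util.Base64` (illustrative variable names; checked-exception handling omitted):

```java
// Write side: Java-serialize the picked manifest metadata and store it in the
// enriched table options as a Base64 string.
byte[] bytes =
        InstantiationUtil.serializeObject(new PartitionedManifestMeta(plan.snapshotId(), groupBy));
newOptions.put(COMPACTION_SCANNED_MANIFEST.key(), Base64.getEncoder().encodeToString(bytes));

// Read side: decode and deserialize it back when the compaction job starts.
PartitionedManifestMeta restored =
        InstantiationUtil.deserializeObject(
                Base64.getDecoder().decode(newOptions.get(COMPACTION_SCANNED_MANIFEST.key())),
                Thread.currentThread().getContextClassLoader());
```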



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+        UniversalCompaction compaction =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunCompactionTrigger);
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                Levels levels =

Review Comment:
   I'm trying to figure out how to select the files that need to be compacted.
   A simple option would be to pick:
   1. files that are too small, i.e. smaller than the target file size
   2. files that overlap with each other (we can use `IntervalPartition`)
   
   and only compact a bucket if more than one file is picked in it.
   
   It looks like we don't need to use `Levels` and `UniversalCompaction`.
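
   A condensed sketch of that picking rule, following the shape of the `pickManifest` diff later in this thread (variable names `bucketFiles`, `options`, and `keyComparator` as in that diff; the `distinct()` deduplication step is illustrative):

```java
// 1. Files below the target size.
List<DataFileMeta> smallFiles =
        bucketFiles.stream()
                .filter(meta -> meta.fileSize() < options.targetFileSize)
                .collect(Collectors.toList());

// 2. Files whose key ranges overlap: IntervalPartition splits the bucket into
//    sections of sorted runs, so any section with more than one run overlaps.
List<DataFileMeta> overlappedFiles =
        new IntervalPartition(bucketFiles, keyComparator).partition().stream()
                .filter(section -> section.size() > 1)
                .flatMap(Collection::stream)
                .map(SortedRun::files)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());

// Union of both, deduplicated, is what gets compacted for this bucket.
List<DataFileMeta> picked =
        Stream.concat(smallFiles.stream(), overlappedFiles.stream())
                .distinct()
                .collect(Collectors.toList());
```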



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
-            Comparator<RowData> keyComparator,
+            Supplier<Comparator<RowData>> keyComparatorSupplier,

Review Comment:
   Why do we need to use `Supplier<Comparator<RowData>>`?





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884394501


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   > For example:
   inputs: File1(0-10) File2(10-90) File3(90-100)
   merged: File4(0-100) File2(10-90)
   This can lead to results with overlapping.
   
   I agree that "we cannot pick small files at random", but the example you provide does not demonstrate it: these three files all overlap. The small-file threshold will pick File1(0-10) and File3(90-100), and the interval partition will pick all three, so after deduplication they all get compacted.
   
   





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884379183


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
-            Comparator<RowData> keyComparator,
+            Supplier<Comparator<RowData>> keyComparatorSupplier,

Review Comment:
   Why would a util method change `Comparator` to `Supplier`?
   We shouldn't create it repeatedly unless there is a thread-safety issue here.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =
+                        bucketEntry.getValue().stream()
+                                .filter(fileMeta -> fileMeta.fileSize() < options.targetFileSize)
+                                .collect(Collectors.toList());
+                List<DataFileMeta> intersectedFiles =
+                        new IntervalPartition(bucketEntry.getValue(), keyComparator)
+                                .partition().stream()
+                                        .filter(section -> section.size() > 1)
+                                        .flatMap(Collection::stream)
+                                        .map(SortedRun::files)
+                                        .flatMap(Collection::stream)
+                                        .collect(Collectors.toList());
+
+                List<DataFileMeta> filteredFiles =

Review Comment:
   `bucketFiles`?



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
         }
     }
 
+    @Test
+    public void testOnCompactTableForNoSnapshot() {
+        RowType partType = RowType.of();
+        MockTableStoreManagedFactory mockTableStoreManagedFactory =
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE);
+        prepare(
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("The specified table to compact does not exist any snapshot");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) throws Exception {
+        RowType partType = RowType.of();
+        runTest(
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(
+                        SINGLE_PARTITIONED_PART_TYPE, SINGLE_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                SINGLE_PARTITIONED_PART_TYPE,
+                SINGLE_PARTITIONED_ROW_TYPE,
+                SINGLE_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(),
+                TABLE + "_" + UUID.randomUUID(),
+                DEFAULT_PART_TYPE,
+                DEFAULT_ROW_TYPE,
+                MULTI_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @MethodSource("provideManifest")
+    @ParameterizedTest

Review Comment:
   Can we have a name for the `ParameterizedTest`?
   It is very difficult to maintain without a name.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();

Review Comment:
   Maybe a better name: `partFiles`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java:
##########
@@ -61,20 +61,24 @@ public FileStorePathFactory(Path root, RowType partitionType, String defaultPart
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
 
+    public static RowDataPartitionComputer getPartitionComputer(

Review Comment:
   Just for test?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(

Review Comment:
   You have picked files here, but how do we make sure that the writer will compact these files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893276249


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,

Review Comment:
   Yes, the compaction can be the same as in the normal writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r891971268


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A dedicated {@link SinkWriter} for manual triggered compaction. */
+public class StoreSinkCompactor<WriterStateT> extends StoreSinkWriterBase<WriterStateT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+    private final int subTaskId;
+    private final int numOfParallelInstances;
+
+    private final FileStore fileStore;
+    private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> partitionedMeta;
+    private final Map<String, String> partitionSpec;
+
+    public StoreSinkCompactor(
+            int subTaskId,
+            int numOfParallelInstances,
+            FileStore fileStore,
+            Map<String, String> partitionSpec) {
+        this.subTaskId = subTaskId;
+        this.numOfParallelInstances = numOfParallelInstances;
+        this.fileStore = fileStore;
+        this.partitionSpec = partitionSpec;
+        this.partitionedMeta = new HashMap<>();
+    }
+
+    @Override
+    public void flush(boolean endOfInput) {
+        if (endOfInput) {
+            FileStoreScan.Plan plan =
+                    fileStore
+                            .newScan()
+                            .withPartitionFilter(
+                                    PredicateConverter.CONVERTER.fromMap(
+                                            partitionSpec, fileStore.partitionType()))
+                            .plan();
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    plan.groupByPartFiles().entrySet()) {
+                BinaryRowData partition = partEntry.getKey();
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                        partEntry.getValue().entrySet()) {
+                    int bucket = bucketEntry.getKey();
+                    if (select(partition, bucket)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "Assign partition {}, bucket {} to subtask {}",
+                                    FileStorePathFactory.getPartitionComputer(
+                                                    fileStore.partitionType(),
+                                                    FileSystemConnectorOptions
+                                                            .PARTITION_DEFAULT_NAME
+                                                            .defaultValue())
+                                            .generatePartValues(partition),
+                                    bucket,
+                                    subTaskId);
+                        }
+                        partitionedMeta

Review Comment:
   You don't need to store this meta in a class member. You can just create a Writer here.
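   Roughly the shape being suggested, sketched with placeholder types (`Writer`, `WriterFactory`, and the `List<String>` stand-ins for file meta and committables are hypothetical, not the actual project API):
   ```
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   
   /** Hypothetical sketch: create a writer per (partition, bucket) inside flush() instead of caching meta. */
   class CompactFlushSketch {
   
       interface Writer extends AutoCloseable {
           List<String> prepareCommit() throws Exception; // stand-in for FileCommittable
           void close();
       }
   
       interface WriterFactory {
           Writer create(String partition, int bucket, List<String> files);
       }
   
       List<List<String>> flush(Map<String, Map<Integer, List<String>>> groupByPartFiles,
                                WriterFactory factory) throws Exception {
           List<List<String>> committables = new ArrayList<>();
           for (Map.Entry<String, Map<Integer, List<String>>> part : groupByPartFiles.entrySet()) {
               for (Map.Entry<Integer, List<String>> bucket : part.getValue().entrySet()) {
                   // No class member needed: the writer lives only for this bucket's compaction.
                   try (Writer writer = factory.create(part.getKey(), bucket.getKey(), bucket.getValue())) {
                       committables.add(writer.prepareCommit());
                   }
               }
           }
           return committables;
       }
   }
   ```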



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriterBase.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The base class for file store sink writers.
+ *
+ * @param <WriterStateT> The type of the writer's state.
+ */
+public abstract class StoreSinkWriterBase<WriterStateT>

Review Comment:
   `StoreSinkWriterBase` doesn't seem to help `StoreSinkCompactor` much. You can see my comments in `StoreSinkCompactor`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A dedicated {@link SinkWriter} for manual triggered compaction. */
+public class StoreSinkCompactor<WriterStateT> extends StoreSinkWriterBase<WriterStateT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+    private final int subTaskId;
+    private final int numOfParallelInstances;
+
+    private final FileStore fileStore;
+    private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> partitionedMeta;
+    private final Map<String, String> partitionSpec;
+
+    public StoreSinkCompactor(
+            int subTaskId,
+            int numOfParallelInstances,
+            FileStore fileStore,
+            Map<String, String> partitionSpec) {
+        this.subTaskId = subTaskId;
+        this.numOfParallelInstances = numOfParallelInstances;
+        this.fileStore = fileStore;
+        this.partitionSpec = partitionSpec;
+        this.partitionedMeta = new HashMap<>();
+    }
+
+    @Override
+    public void flush(boolean endOfInput) {
+        if (endOfInput) {
+            FileStoreScan.Plan plan =
+                    fileStore
+                            .newScan()
+                            .withPartitionFilter(
+                                    PredicateConverter.CONVERTER.fromMap(
+                                            partitionSpec, fileStore.partitionType()))
+                            .plan();
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    plan.groupByPartFiles().entrySet()) {
+                BinaryRowData partition = partEntry.getKey();
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                        partEntry.getValue().entrySet()) {
+                    int bucket = bucketEntry.getKey();
+                    if (select(partition, bucket)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "Assign partition {}, bucket {} to subtask {}",
+                                    FileStorePathFactory.getPartitionComputer(
+                                                    fileStore.partitionType(),
+                                                    FileSystemConnectorOptions
+                                                            .PARTITION_DEFAULT_NAME
+                                                            .defaultValue())
+                                            .generatePartValues(partition),
+                                    bucket,
+                                    subTaskId);
+                        }
+                        partitionedMeta
+                                .computeIfAbsent(partition, k -> new HashMap<>())
+                                .computeIfAbsent(bucket, k -> new ArrayList<>())
+                                .addAll(bucketEntry.getValue());
+                        RecordWriter writer = getWriter(partition, bucket);
+                        try {
+                            writer.flush();

Review Comment:
   You can just close the writer and create the `FileCommittable`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -79,6 +82,7 @@ public FileStoreSource(
             boolean isContinuous,
             long discoveryInterval,
             boolean latestContinuous,
+            boolean nonRescaleCompact,

Review Comment:
   Can we keep this class as it is?
   We can just create an empty `FromElementsSource` for this.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -105,6 +105,9 @@ public void sync() throws Exception {
         // Do nothing here, as this writer don't introduce any async compaction thread currently.
     }
 
+    @Override
+    public void flush() throws Exception {}

Review Comment:
   endInput?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r885259036


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   > inputs: File1(0-9) File2(10-89) File3(90-100)
   merged: File4(0-100) File2(10-89)
   
   Good example. It is a problem when they're already sitting at the highest level. And even if they're not at the highest level, the compaction cannot get rid of the key range overlap.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r885233558


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
         }
     }
 
+    @Test
+    public void testOnCompactTableForNoSnapshot() {
+        RowType partType = RowType.of();
+        MockTableStoreManagedFactory mockTableStoreManagedFactory =
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE);
+        prepare(
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("The specified table to compact does not exist any snapshot");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) throws Exception {
+        RowType partType = RowType.of();
+        runTest(
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(
+                        SINGLE_PARTITIONED_PART_TYPE, SINGLE_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                SINGLE_PARTITIONED_PART_TYPE,
+                SINGLE_PARTITIONED_ROW_TYPE,
+                SINGLE_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(),
+                TABLE + "_" + UUID.randomUUID(),
+                DEFAULT_PART_TYPE,
+                DEFAULT_ROW_TYPE,
+                MULTI_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @MethodSource("provideManifest")
+    @ParameterizedTest

Review Comment:
   [Is it possible not to use `MethodSource` in this case? The display names generated for the test are difficult to maintain.](https://stackoverflow.com/questions/57892989/generating-display-names-for-parameterizedtest-in-junit-5)
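   For example, JUnit 5 lets a `@ParameterizedTest` carry a display-name pattern, so each generated case gets a readable name. A small sketch (the test class, method, and arguments below are illustrative, not the actual tests in this PR):
   ```
   import java.util.stream.Stream;
   
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.MethodSource;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   class NamedParameterizedTestSketch {
   
       // {0} and {1} refer to the arguments, so each generated case has a readable name.
       @ParameterizedTest(name = "compact {0} files, expect {1} picked")
       @MethodSource("provideCases")
       void pickFiles(int inputFiles, int expectedPicked) {
           assertEquals(expectedPicked, Math.min(inputFiles, expectedPicked));
       }
   
       static Stream<Arguments> provideCases() {
           return Stream.of(Arguments.of(3, 3), Arguments.of(1, 1));
       }
   }
   ```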



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893493058


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +186,11 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+        newOptions.put(
+                COMPACTION_PARTITION_SPEC.key(),
+                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));

Review Comment:
   Compact without partition is not supported now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r892163358


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -223,21 +217,15 @@ public void testCreateAndCheckTableStore(
         ResolvedCatalogTable catalogTable =
                 createResolvedTable(
                         singletonMap(
-                                "path", sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()),
+                                PATH.key(),
+                                sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()),
                         rowType,
                         partitions,
                         primaryKeys);
-        context =
-                new FactoryUtil.DefaultDynamicTableContext(
-                        TABLE_IDENTIFIER,
-                        catalogTable,
-                        emptyMap(),
-                        Configuration.fromMap(emptyMap()),
-                        Thread.currentThread().getContextClassLoader(),
-                        false);
+        context = createEnrichedContext(TABLE_IDENTIFIER, catalogTable);
         if (expectedResult.success) {
             tableStoreManagedFactory.onCreateTable(context, false);
-            TableStore tableStore = AbstractTableStoreFactory.buildTableStore(context);
+            TableStore tableStore = tableStoreManagedFactory.buildTableStore(context);

Review Comment:
   revert this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882366370


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
-            Comparator<RowData> keyComparator,
+            Supplier<Comparator<RowData>> keyComparatorSupplier,

Review Comment:
   Since `pickManifest` needs a `keyComparator`, I extracted a util method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884378698


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
         }
     }
 
+    @Test
+    public void testOnCompactTableForNoSnapshot() {
+        RowType partType = RowType.of();
+        MockTableStoreManagedFactory mockTableStoreManagedFactory =
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE);
+        prepare(
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("The specified table to compact does not exist any snapshot");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) throws Exception {
+        RowType partType = RowType.of();
+        runTest(
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(
+                        SINGLE_PARTITIONED_PART_TYPE, SINGLE_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                SINGLE_PARTITIONED_PART_TYPE,
+                SINGLE_PARTITIONED_ROW_TYPE,
+                SINGLE_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(),
+                TABLE + "_" + UUID.randomUUID(),
+                DEFAULT_PART_TYPE,
+                DEFAULT_ROW_TYPE,
+                MULTI_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @MethodSource("provideManifest")
+    @ParameterizedTest

Review Comment:
   Can we have a name for each parameter?
   Very difficult to maintain without a name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884456680


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
         }
     }
 
+    @Test
+    public void testOnCompactTableForNoSnapshot() {
+        RowType partType = RowType.of();
+        MockTableStoreManagedFactory mockTableStoreManagedFactory =
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE);
+        prepare(
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("The specified table to compact does not exist any snapshot");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) throws Exception {
+        RowType partType = RowType.of();
+        runTest(
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(
+                        SINGLE_PARTITIONED_PART_TYPE, SINGLE_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                SINGLE_PARTITIONED_PART_TYPE,
+                SINGLE_PARTITIONED_ROW_TYPE,
+                SINGLE_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(),
+                TABLE + "_" + UUID.randomUUID(),
+                DEFAULT_PART_TYPE,
+                DEFAULT_ROW_TYPE,
+                MULTI_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @MethodSource("provideManifest")
+    @ParameterizedTest

Review Comment:
   > Can we have a name for each parameter? Very difficult to maintain without a name.
   
   I don't quite understand what you mean. Do you suggest not using the JUnit 5 parameterized test? The `MethodSource` doc illustrates the usage.
   ![image](https://user-images.githubusercontent.com/55568005/170916432-3a9e59ca-3734-4ddb-aaa4-bf825e06f731.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r885235178


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =

Review Comment:
   Sorry for the misunderstanding. I have updated my case. What I mean is:
   ```
   inputs: File1(0-9) File2(10-89) File3(90-100)
   merged: File4(0-100) File2(10-89)
   ```
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r885234306


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java:
##########
@@ -61,20 +61,24 @@ public FileStorePathFactory(Path root, RowType partitionType, String defaultPart
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
 
+    public static RowDataPartitionComputer getPartitionComputer(

Review Comment:
   Add a `@VisibleForTesting`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893749689


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +186,11 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
+        newOptions.put(
+                COMPACTION_PARTITION_SPEC.key(),
+                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));

Review Comment:
   > Compact without partition is not supported now?
   
   For a partitioned table, both `ALTER TABLE table_identifier COMPACT` and `ALTER TABLE table_identifier PARTITION (part_spec) COMPACT` are supported. If the user specifies a partition to compact, we need to keep track of that info in the options. Otherwise, `StoreSinkCompactor` cannot know which partition needs to be compacted at runtime.
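   A minimal sketch of that round trip, using a plain Jackson `ObjectMapper` and made-up option key strings purely for illustration (the real code goes through `JsonSerdeUtil` and the `ConfigOption` constants defined in this PR):
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   import com.fasterxml.jackson.core.type.TypeReference;
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   class PartitionSpecRoundTripSketch {
   
       static final ObjectMapper MAPPER = new ObjectMapper();
   
       /** Planning side: remember which partition the user asked to compact. */
       static Map<String, String> enrichOptions(Map<String, String> options,
                                                Map<String, String> partitionSpec) throws Exception {
           Map<String, String> newOptions = new HashMap<>(options);
           newOptions.put("compaction.manual-triggered", "true");           // assumed key name
           newOptions.put("compaction.partition-spec",                      // assumed key name
                   MAPPER.writeValueAsString(partitionSpec));
           return newOptions;
       }
   
       /** Runtime side (e.g. the compactor sink): read the partition spec back. */
       static Map<String, String> readPartitionSpec(Map<String, String> options) throws Exception {
           return MAPPER.readValue(
                   options.get("compaction.partition-spec"),
                   new TypeReference<Map<String, String>>() {});
       }
   }
   ```
   An empty spec then simply round-trips as `{}`, which would mean "compact the whole table".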
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893116440


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,

Review Comment:
   > And we can not new a separate thread for this bucket.
   
   Do you mean that different buckets processed by the same sub-task should share the same compaction thread?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893054629


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A dedicated {@link SinkWriter} for manual triggered compaction. */
+public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+    private final int subTaskId;
+    private final int numOfParallelInstances;
+
+    private final FileStore fileStore;
+    private final Map<String, String> partitionSpec;
+    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;

Review Comment:
   We don't need to store writers now.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,
+            DataFileWriter dataFileWriter) {
+        this.unit = unit;
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
+        this.compactor =
+                new CompactManager(
+                        compactExecutor,
+                        (numLevels, runs) ->
+                                Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
+                        keyComparator,
+                        targetFileSize,
+                        rewriter);
+        this.dataFileWriter = dataFileWriter;
+    }
+
+    @Override
+    public Increment prepareCommit() throws IOException, InterruptedException {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        if (compactor.isCompactionFinished()) {
+            compactor.submitCompaction(unit, true);
+        }
+        while (!compactor.isCompactionFinished()) {
+            try {
+                compactor
+                        .finishCompaction(true)
+                        .ifPresent(
+                                result -> {
+                                    compactBefore.addAll(result.before());
+                                    compactAfter.addAll(result.after());
+                                });
+            } catch (ExecutionException e) {
+                throw new IOException(e.getCause());
+            }
+        }
+        return Increment.forCompact(compactBefore, compactAfter);
+    }
+
+    @Override
+    public List<DataFileMeta> close() throws Exception {
+        compactExecutor.shutdownNow();
+        List<DataFileMeta> delete = new ArrayList<>();
+        try {
+            compactor.finishCompaction(true).ifPresent(result -> delete.addAll(result.after()));

Review Comment:
   Do not block `close` here.
   Currently we don't have a good way to remove compaction's temporary files. This requires `CompactManager` to keep track of the intermediate files, and you can just leave a TODO here for now.
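   
   A minimal sketch of the non-blocking shape being suggested (the TODO wording and the empty return value are assumptions, not the final code):
   
       @Override
       public List<DataFileMeta> close() throws Exception {
           // Stop the compaction thread without waiting for an in-flight task.
           compactExecutor.shutdownNow();
           // TODO: remove compaction's intermediate files once CompactManager
           // keeps track of them; nothing is reported for deletion for now.
           return new ArrayList<>();
       }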



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,
+            DataFileWriter dataFileWriter) {
+        this.unit = unit;
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
+        this.compactor =
+                new CompactManager(
+                        compactExecutor,
+                        (numLevels, runs) ->
+                                Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
+                        keyComparator,
+                        targetFileSize,
+                        rewriter);
+        this.dataFileWriter = dataFileWriter;
+    }
+
+    @Override
+    public Increment prepareCommit() throws IOException, InterruptedException {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        if (compactor.isCompactionFinished()) {
+            compactor.submitCompaction(unit, true);
+        }
+        while (!compactor.isCompactionFinished()) {

Review Comment:
   Remove `while`? There is only one compaction.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,
+            DataFileWriter dataFileWriter) {
+        this.unit = unit;
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
+        this.compactor =
+                new CompactManager(
+                        compactExecutor,
+                        (numLevels, runs) ->
+                                Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
+                        keyComparator,
+                        targetFileSize,
+                        rewriter);
+        this.dataFileWriter = dataFileWriter;
+    }
+
+    @Override
+    public Increment prepareCommit() throws IOException, InterruptedException {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        if (compactor.isCompactionFinished()) {

Review Comment:
   If the compaction has not finished, should we just do nothing?
   I think we can simply check `isCompactionFinished` here.
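   
   Combined with the earlier comment about dropping the `while` loop, the method could collapse to a sketch like this (assuming `finishCompaction(true)` blocks until the single submitted task completes):
   
       @Override
       public Increment prepareCommit() throws IOException, InterruptedException {
           List<DataFileMeta> compactBefore = new ArrayList<>();
           List<DataFileMeta> compactAfter = new ArrayList<>();
           // Only submit when no compaction is in flight; otherwise report nothing this round.
           if (compactor.isCompactionFinished()) {
               compactor.submitCompaction(unit, true);
               try {
                   // There is only one compaction task, so a single blocking wait is enough.
                   compactor
                           .finishCompaction(true)
                           .ifPresent(
                                   result -> {
                                       compactBefore.addAll(result.before());
                                       compactAfter.addAll(result.after());
                                   });
               } catch (ExecutionException e) {
                   throw new IOException(e.getCause());
               }
           }
           return Increment.forCompact(compactBefore, compactAfter);
       }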



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
+ * not generate new records.
+ */
+public class CompactWriter implements RecordWriter {
+
+    private final CompactUnit unit;
+    private final ExecutorService compactExecutor;
+    private final CompactManager compactor;
+    private final DataFileWriter dataFileWriter;
+
+    public CompactWriter(
+            CompactUnit unit,
+            Comparator<RowData> keyComparator,
+            long targetFileSize,
+            CompactManager.Rewriter rewriter,

Review Comment:
   Can we just pass a `CompactManager` here?
   And we should not create a separate compaction thread just for this bucket.
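   
   For illustration, a constructor along these lines would let the caller own the compaction thread; the exact signature is an assumption, and the `compactExecutor` field would move to the caller as well:
   
       // Hypothetical variant: the shared CompactManager (and its executor) is created
       // outside and reused across the buckets handled by the same sub-task.
       public CompactWriter(CompactUnit unit, CompactManager compactor, DataFileWriter dataFileWriter) {
           this.unit = unit;
           this.compactor = compactor;
           this.dataFileWriter = dataFileWriter;
       }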





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882372928


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));

Review Comment:
   I'm fine with it~





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882357901


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +155,7 @@ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
                                 Map.Entry::getValue));
     }
 
-    static TableStore buildTableStore(DynamicTableFactory.Context context) {

Review Comment:
   > Why remove static?
   
   For test purposes: `MockTableStoreManagedFactory` extends `TableStoreManagedFactory` and overrides `buildTableStore` to create a `TestTableStore`.
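   
   A rough sketch of that test setup (the class body and the `TestTableStore` constructor here are assumptions, not the actual test code):
   
       /** Test-only factory that swaps the store implementation. */
       public class MockTableStoreManagedFactory extends TableStoreManagedFactory {
           @Override
           TableStore buildTableStore(DynamicTableFactory.Context context) {
               // Overriding is only possible because buildTableStore is no longer static.
               return new TestTableStore(context);
           }
       }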


