You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/26 02:20:36 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #138: [FLINK-27707] Implement ManagedTableFactory#onCompactTable

JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r882251162


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -155,7 +155,7 @@ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
                                 Map.Entry::getValue));
     }
 
-    static TableStore buildTableStore(DynamicTableFactory.Context context) {

Review Comment:
   Why remove static?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));

Review Comment:
   Another choice is just `base64(InstantiationUtil.serializeObject(new PartitionedManifestMeta))`.
   This could be simpler. ManifestEntry is already unreadable anyway, so make it all unreadable?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +200,70 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        newOptions.put(
+                COMPACTION_SCANNED_MANIFEST.key(),
+                JsonSerdeUtil.toJson(new PartitionedManifestMeta(plan.snapshotId(), groupBy)));
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+        UniversalCompaction compaction =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunCompactionTrigger);
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                Levels levels =

Review Comment:
   I'm trying to figure out how to select the files that need to be compacted.
   A simple option would be:
   1. the file is too small, smaller than the target file size
   2. there is overlap between the files (we can use `IntervalPartition`)
   
   bucket of pick files is larger than 1.
   
   It looks like we dont need to use `Levels` and `UniversalCompaction`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
-            Comparator<RowData> keyComparator,
+            Supplier<Comparator<RowData>> keyComparatorSupplier,

Review Comment:
   Why we need to use `Supplier<Comparator<RowData>>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org