Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/13 07:31:52 UTC

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #433: [FLINK-30210] Refactor StoreCompactOperator to accept records containing partitions and buckets in Table Store

tsreaper commented on code in PR #433:
URL: https://github.com/apache/flink-table-store/pull/433#discussion_r1046746710


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java:
##########
@@ -18,99 +18,70 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.utils.OffsetRowData;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
 
 /** A dedicated operator for manual triggered compaction. */
 public class StoreCompactOperator extends PrepareCommitOperator {
 
-    private static final Logger LOG = LoggerFactory.getLogger(StoreCompactOperator.class);
-
     private final FileStoreTable table;
-    private final String commitUser;
-    @Nullable private final Map<String, String> compactPartitionSpec;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
+    private final boolean isStreaming;
 
-    private TableScan scan;
-    private TableWrite write;
+    private transient StoreSinkWrite write;
+    private transient RowDataSerializer partitionSerializer;
+    private transient OffsetRowData reusedPartition;
 
     public StoreCompactOperator(
             FileStoreTable table,
-            String commitUser,
-            @Nullable Map<String, String> compactPartitionSpec) {
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            boolean isStreaming) {
+        Preconditions.checkArgument(

Review Comment:
   We'll introduce a `TableStoreCompactJob` so users can submit this job class to Flink. The job class will disable `write.compaction-skip`, so users do not need to care about that option themselves.
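
A rough sketch of what such a job class might look like (the class name, argument handling, and topology-building step are assumptions, not code from this PR; only the `write.compaction-skip` key comes from the comment above):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical entry point for a dedicated compaction job. */
public class TableStoreCompactJob {

    public static void main(String[] args) {
        // Dynamic table options, e.g. parsed from the command line (parsing omitted).
        Map<String, String> dynamicOptions = new HashMap<>();

        // Force-disable "write.compaction-skip": a compact-only job must
        // actually perform compaction, whatever the table definition says.
        dynamicOptions.put("write.compaction-skip", "false");

        // ... load the FileStoreTable with these options and build the
        // scan -> StoreCompactOperator -> commit topology (omitted).
    }
}
```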
   
   This `checkArgument` is just a sanity check.
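
The diff above cuts off at the `Preconditions.checkArgument(` call, so its actual condition is not shown; purely for illustration, a guard of this kind might look like the following (condition and message are assumptions, not the PR's code):

```java
import org.apache.flink.util.Preconditions;

import java.util.Map;

final class CompactSanityCheck {

    /** Fail fast if the table is still configured to skip compaction on write. */
    static void checkCompactionNotSkipped(Map<String, String> tableOptions) {
        Preconditions.checkArgument(
                !Boolean.parseBoolean(
                        tableOptions.getOrDefault("write.compaction-skip", "false")),
                "StoreCompactOperator requires 'write.compaction-skip' to be false.");
    }
}
```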



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
