Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/24 09:11:01 UTC

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

tsreaper commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r905872413


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java:
##########
@@ -38,20 +46,33 @@
  *
  * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
  */
-public abstract class AbstractTableWrite<T> implements TableWrite {
+public abstract class AbstractTableWrite<T>
+        implements TableWrite, MemoryPoolFactory.PreemptRunner<RecordWriter<T>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTableWrite.class);
 
     private final FileStoreWrite<T> write;
     private final SinkRecordConverter recordConverter;
 
     private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
     private final ExecutorService compactExecutor;
+    private final MemoryPoolFactory<RecordWriter<T>> memoryPoolFactory;
 
     private boolean overwrite = false;
 
-    protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
+    protected AbstractTableWrite(
+            FileStoreWrite<T> write,
+            SinkRecordConverter recordConverter,
+            FileStoreOptions options) {
         this.write = write;
         this.recordConverter = recordConverter;
 
+        MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
+        HeapMemorySegmentPool memoryPool =
+                new HeapMemorySegmentPool(
+                        mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+        this.memoryPoolFactory = new MemoryPoolFactory<>(memoryPool, this);

Review Comment:
   Move these to the construction of `KeyValueFileStoreWrite`.
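
One reading of this suggestion, as a minimal sketch rather than the actual change: the write-side class builds the pool from the options, and the table-level class only receives the finished object. `Options`, `HeapPool`, `KeyValueWrite` and `TableWrite` below are hypothetical stand-ins for `MergeTreeOptions`, `HeapMemorySegmentPool`, `KeyValueFileStoreWrite` and `AbstractTableWrite`; their constructors are assumptions for illustration and do not mirror the real signatures.

```java
public class PoolOwnershipSketch {

    /** Hypothetical stand-in for MergeTreeOptions; only the two fields used here. */
    static final class Options {
        final long writeBufferSize;
        final int pageSize;

        Options(long writeBufferSize, int pageSize) {
            this.writeBufferSize = writeBufferSize;
            this.pageSize = pageSize;
        }
    }

    /** Hypothetical stand-in for HeapMemorySegmentPool; it only computes a page count. */
    static final class HeapPool {
        private final int totalPages;

        HeapPool(long bufferSize, int pageSize) {
            this.totalPages = Math.toIntExact(bufferSize / pageSize);
        }

        int totalPages() {
            return totalPages;
        }
    }

    /** Stand-in for KeyValueFileStoreWrite: the pool is created here. */
    static final class KeyValueWrite {
        private final HeapPool pool;

        KeyValueWrite(Options options) {
            this.pool = new HeapPool(options.writeBufferSize, options.pageSize);
        }

        HeapPool pool() {
            return pool;
        }
    }

    /** Stand-in for AbstractTableWrite: it no longer needs the options at all. */
    static final class TableWrite {
        private final KeyValueWrite write;

        TableWrite(KeyValueWrite write) {
            this.write = write;
        }

        KeyValueWrite write() {
            return write;
        }
    }

    public static void main(String[] args) {
        Options options = new Options(64L * 1024, 4 * 1024);
        TableWrite tableWrite = new TableWrite(new KeyValueWrite(options));
        System.out.println(tableWrite.write().pool().totalPages()); // prints 16
    }
}
```

With this split, the table-level constructor keeps its original two-argument shape and the memory settings stay next to the code that uses them.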



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java:
##########
@@ -32,9 +33,18 @@
  */
 public interface RecordWriter<T> {
 
+    /** Open the record write. */
+    void open(MemorySegmentPool memoryPool);

Review Comment:
   No need to change the base interface. These methods are only useful in `KeyValueRecordWriter`.
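
A minimal sketch of the alternative, under stated assumptions: the base interface keeps only the record-writing method, and `open` lives on the key-value writer alone. `PagePool` is a hypothetical stand-in for `MemorySegmentPool`, `PlainWriter` is a hypothetical unbuffered writer, and the reduced `RecordWriter` and `KeyValueRecordWriter` here omit everything the real types contain beyond what the example needs.

```java
import java.util.ArrayList;
import java.util.List;

public class WriterSketch {

    /** Hypothetical stand-in for MemorySegmentPool; it only tracks a page budget. */
    static final class PagePool {
        private int freePages;

        PagePool(int pages) {
            this.freePages = pages;
        }

        boolean tryAcquire() {
            if (freePages == 0) {
                return false;
            }
            freePages--;
            return true;
        }

        int freePages() {
            return freePages;
        }
    }

    /** Base interface left untouched: no memory-related methods. */
    interface RecordWriter<T> {
        void write(T record);
    }

    /** Only the buffering writer knows about the pool. */
    static final class KeyValueRecordWriter implements RecordWriter<String> {
        private final List<String> buffer = new ArrayList<>();
        private PagePool pool;

        void open(PagePool pool) {
            this.pool = pool;
        }

        @Override
        public void write(String record) {
            if (pool == null) {
                throw new IllegalStateException("open() must be called before write()");
            }
            // Simplification: one page per record, just to show the pool being used.
            if (!pool.tryAcquire()) {
                throw new IllegalStateException("memory pool exhausted");
            }
            buffer.add(record);
        }

        int buffered() {
            return buffer.size();
        }
    }

    /** Hypothetical writer without a buffer: it implements the base interface only. */
    static final class PlainWriter implements RecordWriter<String> {
        private int written;

        @Override
        public void write(String record) {
            written++;
        }

        int written() {
            return written;
        }
    }

    public static void main(String[] args) {
        KeyValueRecordWriter writer = new KeyValueRecordWriter();
        writer.open(new PagePool(2));
        writer.write("a");
        System.out.println(writer.buffered()); // prints 1
    }
}
```

Callers that need the pool then hold a `KeyValueRecordWriter` reference directly, so other `RecordWriter` implementations are not forced to provide an empty `open`.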



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java:
##########
@@ -95,7 +95,7 @@ public class MergeTreeOptions {
 
     public final long writeBufferSize;
 
-    public final long pageSize;
+    public final int pageSize;

Review Comment:
   Why is this being changed?


