Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/22 09:49:29 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request, #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

JingsongLi opened a new pull request, #171:
URL: https://github.com/apache/flink-table-store/pull/171

   Currently, the sink's MemTable uses self-managed heap memory, and compaction consumes memory as well. This makes memory usage hard to control, and the problem gets worse when a single task writes to multiple partitions and multiple buckets.
   
   We need to control the total memory of a single sink task.
   
   - Introduce MemoryPoolFactory to share memory among multiple writers
   - MemoryPoolFactory supports PreemptRunner to preempt memory from other writers (it flushes the other writers, never the requesting writer itself)
   - Fix MergeTreeWriter's close to avoid unexpected interrupt exceptions
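
   The sharing and preemption described in the list above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `SharedPoolSketch`, `SketchWriter`, `SharedPagePool`, and `requestPage` are hypothetical names, pages are only counted rather than backed by real memory segments, and the "flush the other writer holding the most pages" policy is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: several writers share one page budget and preempt each other. */
public class SharedPoolSketch {

    /** A writer that buffers records in pages borrowed from the shared pool. */
    static final class SketchWriter {
        final String name;
        int heldPages = 0;
        int flushCount = 0;

        SketchWriter(String name) {
            this.name = name;
        }

        /** Simulates flushing the in-memory buffer to disk; returns the pages released. */
        int flush() {
            int released = heldPages;
            heldPages = 0;
            if (released > 0) {
                flushCount++;
            }
            return released;
        }
    }

    /** Fixed page budget shared by all registered writers. */
    static final class SharedPagePool {
        private final List<SketchWriter> writers = new ArrayList<>();
        private int freePages;

        SharedPagePool(int totalPages) {
            this.freePages = totalPages;
        }

        void register(SketchWriter writer) {
            writers.add(writer);
        }

        /**
         * Hands one page to the requester. If the pool is empty, flushes the other
         * writer holding the most pages (never the requester) and then retries.
         * Returns false if no other writer has anything to give up.
         */
        boolean requestPage(SketchWriter requester) {
            if (freePages == 0) {
                SketchWriter victim = null;
                for (SketchWriter w : writers) {
                    if (w != requester
                            && w.heldPages > 0
                            && (victim == null || w.heldPages > victim.heldPages)) {
                        victim = w;
                    }
                }
                if (victim == null) {
                    return false;
                }
                freePages += victim.flush();
            }
            freePages--;
            requester.heldPages++;
            return true;
        }
    }

    /** Runs a small scenario; returns {a.heldPages, b.heldPages, a.flushCount, b.flushCount}. */
    static int[] demo() {
        SharedPagePool pool = new SharedPagePool(4);
        SketchWriter a = new SketchWriter("partition-a");
        SketchWriter b = new SketchWriter("partition-b");
        pool.register(a);
        pool.register(b);
        for (int i = 0; i < 3; i++) {
            pool.requestPage(a); // a takes three of the four pages
        }
        pool.requestPage(b); // b takes the last free page
        pool.requestPage(b); // pool is empty, so a is flushed to make room for b
        return new int[] {a.heldPages, b.heldPages, a.flushCount, b.flushCount};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("a holds " + r[0] + " pages, b holds " + r[1] + " pages");
        System.out.println("a flushed " + r[2] + " times, b flushed " + r[3] + " times");
    }
}
```

   In this scenario the total never exceeds the four-page budget: when writer b asks for a page from an empty pool, writer a is flushed and b itself is left untouched.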


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r906944139


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.utils.MemoryPoolFactory;
+import org.apache.flink.table.store.file.writer.MemoryRecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import java.util.Map;
+
+/**
+ * A {@link TableWrite} which supports using shared memory and preempting memory from other writers.
+ */
+public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T>
+        implements MemoryPoolFactory.PreemptRunner<MemoryRecordWriter<T>> {

Review Comment:
   Why add an abstraction on the table layer? Move this to `KeyValueFileStoreTableWrite` (or create an abstract class which extends `AbstractFileStoreTableWrite`) so that the table layer does not need to worry about memory management.
   
   With the current implementation, I can never use the key value layer alone; I would have to solve the memory management problem again.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r906958100


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.utils.MemoryPoolFactory;
+import org.apache.flink.table.store.file.writer.MemoryRecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import java.util.Map;
+
+/**
+ * A {@link TableWrite} which supports using shared memory and preempting memory from other writers.
+ */
+public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T>
+        implements MemoryPoolFactory.PreemptRunner<MemoryRecordWriter<T>> {

Review Comment:
   Do you mean `KeyValueFileStoreWrite`?
   I understand your point, but currently `KeyValueFileStoreWrite` is not responsible for writing data; it is just a factory that creates writers. It therefore cannot take over memory management unless we move the logic of `AbstractTableWrite`, such as `write` and `prepareCommit`, to `FileStoreWrite`.
   
   I have refactored the code so that `MemoryTableWrite` is now just a thin shell; the real logic is in `MemoryPoolFactory`.
   
   Let's consider moving the logic of `AbstractTableWrite` to `FileStoreWrite` in a new JIRA:
   https://issues.apache.org/jira/browse/FLINK-28256





[GitHub] [flink-table-store] JingsongLi closed pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
JingsongLi closed pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers
URL: https://github.com/apache/flink-table-store/pull/171





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r905881398


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java:
##########
@@ -32,9 +33,18 @@
  */
 public interface RecordWriter<T> {
 
+    /** Open the record write. */
+    void open(MemorySegmentPool memoryPool);

Review Comment:
   The current AppendOnlyWriter also has memory problems. When there are too many partitions, the ORC writer's VectorizedRowBatch instances also take up a lot of memory, which can lead to OOM.
   I'm considering bringing one of the following capabilities to AppendOnly in the future:
   - Pool the ORC writer's memory
   - Or put records into MemorySegment first, then write the file
   
   That is why I introduced this in the `RecordWriter` interface.
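
   The second option (buffer records in pooled pages first, then write the file) could look roughly like the sketch below. It is only an illustration: `PagePool` and `PagedAppendWriter` are hypothetical names, plain `byte[]` pages stand in for Flink's `MemorySegment`, a `ByteArrayOutputStream` stands in for the data file, and nothing here models the ORC writer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: an append-only writer that buffers records in pooled pages. */
public class PagedAppendSketch {

    /** Stand-in for a pool of fixed-size memory pages. */
    static final class PagePool {
        private final Deque<byte[]> free = new ArrayDeque<>();

        PagePool(int pages, int pageSize) {
            if (pages < 1 || pageSize < 1) {
                throw new IllegalArgumentException("need at least one non-empty page");
            }
            for (int i = 0; i < pages; i++) {
                free.push(new byte[pageSize]);
            }
        }

        /** Returns a free page, or null when the pool is exhausted. */
        byte[] next() {
            return free.poll();
        }

        void giveBack(List<byte[]> pages) {
            free.addAll(pages);
        }
    }

    static final class PagedAppendWriter {
        private final PagePool pool;
        private final ByteArrayOutputStream file = new ByteArrayOutputStream();
        private final List<byte[]> used = new ArrayList<>();
        private byte[] current;
        private int pos;
        int flushes;

        PagedAppendWriter(PagePool pool) {
            this.pool = pool;
        }

        /** Copies the record into pooled pages; spills to the file only when pages run out. */
        void write(String record) {
            for (byte b : (record + "\n").getBytes(StandardCharsets.UTF_8)) {
                if (current == null || pos == current.length) {
                    nextPage();
                }
                current[pos++] = b;
            }
        }

        private void nextPage() {
            byte[] page = pool.next();
            if (page == null) {
                flush(); // no free page left: spill buffered bytes and reuse the pages
                page = pool.next();
            }
            used.add(page);
            current = page;
            pos = 0;
        }

        /** Writes all buffered bytes to the file in order and returns the pages to the pool. */
        void flush() {
            for (int i = 0; i < used.size(); i++) {
                byte[] page = used.get(i);
                int length = (i == used.size() - 1) ? pos : page.length;
                file.write(page, 0, length);
            }
            if (!used.isEmpty()) {
                flushes++;
            }
            pool.giveBack(used);
            used.clear();
            current = null;
            pos = 0;
        }

        String contents() {
            return new String(file.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** Writes four short records through a pool of two 8-byte pages. */
    static PagedAppendWriter demo() {
        PagedAppendWriter writer = new PagedAppendWriter(new PagePool(2, 8));
        for (String record : new String[] {"alpha", "beta", "gamma", "delta"}) {
            writer.write(record);
        }
        writer.flush(); // final flush, as a close or checkpoint would do
        return writer;
    }

    public static void main(String[] args) {
        PagedAppendWriter writer = demo();
        System.out.print(writer.contents());
        System.out.println("flushes: " + writer.flushes);
    }
}
```

   The point of the sketch is that the writer's memory footprint is bounded by the pool rather than by the number of open writers, which is the property the comment above is after.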





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r905877682


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java:
##########
@@ -95,7 +95,7 @@ public class MergeTreeOptions {
 
     public final long writeBufferSize;
 
-    public final long pageSize;
+    public final int pageSize;

Review Comment:
   In Flink, the `pageSize` parameter of `MemoryManager.create` is an `int`, so `int` is enough.
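
   As a rough illustration of why the total buffer can stay a `long` while the page size fits in an `int`, consider the sketch below. `PageSizeSketch` and `pageCount` are hypothetical names, and the 256 MB / 64 KB values are chosen only for the example; they are not taken from this PR.

```java
/** Hypothetical sketch: a long buffer size split into int-sized pages. */
public class PageSizeSketch {

    /** Number of whole pages that fit into the write buffer. */
    static int pageCount(long writeBufferSize, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        // toIntExact fails loudly instead of silently overflowing.
        return Math.toIntExact(writeBufferSize / pageSize);
    }

    /** A heap page is a byte array, and Java array lengths are ints. */
    static byte[] allocatePage(int pageSize) {
        return new byte[pageSize];
    }

    public static void main(String[] args) {
        long writeBufferSize = 256L * 1024 * 1024; // illustrative value
        int pageSize = 64 * 1024; // illustrative value
        System.out.println("pages: " + pageCount(writeBufferSize, pageSize));
        System.out.println("page length: " + allocatePage(pageSize).length);
    }
}
```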





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r905872413


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java:
##########
@@ -38,20 +46,33 @@
  *
  * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
  */
-public abstract class AbstractTableWrite<T> implements TableWrite {
+public abstract class AbstractTableWrite<T>
+        implements TableWrite, MemoryPoolFactory.PreemptRunner<RecordWriter<T>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTableWrite.class);
 
     private final FileStoreWrite<T> write;
     private final SinkRecordConverter recordConverter;
 
     private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
     private final ExecutorService compactExecutor;
+    private final MemoryPoolFactory<RecordWriter<T>> memoryPoolFactory;
 
     private boolean overwrite = false;
 
-    protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
+    protected AbstractTableWrite(
+            FileStoreWrite<T> write,
+            SinkRecordConverter recordConverter,
+            FileStoreOptions options) {
         this.write = write;
         this.recordConverter = recordConverter;
 
+        MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
+        HeapMemorySegmentPool memoryPool =
+                new HeapMemorySegmentPool(
+                        mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+        this.memoryPoolFactory = new MemoryPoolFactory<>(memoryPool, this);

Review Comment:
   Move these to the construction of `KeyValueFileStoreWrite`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java:
##########
@@ -32,9 +33,18 @@
  */
 public interface RecordWriter<T> {
 
+    /** Open the record write. */
+    void open(MemorySegmentPool memoryPool);

Review Comment:
   No need to change the base interface. These methods are only useful in `KeyValueRecordWriter`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java:
##########
@@ -95,7 +95,7 @@ public class MergeTreeOptions {
 
     public final long writeBufferSize;
 
-    public final long pageSize;
+    public final int pageSize;

Review Comment:
   Why change this?





[GitHub] [flink-table-store] JingsongLi merged pull request #171: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #171:
URL: https://github.com/apache/flink-table-store/pull/171

