You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/23 15:32:32 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-29276] BinaryInMemorySortBuffer supports to clear all memory segments
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new b82f41a8 [FLINK-29276] BinaryInMemorySortBuffer supports to clear all memory segments
b82f41a8 is described below
commit b82f41a8c75f08e683377645a2ae6bb9eee1e755
Author: shammon <zj...@gmail.com>
AuthorDate: Fri Sep 23 23:28:42 2022 +0800
[FLINK-29276] BinaryInMemorySortBuffer supports to clear all memory segments
This closes #300
---
.../table/store/file/memory/MemoryPoolFactory.java | 3 -
.../file/memory/sort/BinaryInMemorySortBuffer.java | 241 +++++++++++++++++++++
.../store/file/mergetree/SortBufferMemTable.java | 12 +-
.../file/mergetree/SortBufferMemTableTestBase.java | 1 +
4 files changed, 252 insertions(+), 5 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
index ef1583d5..bad0aa54 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
@@ -91,9 +91,6 @@ public class MemoryPoolFactory {
@Override
public int freePages() {
- // Actually, other owners still keep 1 page
- // TODO We need to optimize this one page later.
- // See BinaryInMemorySortBuffer.reset
return totalPages - allocatedPages;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java
new file mode 100644
index 00000000..951e46bd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.memory.sort;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * In memory sort buffer for binary row. The main code is copied from Flink {@code
+ * BinaryInMemorySortBuffer} instead of extended because it's a final class.
+ *
+ * <p>The main differences in the new sort buffer are:
+ *
+ * <ul>
+ * <li>1. Add clear() method to release all memory.
+ * <li>2. Add tryInitialize() method to initialize memory before writing to and reading from the
+ * buffer, while the old buffer does it in the constructor and reset().
+ * <li>3. Remove reset() and other methods which are not used in Flink Table Store.
+ * </ul>
+ */
+public class BinaryInMemorySortBuffer extends BinaryIndexedSortable {
+
+ private static final int MIN_REQUIRED_BUFFERS = 3;
+
+ private final AbstractRowDataSerializer<RowData> inputSerializer;
+ private final ArrayList<MemorySegment> recordBufferSegments;
+ private final SimpleCollectingOutputView recordCollector;
+
+ private long currentDataBufferOffset;
+ private long sortIndexBytes;
+ private boolean isInitialized;
+
+    /**
+     * Builds a sort buffer that is filled in the `insert` way, i.e. record by record.
+     *
+     * <p>The argument check fails when the pool reports fewer than {@code MIN_REQUIRED_BUFFERS}
+     * free pages.
+     */
+    public static BinaryInMemorySortBuffer createBuffer(
+            NormalizedKeyComputer normalizedKeyComputer,
+            AbstractRowDataSerializer<RowData> inputSerializer,
+            BinaryRowDataSerializer serializer,
+            RecordComparator comparator,
+            MemorySegmentPool memoryPool) {
+        final boolean hasEnoughPages = memoryPool.freePages() >= MIN_REQUIRED_BUFFERS;
+        checkArgument(hasEnoughPages);
+
+        // The same segment list is handed to both the output view and the sort buffer.
+        final ArrayList<MemorySegment> segments = new ArrayList<>(16);
+        final SimpleCollectingOutputView collector =
+                new SimpleCollectingOutputView(segments, memoryPool, memoryPool.pageSize());
+
+        return new BinaryInMemorySortBuffer(
+                normalizedKeyComputer,
+                inputSerializer,
+                serializer,
+                comparator,
+                segments,
+                collector,
+                memoryPool);
+    }
+
+    /**
+     * Stores the input serializer, the record segment list and the collecting output view, then
+     * clears the buffer so that it starts out without any memory segments.
+     */
+    private BinaryInMemorySortBuffer(
+            NormalizedKeyComputer normalizedKeyComputer,
+            AbstractRowDataSerializer<RowData> inputSerializer,
+            BinaryRowDataSerializer serializer,
+            RecordComparator comparator,
+            ArrayList<MemorySegment> recordBufferSegments,
+            SimpleCollectingOutputView recordCollector,
+            MemorySegmentPool pool) {
+        super(normalizedKeyComputer, serializer, comparator, recordBufferSegments, pool);
+        this.recordCollector = recordCollector;
+        this.recordBufferSegments = recordBufferSegments;
+        this.inputSerializer = inputSerializer;
+
+        // The memory has been initialized in super(); flag the buffer as initialized so that
+        // clear() really returns the segments instead of being a no-op.
+        this.isInitialized = true;
+        clear();
+    }
+
+ // -------------------------------------------------------------------------
+ // Memory Segment
+ // -------------------------------------------------------------------------
+
+    /**
+     * Gives the sort index segments and the record buffer segments back to the memory segment
+     * pool, then empties both segment lists.
+     */
+    public void returnToSegmentPool() {
+        memorySegmentPool.returnAll(sortIndex);
+        memorySegmentPool.returnAll(recordBufferSegments);
+
+        // the lists must no longer reference the returned segments
+        sortIndex.clear();
+        recordBufferSegments.clear();
+    }
+
+    /** Returns the number of memory segments currently held in the record buffer list. */
+    public int getBufferSegmentCount() {
+        final int heldSegments = recordBufferSegments.size();
+        return heldSegments;
+    }
+
+    /**
+     * Initializes the sort buffer if it is not initialized, e.g. after all contained data has
+     * been discarded. Does nothing when the buffer is already initialized.
+     */
+    public void tryInitialize() {
+        if (isInitialized) {
+            return;
+        }
+
+        // first buffer: becomes the current sort index segment
+        final MemorySegment firstIndexSegment = nextMemorySegment();
+        currentSortIndexSegment = firstIndexSegment;
+        sortIndex.add(firstIndexSegment);
+
+        // second buffer: grabbed by resetting the record collector
+        recordCollector.reset();
+
+        isInitialized = true;
+    }
+
+    /**
+     * Releases all memory segments of this buffer and resets its offsets. Does nothing when the
+     * buffer is not initialized.
+     *
+     * <p>This method is provided instead of reset(): the reset() method in the flink sort buffer
+     * clears the memory and then grabs two buffers again for currentSortIndexSegment and
+     * recordCollector, whereas clear() keeps nothing.
+     */
+    public void clear() {
+        if (!isInitialized) {
+            return;
+        }
+
+        // rewind all counters and offsets
+        numRecords = 0;
+        currentSortIndexOffset = 0;
+        currentDataBufferOffset = 0;
+        sortIndexBytes = 0;
+
+        // give every segment back and forget the current index segment
+        returnToSegmentPool();
+        currentSortIndexSegment = null;
+        isInitialized = false;
+    }
+
+    /**
+     * Returns the sum of the current data buffer offset and the sort index bytes.
+     *
+     * <p>NOTE(review): within this class {@code sortIndexBytes} is only ever assigned 0 (in
+     * clear()), so the result currently equals the data buffer offset - confirm whether the
+     * bytes used by the sort index are supposed to be counted here.
+     */
+    public long getOccupancy() {
+        final long dataBytes = currentDataBufferOffset;
+        final long indexBytes = sortIndexBytes;
+        return dataBytes + indexBytes;
+    }
+
+    /**
+     * Appends the given record to this sort buffer; it takes the last logical position. The
+     * buffer is initialized first if necessary.
+     *
+     * @param record The record to be written.
+     * @return True, if the record was successfully written, false, if the sort buffer was full.
+     * @throws IOException Thrown, if an error occurred while serializing the record into the
+     *     buffers.
+     */
+    public boolean write(RowData record) throws IOException {
+        tryInitialize();
+
+        // the sort index may need a new memory segment; give up if that is not possible
+        final boolean indexHasRoom = checkNextIndexOffset();
+        if (!indexHasRoom) {
+            return false;
+        }
+
+        // put the serialized record into the data buffers
+        int skip;
+        try {
+            skip = inputSerializer.serializeToPages(record, recordCollector);
+        } catch (EOFException e) {
+            // reported to the caller as "buffer full"
+            return false;
+        }
+
+        final long offsetAfterWrite = recordCollector.getCurrentOffset();
+        final long recordStart = currentDataBufferOffset + skip;
+
+        writeIndexAndNormalizedKey(record, recordStart);
+        currentDataBufferOffset = offsetAfterWrite;
+
+        return true;
+    }
+
+ private BinaryRowData getRecordFromBuffer(BinaryRowData reuse, long pointer)
+ throws IOException {
+ this.recordBuffer.setReadPosition(pointer);
+ return this.serializer.mapFromPages(reuse, this.recordBuffer);
+ }
+
+ // -------------------------------------------------------------------------
+
+    /**
+     * Gets an iterator over all records in this buffer in their logical order.
+     *
+     * <p>The buffer is initialized first if necessary, so the first sort index segment read by
+     * the iterator exists. Only {@code next(BinaryRowData)} is supported; the no-argument
+     * {@code next()} always throws {@link UnsupportedOperationException}.
+     *
+     * @return An iterator returning the records in their logical order.
+     */
+    public final MutableObjectIterator<BinaryRowData> getIterator() {
+        tryInitialize();
+
+        return new MutableObjectIterator<BinaryRowData>() {
+            // number of records to return, captured when the iterator is created
+            private final int size = size();
+            private int current = 0;
+
+            // position of the next index entry: segment number and offset within that segment
+            private int currentSegment = 0;
+            private int currentOffset = 0;
+
+            private MemorySegment currentIndexSegment = sortIndex.get(0);
+
+            /**
+             * Returns the next record mapped into {@code target}, or null once all records
+             * have been returned.
+             */
+            @Override
+            public BinaryRowData next(BinaryRowData target) {
+                if (this.current >= this.size) {
+                    return null;
+                }
+                this.current++;
+
+                // move on to the next index segment once the current one is used up
+                if (this.currentOffset > lastIndexEntryOffset) {
+                    this.currentOffset = 0;
+                    this.currentIndexSegment = sortIndex.get(++this.currentSegment);
+                }
+
+                // the index entry starts with the pointer to the record in the record buffer
+                long pointer = this.currentIndexSegment.getLong(this.currentOffset);
+                this.currentOffset += indexEntrySize;
+
+                try {
+                    return getRecordFromBuffer(target, pointer);
+                } catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                }
+            }
+
+            /** Not supported; use {@code next(BinaryRowData)} instead. */
+            @Override
+            public BinaryRowData next() {
+                // UnsupportedOperationException is a RuntimeException, so callers catching
+                // RuntimeException keep working.
+                throw new UnsupportedOperationException("Not support!");
+            }
+        };
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 87186888..53406885 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -18,18 +18,19 @@
package org.apache.flink.table.store.file.mergetree;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.memory.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionHelper;
import org.apache.flink.table.types.logical.BigIntType;
@@ -44,6 +45,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** A {@link MemTable} which stores records in {@link BinaryInMemorySortBuffer}. */
public class SortBufferMemTable implements MemTable {
@@ -110,7 +113,12 @@ public class SortBufferMemTable implements MemTable {
@Override
public void clear() {
- buffer.reset();
+ buffer.clear();
+ }
+
+ @VisibleForTesting
+ void assertBufferEmpty() {
+ checkState(buffer.getBufferSegmentCount() == 0, "The sort buffer is not empty");
}
private class MergeIterator implements Iterator<KeyValue> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
index f70f703a..4469a8e2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
@@ -67,6 +67,7 @@ public abstract class SortBufferMemTableTestBase {
public void testAndClear() throws IOException {
testRandom(100);
table.clear();
+ table.assertBufferEmpty();
testRandom(200);
}