You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/29 17:46:14 UTC
[incubator-pinot] 01/01: Implement OffHeapMutableBytesStore for
real-time var-length bytes store
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch offheap_var_length_store
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a342d5768cb28f56669fe3bf4f6ccd79b8a374f5
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Sat Jun 29 10:39:25 2019 -0700
Implement OffHeapMutableBytesStore for real-time var-length bytes store
Comparing with the MutableOffHeapByteArrayStore:
1. Read/Write speed improvement
2. Buffer size is fixed and buffers are allocated on demand, so memory is always used efficiently
3. No need to estimate the buffer size beforehand, and performance is guaranteed (for MutableOffHeapByteArrayStore, if the buffer size is under-estimated, we pay a huge penalty for allocating a huge buffer at runtime, and also suffer read/write speed degradation)
Added benchmark to compare the read/write speed:
Benchmark (_maxValueLength) Mode Cnt Score Error Units
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 8 avgt 5 29.875 ± 0.370 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 16 avgt 5 32.344 ± 0.765 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 32 avgt 5 35.027 ± 0.460 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 64 avgt 5 40.811 ± 0.414 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 128 avgt 5 51.767 ± 0.185 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 256 avgt 5 77.065 ± 1.627 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 512 avgt 5 127.167 ± 1.075 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead 1024 avgt 5 283.145 ± 2.829 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 8 avgt 5 16.755 ± 0.374 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 16 avgt 5 25.579 ± 0.293 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 32 avgt 5 23.953 ± 0.507 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 64 avgt 5 51.442 ± 1.230 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 128 avgt 5 64.334 ± 0.069 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 256 avgt 5 260.415 ± 1.823 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 512 avgt 5 615.528 ± 3.180 ms/op
BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite 1024 avgt 5 1168.779 ± 16.961 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 8 avgt 5 28.978 ± 0.535 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 16 avgt 5 33.420 ± 0.400 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 32 avgt 5 37.120 ± 0.272 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 64 avgt 5 39.552 ± 0.317 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 128 avgt 5 42.861 ± 0.588 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 256 avgt 5 47.025 ± 0.189 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 512 avgt 5 60.740 ± 0.677 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead 1024 avgt 5 89.509 ± 1.838 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 8 avgt 5 21.327 ± 0.509 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 16 avgt 5 28.494 ± 0.304 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 32 avgt 5 33.399 ± 0.533 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 64 avgt 5 50.755 ± 1.700 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 128 avgt 5 75.702 ± 1.079 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 256 avgt 5 129.677 ± 1.942 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 512 avgt 5 222.697 ± 2.109 ms/op
BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite 1024 avgt 5 392.053 ± 3.200 ms/op
---
.../impl/dictionary/OffHeapMutableBytesStore.java | 195 +++++++++++++++++++++
.../dictionary/OffHeapMutableBytesStoreTest.java | 93 ++++++++++
.../perf/BenchmarkOffHeapMutableBytesStore.java | 147 ++++++++++++++++
3 files changed, 435 insertions(+)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStore.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStore.java
new file mode 100644
index 0000000..72a5064
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStore.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.dictionary;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+/**
+ * Off-heap variable length mutable bytes store.
+ * <p>The class is thread-safe for single writer and multiple readers.
+ * <p>There are two sets of buffers allocated for the store, one set to store the value end offsets and the other set to
+ * store the values. The new buffer is allocated when the previous buffer is filled up. The first entry of each offset
+ * buffer stores the end offset of the last value (or 0 for the first offset buffer) so that we can always get the
+ * previous and current value's end offset from the same buffer.
+ * <p>A value is never split across value buffers: when it does not fit into the remaining space of the current value
+ * buffer, the remaining space is skipped and the value is stored at the beginning of a newly allocated buffer. As a
+ * consequence a single value can be at most one value buffer (1 MB) long. Value offsets are stored as non-negative
+ * ints, so the offset space (values plus skipped space) is limited to {@link Integer#MAX_VALUE} bytes.
+ */
+public class OffHeapMutableBytesStore implements Closeable {
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  // MAX_NUM_BUFFERS = 8_192
+  private static final int MAX_NUM_BUFFERS = 1 << 13;
+
+  // Each offset buffer holds 8_192 end offsets plus one leading entry for the previous end offset
+  // OFFSET_BUFFER_SIZE = 32_768 + 4
+  private static final int OFFSET_BUFFER_SHIFT_OFFSET = 13;
+  private static final int OFFSET_BUFFER_SIZE = ((1 << OFFSET_BUFFER_SHIFT_OFFSET) + 1) << 2;
+  private static final int OFFSET_BUFFER_MASK = 0xFFFFFFFF >>> (Integer.SIZE - OFFSET_BUFFER_SHIFT_OFFSET);
+
+  // VALUE_BUFFER_SIZE = 1_048_576
+  private static final int VALUE_BUFFER_SHIFT_OFFSET = 20;
+  private static final int VALUE_BUFFER_SIZE = 1 << VALUE_BUFFER_SHIFT_OFFSET;
+  private static final int VALUE_BUFFER_MASK = 0xFFFFFFFF >>> (Integer.SIZE - VALUE_BUFFER_SHIFT_OFFSET);
+
+  private final PinotDataBuffer[] _offsetBuffers = new PinotDataBuffer[MAX_NUM_BUFFERS];
+  private final PinotDataBuffer[] _valueBuffers = new PinotDataBuffer[MAX_NUM_BUFFERS];
+  private final PinotDataBufferMemoryManager _memoryManager;
+  private final String _allocationContext;
+
+  // Number of values added so far, also the index assigned to the next value
+  private int _numValues;
+  // End offset of the last added value (0 when the store is empty)
+  private int _valueStartOffset;
+  // Total size in bytes of all the offset buffers and value buffers allocated so far
+  private int _totalBufferSize;
+
+  /**
+   * Creates an empty store. No buffer is allocated until the first value is added.
+   *
+   * @param memoryManager Memory manager used to allocate the offset buffers and value buffers
+   * @param allocationContext Context passed to the memory manager for every allocation
+   */
+  public OffHeapMutableBytesStore(PinotDataBufferMemoryManager memoryManager, String allocationContext) {
+    _memoryManager = memoryManager;
+    _allocationContext = allocationContext;
+  }
+
+  /**
+   * Adds a value into the store.
+   * <p>All the validations are performed before any buffer is allocated or any state is modified, so a rejected value
+   * leaves the store unchanged.
+   *
+   * @param value Value to add, can be empty
+   * @return Index of the value, which is the number of values added before it
+   * @throws IllegalArgumentException if the value is longer than a value buffer
+   * @throws IllegalStateException if storing the value would push the end offset beyond {@link Integer#MAX_VALUE}
+   */
+  public int add(byte[] value) {
+    int valueLength = value.length;
+    // A value is always stored within a single value buffer, so it cannot be longer than the buffer
+    if (valueLength > VALUE_BUFFER_SIZE) {
+      throw new IllegalArgumentException(
+          "Value length: " + valueLength + " exceeds the maximum supported length: " + VALUE_BUFFER_SIZE);
+    }
+
+    // Figure out where the value goes before touching any state
+    int valueStartOffset = _valueStartOffset;
+    int valueBufferIndex = 0;
+    boolean needNewValueBuffer = false;
+    if (valueLength > 0) {
+      // Use long arithmetic so that running out of the int offset space is detected instead of silently overflowing
+      valueBufferIndex = (int) (((long) valueStartOffset + valueLength - 1) >>> VALUE_BUFFER_SHIFT_OFFSET);
+      // The current value buffer is the one holding the last byte written so far. For the very first value,
+      // (0 - 1) >>> shift gives an index larger than any valid buffer index, so a buffer is always allocated
+      needNewValueBuffer = ((valueStartOffset - 1) >>> VALUE_BUFFER_SHIFT_OFFSET) != valueBufferIndex;
+      // When moving on to a new buffer, skip the remaining space of the current buffer
+      long alignedStartOffset =
+          needNewValueBuffer ? ((long) valueBufferIndex << VALUE_BUFFER_SHIFT_OFFSET) : valueStartOffset;
+      if (alignedStartOffset + valueLength > Integer.MAX_VALUE) {
+        throw new IllegalStateException(
+            "Cannot add value of length: " + valueLength + ", store is full at offset: " + _valueStartOffset);
+      }
+      valueStartOffset = (int) alignedStartOffset;
+    }
+
+    int offsetBufferIndex = _numValues >>> OFFSET_BUFFER_SHIFT_OFFSET;
+    int offsetIndex = _numValues & OFFSET_BUFFER_MASK;
+
+    PinotDataBuffer offsetBuffer;
+    // If this is the first entry in the offset buffer, allocate a new buffer and store the start offset (end offset of
+    // the previous value)
+    if (offsetIndex == 0) {
+      offsetBuffer = _memoryManager.allocate(OFFSET_BUFFER_SIZE, _allocationContext);
+      offsetBuffer.putInt(0, _valueStartOffset);
+      _offsetBuffers[offsetBufferIndex] = offsetBuffer;
+      _totalBufferSize += OFFSET_BUFFER_SIZE;
+    } else {
+      offsetBuffer = _offsetBuffers[offsetBufferIndex];
+    }
+
+    // Empty value takes no space in the value buffer, its end offset equals the previous value's end offset
+    if (valueLength == 0) {
+      offsetBuffer.putInt((offsetIndex + 1) << 2, _valueStartOffset);
+      return _numValues++;
+    }
+
+    PinotDataBuffer valueBuffer;
+    // If the current value buffer does not have enough space, allocate a new buffer to store the value
+    if (needNewValueBuffer) {
+      valueBuffer = _memoryManager.allocate(VALUE_BUFFER_SIZE, _allocationContext);
+      _valueBuffers[valueBufferIndex] = valueBuffer;
+      _totalBufferSize += VALUE_BUFFER_SIZE;
+    } else {
+      valueBuffer = _valueBuffers[valueBufferIndex];
+    }
+
+    int valueEndOffset = valueStartOffset + valueLength;
+    offsetBuffer.putInt((offsetIndex + 1) << 2, valueEndOffset);
+    valueBuffer.readFrom(valueStartOffset & VALUE_BUFFER_MASK, value);
+    _valueStartOffset = valueEndOffset;
+
+    return _numValues++;
+  }
+
+  /**
+   * Returns the value at the given index.
+   *
+   * @param index Index returned by {@link #add(byte[])}
+   * @return Copy of the stored value, or a shared empty array if the stored value is empty
+   */
+  public byte[] get(int index) {
+    PinotDataBuffer offsetBuffer = _offsetBuffers[index >>> OFFSET_BUFFER_SHIFT_OFFSET];
+    int offsetIndex = index & OFFSET_BUFFER_MASK;
+    int previousValueEndOffset = offsetBuffer.getInt(offsetIndex << 2);
+    int valueEndOffset = offsetBuffer.getInt((offsetIndex + 1) << 2);
+
+    if (previousValueEndOffset == valueEndOffset) {
+      return EMPTY_BYTES;
+    }
+
+    int valueBufferIndex = (valueEndOffset - 1) >>> VALUE_BUFFER_SHIFT_OFFSET;
+    int valueStartOffset = getValueStartOffset(previousValueEndOffset, valueBufferIndex);
+
+    byte[] value = new byte[valueEndOffset - valueStartOffset];
+    _valueBuffers[valueBufferIndex].copyTo(valueStartOffset & VALUE_BUFFER_MASK, value);
+    return value;
+  }
+
+  /**
+   * Checks whether the value at the given index equals the given value, without materializing the stored value when
+   * the lengths differ.
+   *
+   * @param index Index returned by {@link #add(byte[])}
+   * @param value Value to compare against
+   * @return {@code true} if the stored value has the same length and content as the given value
+   */
+  public boolean equals(int index, byte[] value) {
+    int valueLength = value.length;
+
+    PinotDataBuffer offsetBuffer = _offsetBuffers[index >>> OFFSET_BUFFER_SHIFT_OFFSET];
+    int offsetIndex = index & OFFSET_BUFFER_MASK;
+    int previousValueEndOffset = offsetBuffer.getInt(offsetIndex << 2);
+    int valueEndOffset = offsetBuffer.getInt((offsetIndex + 1) << 2);
+
+    // Stored value is empty, it can only equal an empty value
+    if (previousValueEndOffset == valueEndOffset) {
+      return valueLength == 0;
+    }
+
+    int valueBufferIndex = (valueEndOffset - 1) >>> VALUE_BUFFER_SHIFT_OFFSET;
+    int valueStartOffset = getValueStartOffset(previousValueEndOffset, valueBufferIndex);
+    if (valueEndOffset - valueStartOffset != valueLength) {
+      return false;
+    }
+
+    byte[] valueAtIndex = new byte[valueLength];
+    _valueBuffers[valueBufferIndex].copyTo(valueStartOffset & VALUE_BUFFER_MASK, valueAtIndex);
+    for (int i = 0; i < valueLength; i++) {
+      if (value[i] != valueAtIndex[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the total size in bytes of all the buffers allocated by the store so far.
+   */
+  public int getTotalBufferSize() {
+    return _totalBufferSize;
+  }
+
+  /**
+   * Closes all the allocated offset buffers and value buffers.
+   * <p>Buffers are allocated at consecutive indexes, so the first {@code null} entry marks the end of each set.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    for (PinotDataBuffer offsetBuffer : _offsetBuffers) {
+      if (offsetBuffer != null) {
+        offsetBuffer.close();
+      } else {
+        break;
+      }
+    }
+    for (PinotDataBuffer valueBuffer : _valueBuffers) {
+      if (valueBuffer != null) {
+        valueBuffer.close();
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Returns the start offset of a non-empty value stored in the given value buffer.
+   * <p>If the previous value ends in a different value buffer, the value is the first one in its buffer and starts at
+   * the beginning of the buffer (the tail of the previous buffer was skipped). Otherwise the value starts right after
+   * the previous value. Deriving the start offset from the buffer index (instead of masking the end offset) keeps the
+   * length correct for a value that fills the whole buffer.
+   */
+  private static int getValueStartOffset(int previousValueEndOffset, int valueBufferIndex) {
+    if (((previousValueEndOffset - 1) >>> VALUE_BUFFER_SHIFT_OFFSET) != valueBufferIndex) {
+      // The first value in the value buffer
+      return valueBufferIndex << VALUE_BUFFER_SHIFT_OFFSET;
+    } else {
+      // Not the first value in the value buffer
+      return previousValueEndOffset;
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStoreTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStoreTest.java
new file mode 100644
index 0000000..44fcec6
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStoreTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.dictionary;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class OffHeapMutableBytesStoreTest {
+  private static final int NUM_VALUES = 100_000;
+  private static final int MAX_NUM_BYTES = 512;
+  private static final Random RANDOM = new Random();
+
+  private final PinotDataBufferMemoryManager _memoryManager = new DirectMemoryManager("testSegment");
+  private final byte[][] _values = new byte[NUM_VALUES][];
+
+  /**
+   * Generates the random test values, each with a random length in [0, MAX_NUM_BYTES).
+   */
+  @BeforeClass
+  public void setUp() {
+    for (int valueId = 0; valueId < NUM_VALUES; valueId++) {
+      byte[] randomBytes = new byte[RANDOM.nextInt(MAX_NUM_BYTES)];
+      RANDOM.nextBytes(randomBytes);
+      _values[valueId] = randomBytes;
+    }
+  }
+
+  /**
+   * Verifies that values are assigned consecutive indexes starting from 0.
+   */
+  @Test
+  public void testAdd()
+      throws IOException {
+    try (OffHeapMutableBytesStore store = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      int expectedIndex = 0;
+      for (byte[] value : _values) {
+        assertEquals(store.add(value), expectedIndex);
+        expectedIndex++;
+      }
+    }
+  }
+
+  /**
+   * Verifies that values read back at random indexes match the values added.
+   */
+  @Test
+  public void testGet()
+      throws IOException {
+    try (OffHeapMutableBytesStore store = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      addAllValues(store);
+      for (int round = 0; round < NUM_VALUES; round++) {
+        int randomIndex = RANDOM.nextInt(NUM_VALUES);
+        byte[] expected = _values[randomIndex];
+        byte[] actual = store.get(randomIndex);
+        assertTrue(Arrays.equals(actual, expected));
+      }
+    }
+  }
+
+  /**
+   * Verifies that the in-place comparison matches for the stored value and mismatches for a different value.
+   */
+  @Test
+  public void testEquals()
+      throws IOException {
+    try (OffHeapMutableBytesStore store = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      addAllValues(store);
+      byte[] firstValue = _values[0];
+      for (int round = 0; round < NUM_VALUES; round++) {
+        int randomIndex = RANDOM.nextInt(NUM_VALUES);
+        byte[] randomValue = _values[randomIndex];
+        assertTrue(store.equals(randomIndex, randomValue));
+        // Only check the mismatch when the picked value actually differs from the first value
+        if (Arrays.equals(randomValue, firstValue)) {
+          continue;
+        }
+        assertFalse(store.equals(0, randomValue));
+        assertFalse(store.equals(randomIndex, firstValue));
+      }
+    }
+  }
+
+  /**
+   * Adds all the test values into the given store in order.
+   */
+  private void addAllValues(OffHeapMutableBytesStore store) {
+    for (byte[] value : _values) {
+      store.add(value);
+    }
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffHeapMutableBytesStore.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffHeapMutableBytesStore.java
new file mode 100644
index 0000000..fbe148f
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffHeapMutableBytesStore.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MutableOffHeapByteArrayStore;
+import org.apache.pinot.core.realtime.impl.dictionary.OffHeapMutableBytesStore;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 30)
+@Measurement(iterations = 5, time = 30)
+@Fork(1)
+@State(Scope.Benchmark)
+public class BenchmarkOffHeapMutableBytesStore {
+ private static final int NUM_VALUES = 1_000_000;
+
+ @Param({"8", "16", "32", "64", "128", "256", "512", "1024"})
+ private int _maxValueLength;
+
+ private PinotDataBufferMemoryManager _memoryManager;
+ private byte[][] _values;
+ private OffHeapMutableBytesStore _offHeapMutableBytesStore;
+ private MutableOffHeapByteArrayStore _mutableOffHeapByteArrayStore;
+
+ @Setup
+ public void setUp() {
+ _memoryManager = new DirectMemoryManager("");
+
+ _values = new byte[NUM_VALUES][];
+ Random random = new Random();
+ for (int i = 0; i < NUM_VALUES; i++) {
+ int valueLength = random.nextInt(_maxValueLength + 1);
+ byte[] value = new byte[valueLength];
+ random.nextBytes(value);
+ _values[i] = value;
+ }
+
+ _offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null);
+ for (byte[] value : _values) {
+ _offHeapMutableBytesStore.add(value);
+ }
+ System.out
+ .println("\nBytes allocated for OffHeapMutableBytesStore: " + _offHeapMutableBytesStore.getTotalBufferSize());
+
+ _mutableOffHeapByteArrayStore =
+ new MutableOffHeapByteArrayStore(_memoryManager, null, NUM_VALUES, _maxValueLength / 2);
+ for (byte[] value : _values) {
+ _mutableOffHeapByteArrayStore.add(value);
+ }
+ System.out.println("\nBytes allocated for MutableOffHeapByteArrayStore: " + _mutableOffHeapByteArrayStore
+ .getTotalOffHeapMemUsed());
+ }
+
+ @TearDown
+ public void tearDown()
+ throws IOException {
+ _mutableOffHeapByteArrayStore.close();
+ _offHeapMutableBytesStore.close();
+ _memoryManager.close();
+ }
+
+ @Benchmark
+ public int offHeapMutableBytesStoreRead() {
+ int sum = 0;
+ for (int i = 0; i < NUM_VALUES; i++) {
+ sum += _offHeapMutableBytesStore.get(i).length;
+ }
+ return sum;
+ }
+
+ @Benchmark
+ public int mutableOffHeapByteArrayStoreRead() {
+ int sum = 0;
+ for (int i = 0; i < NUM_VALUES; i++) {
+ sum += _mutableOffHeapByteArrayStore.get(i).length;
+ }
+ return sum;
+ }
+
+ @Benchmark
+ public int offHeapMutableBytesStoreWrite()
+ throws IOException {
+ int sum = 0;
+ try (OffHeapMutableBytesStore offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null)) {
+ for (byte[] value : _values) {
+ sum += offHeapMutableBytesStore.add(value);
+ }
+ }
+ return sum;
+ }
+
+ @Benchmark
+ public int mutableOffHeapByteArrayStoreWrite()
+ throws IOException {
+ int sum = 0;
+ try (MutableOffHeapByteArrayStore mutableOffHeapByteArrayStore = new MutableOffHeapByteArrayStore(_memoryManager,
+ null, NUM_VALUES, _maxValueLength / 2)) {
+ for (byte[] value : _values) {
+ sum += mutableOffHeapByteArrayStore.add(value);
+ }
+ }
+ return sum;
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkOffHeapMutableBytesStore.class.getSimpleName());
+ new Runner(opt.build()).run();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org