Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/29 17:46:13 UTC

[incubator-pinot] branch offheap_var_length_store created (now a342d57)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch offheap_var_length_store
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at a342d57  Implement OffHeapMutableBytesStore for real-time var-length bytes store

This branch includes the following new commits:

     new a342d57  Implement OffHeapMutableBytesStore for real-time var-length bytes store

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Implement OffHeapMutableBytesStore for real-time var-length bytes store

Posted by ja...@apache.org.

jackie pushed a commit to branch offheap_var_length_store
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a342d5768cb28f56669fe3bf4f6ccd79b8a374f5
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Sat Jun 29 10:39:25 2019 -0700

    Implement OffHeapMutableBytesStore for real-time var-length bytes store
    
    Compared with MutableOffHeapByteArrayStore:
    1. Read/write speed improvement, most pronounced for larger values (see the benchmark below)
    2. Buffer size is fixed and buffers are allocated on demand, so memory is always used efficiently
    3. No need to estimate the buffer size beforehand, and performance does not depend on that estimate (for MutableOffHeapByteArrayStore, if the buffer size is under-estimated, we pay a huge penalty for allocating a huge buffer at runtime, and read/write speed also degrades)
    
    Added a benchmark to compare the read/write speed:
    Benchmark                                                            (_maxValueLength)  Mode  Cnt     Score    Error  Units
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                   8  avgt    5    29.875 ±  0.370  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                  16  avgt    5    32.344 ±  0.765  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                  32  avgt    5    35.027 ±  0.460  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                  64  avgt    5    40.811 ±  0.414  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                 128  avgt    5    51.767 ±  0.185  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                 256  avgt    5    77.065 ±  1.627  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                 512  avgt    5   127.167 ±  1.075  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreRead                1024  avgt    5   283.145 ±  2.829  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                  8  avgt    5    16.755 ±  0.374  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                 16  avgt    5    25.579 ±  0.293  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                 32  avgt    5    23.953 ±  0.507  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                 64  avgt    5    51.442 ±  1.230  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                128  avgt    5    64.334 ±  0.069  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                256  avgt    5   260.415 ±  1.823  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite                512  avgt    5   615.528 ±  3.180  ms/op
    BenchmarkOffHeapMutableBytesStore.mutableOffHeapByteArrayStoreWrite               1024  avgt    5  1168.779 ± 16.961  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                       8  avgt    5    28.978 ±  0.535  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                      16  avgt    5    33.420 ±  0.400  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                      32  avgt    5    37.120 ±  0.272  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                      64  avgt    5    39.552 ±  0.317  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                     128  avgt    5    42.861 ±  0.588  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                     256  avgt    5    47.025 ±  0.189  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                     512  avgt    5    60.740 ±  0.677  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreRead                    1024  avgt    5    89.509 ±  1.838  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                      8  avgt    5    21.327 ±  0.509  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                     16  avgt    5    28.494 ±  0.304  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                     32  avgt    5    33.399 ±  0.533  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                     64  avgt    5    50.755 ±  1.700  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                    128  avgt    5    75.702 ±  1.079  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                    256  avgt    5   129.677 ±  1.942  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                    512  avgt    5   222.697 ±  2.109  ms/op
    BenchmarkOffHeapMutableBytesStore.offHeapMutableBytesStoreWrite                   1024  avgt    5   392.053 ±  3.200  ms/op
---
 .../impl/dictionary/OffHeapMutableBytesStore.java  | 195 +++++++++++++++++++++
 .../dictionary/OffHeapMutableBytesStoreTest.java   |  93 ++++++++++
 .../perf/BenchmarkOffHeapMutableBytesStore.java    | 147 ++++++++++++++++
 3 files changed, 435 insertions(+)
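
To make the benchmark table in the commit message easier to read, the following minimal sketch computes the old/new time ratio per maximum value length. The scores are copied from that table; the class name and the `speedup` helper are illustrative only and are not part of the commit.

```java
import java.util.Locale;

// Illustrative only: scores are copied from the benchmark table in the commit message (ms/op, lower is better).
final class BenchmarkSpeedupSketch {
  static final int[] MAX_VALUE_LENGTHS = {8, 16, 32, 64, 128, 256, 512, 1024};

  // MutableOffHeapByteArrayStore (the existing store)
  static final double[] OLD_READ = {29.875, 32.344, 35.027, 40.811, 51.767, 77.065, 127.167, 283.145};
  static final double[] OLD_WRITE = {16.755, 25.579, 23.953, 51.442, 64.334, 260.415, 615.528, 1168.779};

  // OffHeapMutableBytesStore (the store added by this commit)
  static final double[] NEW_READ = {28.978, 33.420, 37.120, 39.552, 42.861, 47.025, 60.740, 89.509};
  static final double[] NEW_WRITE = {21.327, 28.494, 33.399, 50.755, 75.702, 129.677, 222.697, 392.053};

  // A ratio above 1 means the new store needed less time per operation.
  static double speedup(double oldMillis, double newMillis) {
    return oldMillis / newMillis;
  }

  public static void main(String[] args) {
    for (int i = 0; i < MAX_VALUE_LENGTHS.length; i++) {
      System.out.printf(Locale.ROOT, "maxValueLength=%4d  read x%.2f  write x%.2f%n", MAX_VALUE_LENGTHS[i],
          speedup(OLD_READ[i], NEW_READ[i]), speedup(OLD_WRITE[i], NEW_WRITE[i]));
    }
  }
}
```

At a maximum value length of 1024 the ratios come out to roughly 3.2 for reads and 3.0 for writes. At 8 the write ratio is below 1, meaning the existing store was faster for the smallest values in this run.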

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStore.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStore.java
new file mode 100644
index 0000000..72a5064
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStore.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.dictionary;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+/**
+ * Off-heap variable length mutable bytes store.
+ * <p>The class is thread-safe for single writer and multiple readers.
+ * <p>There are two sets of buffers allocated for the store, one set to store the value end offsets and the other set to
+ * store the values. The new buffer is allocated when the previous buffer is filled up. The first entry of each offset
+ * buffer stores the end offset of the last value (or 0 for the first offset buffer) so that we can always get the
+ * previous and current value's end offset from the same buffer.
+ */
+public class OffHeapMutableBytesStore implements Closeable {
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  // MAX_NUM_BUFFERS = 8_192
+  private static final int MAX_NUM_BUFFERS = 1 << 13;
+
+  // OFFSET_BUFFER_SIZE = 32_768 + 4
+  private static final int OFFSET_BUFFER_SHIFT_OFFSET = 13;
+  private static final int OFFSET_BUFFER_SIZE = ((1 << OFFSET_BUFFER_SHIFT_OFFSET) + 1) << 2;
+  private static final int OFFSET_BUFFER_MASK = 0xFFFFFFFF >>> (Integer.SIZE - OFFSET_BUFFER_SHIFT_OFFSET);
+
+  // VALUE_BUFFER_SIZE = 1_048_576
+  private static final int VALUE_BUFFER_SIFT_OFFSET = 20;
+  private static final int VALUE_BUFFER_SIZE = 1 << VALUE_BUFFER_SIFT_OFFSET;
+  private static final int VALUE_BUFFER_MASK = 0xFFFFFFFF >>> (Integer.SIZE - VALUE_BUFFER_SIFT_OFFSET);
+
+  private final PinotDataBuffer[] _offsetBuffers = new PinotDataBuffer[MAX_NUM_BUFFERS];
+  private final PinotDataBuffer[] _valueBuffers = new PinotDataBuffer[MAX_NUM_BUFFERS];
+  private final PinotDataBufferMemoryManager _memoryManager;
+  private final String _allocationContext;
+
+  private int _numValues;
+  private int _valueStartOffset;
+  private int _totalBufferSize;
+
+  public OffHeapMutableBytesStore(PinotDataBufferMemoryManager memoryManager, String allocationContext) {
+    _memoryManager = memoryManager;
+    _allocationContext = allocationContext;
+  }
+
+  public int add(byte[] value) {
+    int offsetBufferIndex = _numValues >>> OFFSET_BUFFER_SHIFT_OFFSET;
+    int offsetIndex = _numValues & OFFSET_BUFFER_MASK;
+
+    PinotDataBuffer offsetBuffer;
+    // If this is the first entry in the offset buffer, allocate a new buffer and store the start offset (end offset of
+    // the previous value)
+    if (offsetIndex == 0) {
+      offsetBuffer = _memoryManager.allocate(OFFSET_BUFFER_SIZE, _allocationContext);
+      offsetBuffer.putInt(0, _valueStartOffset);
+      _offsetBuffers[offsetBufferIndex] = offsetBuffer;
+      _totalBufferSize += OFFSET_BUFFER_SIZE;
+    } else {
+      offsetBuffer = _offsetBuffers[offsetBufferIndex];
+    }
+
+    int valueLength = value.length;
+    if (valueLength == 0) {
+      offsetBuffer.putInt((offsetIndex + 1) << 2, _valueStartOffset);
+      return _numValues++;
+    }
+
+    int valueBufferIndex = (_valueStartOffset + valueLength - 1) >>> VALUE_BUFFER_SIFT_OFFSET;
+    PinotDataBuffer valueBuffer;
+    // If the current value buffer does not have enough space, allocate a new buffer to store the value
+    if ((_valueStartOffset - 1) >>> VALUE_BUFFER_SIFT_OFFSET != valueBufferIndex) {
+      valueBuffer = _memoryManager.allocate(VALUE_BUFFER_SIZE, _allocationContext);
+      _valueBuffers[valueBufferIndex] = valueBuffer;
+      _totalBufferSize += VALUE_BUFFER_SIZE;
+      _valueStartOffset = valueBufferIndex << VALUE_BUFFER_SIFT_OFFSET;
+    } else {
+      valueBuffer = _valueBuffers[valueBufferIndex];
+    }
+
+    int valueEndOffset = _valueStartOffset + valueLength;
+    offsetBuffer.putInt((offsetIndex + 1) << 2, valueEndOffset);
+    valueBuffer.readFrom(_valueStartOffset & VALUE_BUFFER_MASK, value);
+    _valueStartOffset = valueEndOffset;
+
+    return _numValues++;
+  }
+
+  public byte[] get(int index) {
+    int offsetBufferIndex = index >>> OFFSET_BUFFER_SHIFT_OFFSET;
+    int offsetIndex = index & OFFSET_BUFFER_MASK;
+    int previousValueEndOffset = _offsetBuffers[offsetBufferIndex].getInt(offsetIndex << 2);
+    int valueEndOffset = _offsetBuffers[offsetBufferIndex].getInt((offsetIndex + 1) << 2);
+
+    if (previousValueEndOffset == valueEndOffset) {
+      return EMPTY_BYTES;
+    }
+
+    int valueBufferIndex = (valueEndOffset - 1) >>> VALUE_BUFFER_SIFT_OFFSET;
+    int startOffsetInValueBuffer;
+    int valueLength;
+    if ((previousValueEndOffset - 1) >>> VALUE_BUFFER_SIFT_OFFSET != valueBufferIndex) {
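+      // The first value in the value buffer; subtract the buffer start so a value filling the buffer keeps its length
+      startOffsetInValueBuffer = 0;
+      valueLength = valueEndOffset - (valueBufferIndex << VALUE_BUFFER_SIFT_OFFSET);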
+    } else {
+      // Not the first value in the value buffer
+      startOffsetInValueBuffer = previousValueEndOffset & VALUE_BUFFER_MASK;
+      valueLength = valueEndOffset - previousValueEndOffset;
+    }
+
+    byte[] value = new byte[valueLength];
+    _valueBuffers[valueBufferIndex].copyTo(startOffsetInValueBuffer, value);
+    return value;
+  }
+
+  public boolean equals(int index, byte[] value) {
+    int valueLength = value.length;
+
+    int offsetBufferIndex = index >>> OFFSET_BUFFER_SHIFT_OFFSET;
+    int offsetIndex = index & OFFSET_BUFFER_MASK;
+    int previousValueEndOffset = _offsetBuffers[offsetBufferIndex].getInt(offsetIndex << 2);
+    int valueEndOffset = _offsetBuffers[offsetBufferIndex].getInt((offsetIndex + 1) << 2);
+
+    if (valueLength == 0) {
+      return previousValueEndOffset == valueEndOffset;
+    }
+
+    int valueBufferIndex = (valueEndOffset - 1) >>> VALUE_BUFFER_SIFT_OFFSET;
+    int startOffsetInValueBuffer;
+    if ((previousValueEndOffset - 1) >>> VALUE_BUFFER_SIFT_OFFSET != valueBufferIndex) {
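+      // The first value in the value buffer; measure from the buffer start so a buffer-filling value compares correctly
+      if (valueEndOffset - (valueBufferIndex << VALUE_BUFFER_SIFT_OFFSET) != valueLength) {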
+        return false;
+      }
+      startOffsetInValueBuffer = 0;
+    } else {
+      // Not the first value in the value buffer
+      if (valueEndOffset - previousValueEndOffset != valueLength) {
+        return false;
+      }
+      startOffsetInValueBuffer = previousValueEndOffset & VALUE_BUFFER_MASK;
+    }
+
+    byte[] valueAtIndex = new byte[valueLength];
+    _valueBuffers[valueBufferIndex].copyTo(startOffsetInValueBuffer, valueAtIndex);
+    for (int i = 0; i < valueLength; i++) {
+      if (value[i] != valueAtIndex[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public int getTotalBufferSize() {
+    return _totalBufferSize;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    for (PinotDataBuffer offsetBuffer : _offsetBuffers) {
+      if (offsetBuffer != null) {
+        offsetBuffer.close();
+      } else {
+        break;
+      }
+    }
+    for (PinotDataBuffer valueBuffer : _valueBuffers) {
+      if (valueBuffer != null) {
+        valueBuffer.close();
+      } else {
+        break;
+      }
+    }
+  }
+}
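
The layout described in the class Javadoc above (one set of chunks for end offsets, one for values, with the first slot of each offset chunk repeating the previous end offset) can be modelled on the heap. The following is a simplified sketch, not the Pinot implementation: it uses plain arrays instead of `PinotDataBuffer`, tiny chunk sizes so the chunk boundaries are easy to hit, and hypothetical names throughout. It also locates the start of a value with `Math.max` against the chunk start instead of the branch used in the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical on-heap model of the two-chunk-set layout; all names and sizes are assumptions for illustration.
final class ChunkedBytesStoreSketch {
  private static final int OFFSET_SHIFT = 2;                    // 4 values per offset chunk (the commit uses 13)
  private static final int OFFSET_MASK = (1 << OFFSET_SHIFT) - 1;
  private static final int VALUE_SHIFT = 4;                     // 16 bytes per value chunk (the commit uses 20)
  private static final int VALUE_CHUNK_SIZE = 1 << VALUE_SHIFT;
  private static final int VALUE_MASK = VALUE_CHUNK_SIZE - 1;

  private final List<int[]> offsetChunks = new ArrayList<>();
  private final List<byte[]> valueChunks = new ArrayList<>();
  private int numValues;
  private int valueStartOffset;

  // Appends a value and returns its index.
  int add(byte[] value) {
    if (value.length > VALUE_CHUNK_SIZE) {
      throw new IllegalArgumentException("value does not fit in one chunk");
    }
    int offsetIndex = numValues & OFFSET_MASK;
    if (offsetIndex == 0) {
      // Slot 0 of every offset chunk repeats the end offset of the previous value (0 for the first chunk)
      int[] chunk = new int[(1 << OFFSET_SHIFT) + 1];
      chunk[0] = valueStartOffset;
      offsetChunks.add(chunk);
    }
    int[] offsets = offsetChunks.get(numValues >>> OFFSET_SHIFT);
    if (value.length > 0) {
      int chunkIndex = (valueStartOffset + value.length - 1) >>> VALUE_SHIFT;
      if (chunkIndex == valueChunks.size()) {
        // The value does not fit in the current chunk (or there is none yet): start a new chunk
        valueChunks.add(new byte[VALUE_CHUNK_SIZE]);
        valueStartOffset = chunkIndex << VALUE_SHIFT;
      }
      System.arraycopy(value, 0, valueChunks.get(chunkIndex), valueStartOffset & VALUE_MASK, value.length);
      valueStartOffset += value.length;
    }
    offsets[offsetIndex + 1] = valueStartOffset;
    return numValues++;
  }

  // Returns a copy of the value stored at the given index.
  byte[] get(int index) {
    int[] offsets = offsetChunks.get(index >>> OFFSET_SHIFT);
    int offsetIndex = index & OFFSET_MASK;
    int previousEnd = offsets[offsetIndex];
    int end = offsets[offsetIndex + 1];
    if (previousEnd == end) {
      return new byte[0];
    }
    int chunkIndex = (end - 1) >>> VALUE_SHIFT;
    int chunkStart = chunkIndex << VALUE_SHIFT;
    // A value never spans two chunks, so it starts at the chunk start if the previous value ended in an earlier chunk
    int start = Math.max(previousEnd, chunkStart);
    return Arrays.copyOfRange(valueChunks.get(chunkIndex), start - chunkStart, end - chunkStart);
  }

  public static void main(String[] args) {
    ChunkedBytesStoreSketch store = new ChunkedBytesStoreSketch();
    int first = store.add(new byte[]{1, 2, 3});
    int second = store.add(new byte[0]);
    System.out.println(Arrays.toString(store.get(first)) + " " + store.get(second).length);
  }
}
```

Because both the previous and the current end offset live in the same offset chunk, a read touches one offset chunk and one value chunk, regardless of how many chunks have been allocated.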
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStoreTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStoreTest.java
new file mode 100644
index 0000000..44fcec6
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/OffHeapMutableBytesStoreTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.dictionary;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class OffHeapMutableBytesStoreTest {
+  private static final int NUM_VALUES = 100_000;
+  private static final int MAX_NUM_BYTES = 512;
+  private static final Random RANDOM = new Random();
+
+  private final PinotDataBufferMemoryManager _memoryManager = new DirectMemoryManager("testSegment");
+  private final byte[][] _values = new byte[NUM_VALUES][];
+
+  @BeforeClass
+  public void setUp() {
+    for (int i = 0; i < NUM_VALUES; i++) {
+      int numBytes = RANDOM.nextInt(MAX_NUM_BYTES);
+      byte[] value = new byte[numBytes];
+      RANDOM.nextBytes(value);
+      _values[i] = value;
+    }
+  }
+
+  @Test
+  public void testAdd()
+      throws IOException {
+    try (OffHeapMutableBytesStore offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      for (int i = 0; i < NUM_VALUES; i++) {
+        assertEquals(offHeapMutableBytesStore.add(_values[i]), i);
+      }
+    }
+  }
+
+  @Test
+  public void testGet()
+      throws IOException {
+    try (OffHeapMutableBytesStore offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      for (int i = 0; i < NUM_VALUES; i++) {
+        offHeapMutableBytesStore.add(_values[i]);
+      }
+      for (int i = 0; i < NUM_VALUES; i++) {
+        int index = RANDOM.nextInt(NUM_VALUES);
+        assertTrue(Arrays.equals(offHeapMutableBytesStore.get(index), _values[index]));
+      }
+    }
+  }
+
+  @Test
+  public void testEquals()
+      throws IOException {
+    try (OffHeapMutableBytesStore offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      for (int i = 0; i < NUM_VALUES; i++) {
+        offHeapMutableBytesStore.add(_values[i]);
+      }
+      for (int i = 0; i < NUM_VALUES; i++) {
+        int index = RANDOM.nextInt(NUM_VALUES);
+        assertTrue(offHeapMutableBytesStore.equals(index, _values[index]));
+        if (!Arrays.equals(_values[index], _values[0])) {
+          assertFalse(offHeapMutableBytesStore.equals(0, _values[index]));
+          assertFalse(offHeapMutableBytesStore.equals(index, _values[0]));
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffHeapMutableBytesStore.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffHeapMutableBytesStore.java
new file mode 100644
index 0000000..fbe148f
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffHeapMutableBytesStore.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MutableOffHeapByteArrayStore;
+import org.apache.pinot.core.realtime.impl.dictionary.OffHeapMutableBytesStore;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 30)
+@Measurement(iterations = 5, time = 30)
+@Fork(1)
+@State(Scope.Benchmark)
+public class BenchmarkOffHeapMutableBytesStore {
+  private static final int NUM_VALUES = 1_000_000;
+
+  @Param({"8", "16", "32", "64", "128", "256", "512", "1024"})
+  private int _maxValueLength;
+
+  private PinotDataBufferMemoryManager _memoryManager;
+  private byte[][] _values;
+  private OffHeapMutableBytesStore _offHeapMutableBytesStore;
+  private MutableOffHeapByteArrayStore _mutableOffHeapByteArrayStore;
+
+  @Setup
+  public void setUp() {
+    _memoryManager = new DirectMemoryManager("");
+
+    _values = new byte[NUM_VALUES][];
+    Random random = new Random();
+    for (int i = 0; i < NUM_VALUES; i++) {
+      int valueLength = random.nextInt(_maxValueLength + 1);
+      byte[] value = new byte[valueLength];
+      random.nextBytes(value);
+      _values[i] = value;
+    }
+
+    _offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null);
+    for (byte[] value : _values) {
+      _offHeapMutableBytesStore.add(value);
+    }
+    System.out
+        .println("\nBytes allocated for OffHeapMutableBytesStore: " + _offHeapMutableBytesStore.getTotalBufferSize());
+
+    _mutableOffHeapByteArrayStore =
+        new MutableOffHeapByteArrayStore(_memoryManager, null, NUM_VALUES, _maxValueLength / 2);
+    for (byte[] value : _values) {
+      _mutableOffHeapByteArrayStore.add(value);
+    }
+    System.out.println("\nBytes allocated for MutableOffHeapByteArrayStore: " + _mutableOffHeapByteArrayStore
+        .getTotalOffHeapMemUsed());
+  }
+
+  @TearDown
+  public void tearDown()
+      throws IOException {
+    _mutableOffHeapByteArrayStore.close();
+    _offHeapMutableBytesStore.close();
+    _memoryManager.close();
+  }
+
+  @Benchmark
+  public int offHeapMutableBytesStoreRead() {
+    int sum = 0;
+    for (int i = 0; i < NUM_VALUES; i++) {
+      sum += _offHeapMutableBytesStore.get(i).length;
+    }
+    return sum;
+  }
+
+  @Benchmark
+  public int mutableOffHeapByteArrayStoreRead() {
+    int sum = 0;
+    for (int i = 0; i < NUM_VALUES; i++) {
+      sum += _mutableOffHeapByteArrayStore.get(i).length;
+    }
+    return sum;
+  }
+
+  @Benchmark
+  public int offHeapMutableBytesStoreWrite()
+      throws IOException {
+    int sum = 0;
+    try (OffHeapMutableBytesStore offHeapMutableBytesStore = new OffHeapMutableBytesStore(_memoryManager, null)) {
+      for (byte[] value : _values) {
+        sum += offHeapMutableBytesStore.add(value);
+      }
+    }
+    return sum;
+  }
+
+  @Benchmark
+  public int mutableOffHeapByteArrayStoreWrite()
+      throws IOException {
+    int sum = 0;
+    try (MutableOffHeapByteArrayStore mutableOffHeapByteArrayStore = new MutableOffHeapByteArrayStore(_memoryManager,
+        null, NUM_VALUES, _maxValueLength / 2)) {
+      for (byte[] value : _values) {
+        sum += mutableOffHeapByteArrayStore.add(value);
+      }
+    }
+    return sum;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkOffHeapMutableBytesStore.class.getSimpleName());
+    new Runner(opt.build()).run();
+  }
+}
