You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/02/01 10:59:24 UTC
[druid] branch master updated: Dimension dictionary reduce locking (#13710)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7a3bd89a85 Dimension dictionary reduce locking (#13710)
7a3bd89a85 is described below
commit 7a3bd89a854c73879ccdc51ce35071f81109b24c
Author: Jason Koch <jk...@netflix.com>
AuthorDate: Wed Feb 1 02:59:12 2023 -0800
Dimension dictionary reduce locking (#13710)
* perf: introduce benchmark for StringDimensionIndexer
jdk11 -- Benchmark Mode Cnt Score Error Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite avgt 10 30471.552 ± 456.716 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader avgt 10 18069.863 ± 327.923 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter avgt 10 67676.617 ± 2351.311 us/op
StringDimensionIndexerProcessBenchmark.soloReader avgt 10 1048.079 ± 1.120 us/op
StringDimensionIndexerProcessBenchmark.soloWriter avgt 10 4629.769 ± 29.353 us/op
* perf: switch DimensionDictionary to StampedLock
jdk11 - Benchmark Mode Cnt Score Error Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite avgt 10 37958.372 ± 1685.206 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader avgt 10 31192.232 ± 2755.365 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter avgt 10 58256.791 ± 1998.220 us/op
StringDimensionIndexerProcessBenchmark.soloReader avgt 10 1079.440 ± 1.753 us/op
StringDimensionIndexerProcessBenchmark.soloWriter avgt 10 4585.690 ± 13.225 us/op
* perf: use optimistic locking in DimensionDictionary
jdk11 - Benchmark Mode Cnt Score Error Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite avgt 10 6212.366 ± 162.684 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader avgt 10 1807.235 ± 109.339 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter avgt 10 19427.759 ± 611.692 us/op
StringDimensionIndexerProcessBenchmark.soloReader avgt 10 194.370 ± 1.050 us/op
StringDimensionIndexerProcessBenchmark.soloWriter avgt 10 2871.423 ± 14.426 us/op
* perf: refactor DimensionDictionary null handling to need less locks
jdk11 - Benchmark Mode Cnt Score Error Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite avgt 10 6591.619 ± 470.497 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader avgt 10 1387.338 ± 144.587 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter avgt 10 22204.462 ± 1620.806 us/op
StringDimensionIndexerProcessBenchmark.soloReader avgt 10 204.911 ± 0.459 us/op
StringDimensionIndexerProcessBenchmark.soloWriter avgt 10 2935.376 ± 12.639 us/op
* perf: refactor DimensionDictionary add handling to do a little less work
jdk11 - Benchmark Mode Cnt Score Error Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite avgt 10 2914.859 ± 22.519 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader avgt 10 508.010 ± 14.675 us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter avgt 10 10135.408 ± 82.745 us/op
StringDimensionIndexerProcessBenchmark.soloReader avgt 10 205.415 ± 0.158 us/op
StringDimensionIndexerProcessBenchmark.soloWriter avgt 10 3098.743 ± 23.603 us/op
---
.../StringDimensionIndexerProcessBenchmark.java | 170 +++++++++++++++++++++
.../apache/druid/segment/DimensionDictionary.java | 149 +++++++++++++-----
2 files changed, 278 insertions(+), 41 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
new file mode 100644
index 0000000000..0f44b15018
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.segment.StringDimensionIndexer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 10)
+public class StringDimensionIndexerProcessBenchmark
+{
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ String[] inputData;
+ StringDimensionIndexer emptyIndexer;
+ StringDimensionIndexer fullIndexer;
+ int[] readOrder;
+
+ @Setup(Level.Trial)
+ public void setup()
+ {
+ // maxNumbers : inputData ratio of 1000:50000 is 1:50, in other words we should expect each element to be 'added'
+ // 50x, the first time it is new, the next 49 times it is an existing entry; also 5% of values will be null
+
+ final int maxNumbers = 1000;
+ final int nullNumbers = 50; // 5%
+ final int validNumbers = (maxNumbers + 1) - nullNumbers;
+
+ // set up dummy input data, and load to indexer
+ inputData = new String[50000];
+ for (int i = 0; i < inputData.length; i++) {
+ int next = ThreadLocalRandom.current().nextInt(maxNumbers);
+ inputData[i] = (next < nullNumbers) ? null : ("abcd-" + next + "-efgh");
+ }
+
+ fullIndexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
+ for (String data : inputData) {
+ fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+ }
+
+ // set up a random read order
+ readOrder = new int[inputData.length];
+ for (int i = 0; i < readOrder.length; i++) {
+ readOrder[i] = ThreadLocalRandom.current().nextInt(validNumbers);
+ }
+ }
+ @Setup(Level.Iteration)
+ public void setupEmptyIndexer()
+ {
+ emptyIndexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
+ }
+
+ @Setup(Level.Iteration)
+ public void shuffleReadOrder()
+ {
+ ArrayList<Integer> asList = new ArrayList<>(readOrder.length);
+ for (int i : readOrder) {
+ asList.add(i);
+ }
+
+ Collections.shuffle(asList);
+
+ for (int i = 0; i < readOrder.length; i++) {
+ readOrder[i] = asList.get(i);
+ }
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void soloWriter()
+ {
+ // load ALL input data to an empty index; duplicates will be present / should be ignored
+ for (String data : inputData) {
+ emptyIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+ }
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ @Group("soloReader")
+ public void soloReader(Blackhole blackhole)
+ {
+ // read ALL elements from a fully loaded index
+ for (int i : readOrder) {
+ Object result = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{i});
+ blackhole.consume(result);
+ }
+ }
+
+ // parallel read/write test should simulate what happens when we are (1) ingesting data (aka writing to dictionary)
+ // and also (2) running query (aka reading from dictionary)
+ // the read side should continuously read
+ // the write side should continuously write; but we also need to throw in some random writes too
+ // since our dataset will fill the 'new write' path quickly, so 1-in-50 elements should be new
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ @Group("parallelReadWrite")
+ public void parallelWriter()
+ {
+ int count = 0;
+ // load ALL input data to an empty index; duplicates will be present / should be ignored
+ for (String data : inputData) {
+ fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+ count++;
+ if (count == 50) {
+ int next = ThreadLocalRandom.current().nextInt(10000);
+ fullIndexer.processRowValsToUnsortedEncodedKeyComponent("xxx-" + next + "yz", true);
+ count = 0;
+ }
+ }
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ @Group("parallelReadWrite")
+ @GroupThreads(3)
+ public void parallelReader(Blackhole blackhole)
+ {
+ for (int i : readOrder) {
+ Object result = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{i});
+ blackhole.consume(result);
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
index 0bb25f8c78..754934975c 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
@@ -27,7 +27,7 @@ import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.StampedLock;
/**
* Buildable dictionary for some comparable type. Values are unsorted, or rather sorted in the order which they are
@@ -51,41 +51,51 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
private final List<T> idToValue = new ArrayList<>();
- private final ReentrantReadWriteLock lock;
+ private final StampedLock lock;
public DimensionDictionary(Class<T> cls)
{
this.cls = cls;
- this.lock = new ReentrantReadWriteLock();
+ this.lock = new StampedLock();
valueToId.defaultReturnValue(ABSENT_VALUE_ID);
}
public int getId(@Nullable T value)
{
- lock.readLock().lock();
+ if (value == null) {
+ return idForNull;
+ }
+
+ long stamp = lock.readLock();
try {
- if (value == null) {
- return idForNull;
- }
return valueToId.getInt(value);
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
@Nullable
public T getValue(int id)
{
- lock.readLock().lock();
+ if (id == idForNull) {
+ return null;
+ }
+
+ // optimistic read
+ long stamp = lock.tryOptimisticRead();
+ T output = idToValue.get(id);
+ if (lock.validate(stamp)) {
+ return output;
+ }
+
+ // classic lock
+ stamp = lock.readLock();
try {
- if (id == idForNull) {
- return null;
- }
return idToValue.get(id);
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
@@ -93,27 +103,36 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
{
T[] values = (T[]) Array.newInstance(cls, ids.length);
- lock.readLock().lock();
+ long stamp = lock.readLock();
try {
for (int i = 0; i < ids.length; i++) {
- values[i] = (ids[i] == idForNull) ? null : idToValue.get(ids[i]);
+ values[i] = idToValue.get(ids[i]);
}
return values;
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
public int size()
{
- lock.readLock().lock();
+ // using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
+
+ // optimistic read
+ long stamp = lock.tryOptimisticRead();
+ int size = idToValue.size();
+ if (lock.validate(stamp)) {
+ return size;
+ }
+
+ // classic lock
+ stamp = lock.readLock();
try {
- // using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
return idToValue.size();
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
@@ -133,56 +152,104 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
public int add(@Nullable T originalValue)
{
- lock.writeLock().lock();
- try {
- if (originalValue == null) {
- if (idForNull == ABSENT_VALUE_ID) {
- idForNull = idToValue.size();
- idToValue.add(null);
+ if (originalValue == null) {
+ return addNull();
+ }
+
+ long stamp = lock.tryReadLock();
+ if (stamp != 0) {
+ try {
+ int existing = valueToId.getInt(originalValue);
+ if (existing >= 0) {
+ return existing;
}
- return idForNull;
}
- int prev = valueToId.getInt(originalValue);
+ finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
+ long extraSize = 0;
+ if (computeOnHeapSize()) {
+ // Add size of new dim value and 2 references (valueToId and idToValue)
+ extraSize = estimateSizeOfValue(originalValue) + 2L * Long.BYTES;
+ }
+
+ stamp = lock.writeLock();
+ try {
+ final int index = idToValue.size();
+ final int prev = valueToId.putIfAbsent(originalValue, index);
if (prev >= 0) {
return prev;
}
- final int index = idToValue.size();
- valueToId.put(originalValue, index);
- idToValue.add(originalValue);
- if (computeOnHeapSize()) {
- // Add size of new dim value and 2 references (valueToId and idToValue)
- sizeInBytes.addAndGet(estimateSizeOfValue(originalValue) + 2L * Long.BYTES);
- }
+ idToValue.add(originalValue);
+ sizeInBytes.addAndGet(extraSize);
minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
return index;
}
finally {
- lock.writeLock().unlock();
+ lock.unlockWrite(stamp);
+ }
+ }
+
+ private int addNull()
+ {
+ if (idForNull != ABSENT_VALUE_ID) {
+ return idForNull;
+ }
+
+ long stamp = lock.writeLock();
+ try {
+ // check, in case it was changed by another thread
+ if (idForNull == ABSENT_VALUE_ID) {
+ idForNull = idToValue.size();
+ idToValue.add(null);
+ }
+ return idForNull;
+ }
+ finally {
+ lock.unlockWrite(stamp);
}
}
public T getMinValue()
{
- lock.readLock().lock();
+ // optimistic read
+ long stamp = lock.tryOptimisticRead();
+ T output = minValue;
+ if (lock.validate(stamp)) {
+ return output;
+ }
+
+ // classic lock
+ stamp = lock.readLock();
try {
return minValue;
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
public T getMaxValue()
{
- lock.readLock().lock();
+ // optimistic read
+ long stamp = lock.tryOptimisticRead();
+ T output = maxValue;
+ if (lock.validate(stamp)) {
+ return output;
+ }
+
+ // classic lock
+ stamp = lock.readLock();
try {
return maxValue;
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
@@ -193,12 +260,12 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
public SortedDimensionDictionary<T> sort()
{
- lock.readLock().lock();
+ long stamp = lock.readLock();
try {
return new SortedDimensionDictionary<>(idToValue, idToValue.size());
}
finally {
- lock.readLock().unlock();
+ lock.unlockRead(stamp);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org