You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/02/01 10:59:24 UTC

[druid] branch master updated: Dimension dictionary reduce locking (#13710)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a3bd89a85 Dimension dictionary reduce locking (#13710)
7a3bd89a85 is described below

commit 7a3bd89a854c73879ccdc51ce35071f81109b24c
Author: Jason Koch <jk...@netflix.com>
AuthorDate: Wed Feb 1 02:59:12 2023 -0800

    Dimension dictionary reduce locking (#13710)
    
    * perf: introduce benchmark for StringDimensionIndexer
    
    jdk11 -- Benchmark                                                       Mode  Cnt      Score     Error  Units
    StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10  30471.552 ±  456.716  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10  18069.863 ±  327.923  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  67676.617 ± 2351.311  us/op
    StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10   1048.079 ±    1.120  us/op
    StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   4629.769 ±   29.353  us/op
    
    * perf: switch DimensionDictionary to StampedLock
    
    jdk11 - Benchmark                                                        Mode  Cnt      Score      Error  Units
    StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10  37958.372 ± 1685.206  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10  31192.232 ± 2755.365  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  58256.791 ± 1998.220  us/op
    StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10   1079.440 ±    1.753  us/op
    StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   4585.690 ±   13.225  us/op
    
    * perf: use optimistic locking in DimensionDictionary
    
    jdk11 - Benchmark                                                        Mode  Cnt      Score     Error  Units
    StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10   6212.366 ± 162.684  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10   1807.235 ± 109.339  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  19427.759 ± 611.692  us/op
    StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10    194.370 ±   1.050  us/op
    StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   2871.423 ±  14.426  us/op
    
    * perf: refactor DimensionDictionary null handling to need fewer locks
    
    jdk11 - Benchmark                                                        Mode  Cnt      Score      Error  Units
    StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10   6591.619 ±  470.497  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10   1387.338 ±  144.587  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  22204.462 ± 1620.806  us/op
    StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10    204.911 ±    0.459  us/op
    StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   2935.376 ±   12.639  us/op
    
    * perf: refactor DimensionDictionary add handling to do a little less work
    
    jdk11 - Benchmark                                                        Mode  Cnt      Score    Error  Units
    StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10   2914.859 ± 22.519  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10    508.010 ± 14.675  us/op
    StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  10135.408 ± 82.745  us/op
    StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10    205.415 ±  0.158  us/op
    StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   3098.743 ± 23.603  us/op
---
 .../StringDimensionIndexerProcessBenchmark.java    | 170 +++++++++++++++++++++
 .../apache/druid/segment/DimensionDictionary.java  | 149 +++++++++++++-----
 2 files changed, 278 insertions(+), 41 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
new file mode 100644
index 0000000000..0f44b15018
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.segment.StringDimensionIndexer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 10)
+public class StringDimensionIndexerProcessBenchmark
+{
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  String[] inputData;
+  StringDimensionIndexer emptyIndexer;
+  StringDimensionIndexer fullIndexer;
+  int[] readOrder;
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    // maxNumbers : inputData ratio of 1000:50000 is 1:50, in other words we should expect each element to be 'added'
+    // 50x, the first time it is new, the next 49 times it is an existing entry; also ~5% of values will be null
+
+    final int maxNumbers = 1000;
+    final int nullNumbers = 50; // 5%
+    final int validNumbers = (maxNumbers + 1) - nullNumbers; // = 951 ids: null + values 50..999
+
+    // set up dummy input data, and load to indexer
+    inputData = new String[50000];
+    for (int i = 0; i < inputData.length; i++) {
+      int next = ThreadLocalRandom.current().nextInt(maxNumbers);
+      inputData[i] = (next < nullNumbers) ? null : ("abcd-" + next + "-efgh");
+    }
+
+    fullIndexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
+    for (String data : inputData) {
+      fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+    }
+
+    // set up a random read order
+    readOrder = new int[inputData.length];
+    for (int i = 0; i < readOrder.length; i++) {
+      readOrder[i] = ThreadLocalRandom.current().nextInt(validNumbers);
+    }
+  }
+  @Setup(Level.Iteration) // also invoked directly by soloWriter so every call starts from an empty index
+  public void setupEmptyIndexer()
+  {
+    emptyIndexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
+  }
+
+  @Setup(Level.Iteration)
+  public void shuffleReadOrder()
+  {
+    ArrayList<Integer> asList = new ArrayList<>(readOrder.length);
+    for (int i : readOrder) {
+      asList.add(i);
+    }
+
+    Collections.shuffle(asList);
+
+    for (int i = 0; i < readOrder.length; i++) {
+      readOrder[i] = asList.get(i);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void soloWriter()
+  {
+    setupEmptyIndexer(); // reset per call: the Iteration-level setup alone leaves it full after the first call
+    for (String data : inputData) {
+      emptyIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  @Group("soloReader")
+  public void soloReader(Blackhole blackhole)
+  {
+    // read ALL elements from a fully loaded index
+    for (int i : readOrder) {
+      Object result = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{i});
+      blackhole.consume(result);
+    }
+  }
+
+  // parallel read/write test should simulate what happens when we are (1) ingesting data (aka writing to dictionary)
+  // and also (2) running query (aka reading from dictionary)
+  // the read side should continuously read
+  // the write side continuously re-writes inputData (already present), and every 50th write also tries a random
+  // 'xxx-<N>yz' value; NOTE(review): N has only 10k choices, so over many invocations those become lookups too
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  @Group("parallelReadWrite")
+  public void parallelWriter()
+  {
+    int count = 0;
+    // re-add ALL input data to the populated fullIndexer (lookups), plus one random 'xxx-<N>yz' value every 50 rows
+    for (String data : inputData) {
+      fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+      count++;
+      if (count == 50) {
+        int next = ThreadLocalRandom.current().nextInt(10000);
+        fullIndexer.processRowValsToUnsortedEncodedKeyComponent("xxx-" + next + "yz", true);
+        count = 0;
+      }
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  @Group("parallelReadWrite")
+  @GroupThreads(3)
+  public void parallelReader(Blackhole blackhole)
+  {
+    for (int i : readOrder) {
+      Object result = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{i});
+      blackhole.consume(result);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
index 0bb25f8c78..754934975c 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
@@ -27,7 +27,7 @@ import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.StampedLock;
 
 /**
  * Buildable dictionary for some comparable type. Values are unsorted, or rather sorted in the order which they are
@@ -51,41 +51,51 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
   private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
 
   private final List<T> idToValue = new ArrayList<>();
-  private final ReentrantReadWriteLock lock;
+  private final StampedLock lock;
 
   public DimensionDictionary(Class<T> cls)
   {
     this.cls = cls;
-    this.lock = new ReentrantReadWriteLock();
+    this.lock = new StampedLock(); // NOTE: StampedLock is not reentrant; never acquire it while already holding it
     valueToId.defaultReturnValue(ABSENT_VALUE_ID);
   }
 
   public int getId(@Nullable T value)
   {
-    lock.readLock().lock();
+    if (value == null) { // null never enters valueToId; its id is kept in idForNull (ABSENT_VALUE_ID until added)
+      return idForNull;
+    }
+    // NOTE(review): idForNull is read without the lock; confirm its declaration (not in this diff) is volatile
+    long stamp = lock.readLock();
     try {
-      if (value == null) {
-        return idForNull;
-      }
       return valueToId.getInt(value);
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
   @Nullable
   public T getValue(int id)
   {
-    lock.readLock().lock();
+    if (id == idForNull) {
+      return null;
+    }
+
+    // optimistic read: the unlocked result is used only if validate() shows no write lock was taken meanwhile
+    long stamp = lock.tryOptimisticRead();
+    T output = idToValue.get(id); // NOTE(review): may throw mid-resize by add(); this escapes before validate()
+    if (lock.validate(stamp)) {
+      return output;
+    }
+
+    // a write raced the optimistic read (or the lock was held): re-read under a blocking read lock
+    stamp = lock.readLock();
     try {
-      if (id == idForNull) {
-        return null;
-      }
       return idToValue.get(id);
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
@@ -93,27 +103,36 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
   {
     T[] values = (T[]) Array.newInstance(cls, ids.length);
 
-    lock.readLock().lock();
+    long stamp = lock.readLock();
     try {
       for (int i = 0; i < ids.length; i++) {
-        values[i] = (ids[i] == idForNull) ? null : idToValue.get(ids[i]);
+        values[i] = idToValue.get(ids[i]);
       }
       return values;
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
   public int size()
   {
-    lock.readLock().lock();
+    // use idToValue rather than valueToId: the null entry, if present, is stored only in idToValue
+
+    // optimistic read: the unlocked size is used only if validate() shows no write lock was taken meanwhile
+    long stamp = lock.tryOptimisticRead();
+    int size = idToValue.size();
+    if (lock.validate(stamp)) {
+      return size;
+    }
+
+    // a write raced the optimistic read (or the lock was held): re-read under a blocking read lock
+    stamp = lock.readLock();
     try {
-      // using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
       return idToValue.size();
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
@@ -133,56 +152,104 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
 
   public int add(@Nullable T originalValue)
   {
-    lock.writeLock().lock();
-    try {
-      if (originalValue == null) {
-        if (idForNull == ABSENT_VALUE_ID) {
-          idForNull = idToValue.size();
-          idToValue.add(null);
+    if (originalValue == null) {
+      return addNull();
+    }
+
+    long stamp = lock.tryReadLock(); // fast path: returns 0 if a writer holds the lock, in which case skip the probe
+    if (stamp != 0) {
+      try {
+        int existing = valueToId.getInt(originalValue);
+        if (existing >= 0) {
+          return existing;
         }
-        return idForNull;
       }
-      int prev = valueToId.getInt(originalValue);
+      finally {
+        lock.unlockRead(stamp);
+      }
+    }
+
+    long extraSize = 0; // computed before taking the write lock to keep the exclusive section short
+    if (computeOnHeapSize()) {
+      // Add size of new dim value and 2 references (valueToId and idToValue)
+      extraSize = estimateSizeOfValue(originalValue) + 2L * Long.BYTES;
+    }
+
+    stamp = lock.writeLock(); // putIfAbsent below re-checks: the value may have been added since the probe
+    try {
+      final int index = idToValue.size();
+      final int prev = valueToId.putIfAbsent(originalValue, index); // ABSENT_VALUE_ID if newly inserted
       if (prev >= 0) {
         return prev;
       }
-      final int index = idToValue.size();
-      valueToId.put(originalValue, index);
-      idToValue.add(originalValue);
 
-      if (computeOnHeapSize()) {
-        // Add size of new dim value and 2 references (valueToId and idToValue)
-        sizeInBytes.addAndGet(estimateSizeOfValue(originalValue) + 2L * Long.BYTES);
-      }
+      idToValue.add(originalValue);
+      sizeInBytes.addAndGet(extraSize); // 0 when computeOnHeapSize() is false
 
       minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
       maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
       return index;
     }
     finally {
-      lock.writeLock().unlock();
+      lock.unlockWrite(stamp);
+    }
+  }
+
+  private int addNull()
+  {
+    if (idForNull != ABSENT_VALUE_ID) { // unlocked fast path; a stale ABSENT read is re-checked under the lock
+      return idForNull;
+    }
+
+    long stamp = lock.writeLock();
+    try {
+      // re-check: another thread may have registered null between the unlocked check and acquiring the lock
+      if (idForNull == ABSENT_VALUE_ID) {
+        idForNull = idToValue.size(); // null takes the next id; stored in idToValue only, never in valueToId
+        idToValue.add(null);
+      }
+      return idForNull;
+    }
+    finally {
+      lock.unlockWrite(stamp);
     }
   }
 
   public T getMinValue()
   {
-    lock.readLock().lock();
+    // optimistic read of minValue; trusted only if validate() shows no write lock was taken meanwhile
+    long stamp = lock.tryOptimisticRead();
+    T output = minValue;
+    if (lock.validate(stamp)) {
+      return output;
+    }
+
+    // a write raced the optimistic read (or the lock was held): re-read under a blocking read lock
+    stamp = lock.readLock();
     try {
       return minValue;
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
   public T getMaxValue()
   {
-    lock.readLock().lock();
+    // optimistic read of maxValue; trusted only if validate() shows no write lock was taken meanwhile
+    long stamp = lock.tryOptimisticRead();
+    T output = maxValue;
+    if (lock.validate(stamp)) {
+      return output;
+    }
+
+    // a write raced the optimistic read (or the lock was held): re-read under a blocking read lock
+    stamp = lock.readLock();
     try {
       return maxValue;
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
@@ -193,12 +260,12 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
 
   public SortedDimensionDictionary<T> sort()
   {
-    lock.readLock().lock();
+    long stamp = lock.readLock(); // blocking read lock: idToValue is handed to SortedDimensionDictionary while held
     try {
       return new SortedDimensionDictionary<>(idToValue, idToValue.size());
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org