You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/07/27 08:16:13 UTC

[bookkeeper] branch master updated: Replace guava multimap in PCBC with custom impl

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ec8231  Replace guava multimap in PCBC with custom impl
3ec8231 is described below

commit 3ec8231c58a2ebe50c0c6a082e2feab231540116
Author: Ivan Kelly <iv...@ivankelly.net>
AuthorDate: Fri Jul 27 10:16:07 2018 +0200

    Replace guava multimap in PCBC with custom impl
    
    For a long time PerChannelBookieClient has used guava
    LinkedListMultimap to store conflicting V2 completion keys and
    values. This is problematic though. Completion keys are pooled
    objects. When a key-value pair is stored in a LinkedListMultimap, if
    it is the first value for that key, a collection is created for the
    values, and added to a top-level map using the key, and then the key
    and the value are added to the collection. When a second value is
    added for the same key, the key and value are simply added to the
    collection. The problem occurs when the first key is removed. PCBC
    will recycle the key object, but this object is still being used in
    the multimap in the top-level map. This leads to failures such as
    NullPointerException and IllegalStateException.
    
    Because of this, this patch introduces a very simple multimap
    implementation that only stores the key one time (in the collection)
    and uses the hashCode of the key to separate the collections into
    buckets. It's pretty inefficient, but this code is only hit in the
    rare case where a client is trying to read or write the same entry
    from the same ledger more than once at the same time.
    
    Author: Ivan Kelly <iv...@ivankelly.net>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1569 from ivankelly/conc-test-flake
---
 .../bookkeeper/proto/PerChannelBookieClient.java   | 57 ++++---------
 .../util/collections/SynchronizedHashMultiMap.java | 83 ++++++++++++++++++
 .../collections/SynchronizedHashMultiMapTest.java  | 99 ++++++++++++++++++++++
 3 files changed, 197 insertions(+), 42 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index ba704e4..8600797 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,8 +21,6 @@ package org.apache.bookkeeper.proto;
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
@@ -71,8 +69,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -131,6 +129,7 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
+import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,8 +168,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     // Map that hold duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
     // read request for the same ledgerId/entryId
-    private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
-        LinkedListMultimap.create();
+    private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
+        new SynchronizedHashMultiMap<>();
 
     private final StatsLogger statsLogger;
     private final OpStatsLogger readEntryOpLogger;
@@ -867,16 +866,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     public void checkTimeoutOnPendingOperations() {
         int timedOutOperations = completionObjects.removeIf(timeoutCheck);
 
-        synchronized (this) {
-            Iterator<CompletionValue> iterator = completionObjectsV2Conflicts.values().iterator();
-            while (iterator.hasNext()) {
-                CompletionValue value = iterator.next();
-                if (value.maybeTimeout()) {
-                    ++timedOutOperations;
-                    iterator.remove();
-                }
-            }
-        }
+        timedOutOperations += completionObjectsV2Conflicts.removeIf(timeoutCheck);
 
         if (timedOutOperations > 0) {
             LOG.info("Timed-out {} operations to channel {} for {}",
@@ -1020,6 +1010,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         CompletionValue completion = completionObjects.remove(key);
         if (completion != null) {
             completion.errorOut();
+        } else {
+            // If there's no completion object here, try in the multimap
+            completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
         }
     }
 
@@ -1032,14 +1025,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             completion.errorOut(rc);
         } else {
             // If there's no completion object here, try in the multimap
-            synchronized (completionObjectsV2Conflicts) {
-                if (completionObjectsV2Conflicts.containsKey(key)) {
-                    completion = completionObjectsV2Conflicts.get(key).get(0);
-                    completionObjectsV2Conflicts.remove(key, completion);
-
-                    completion.errorOut(rc);
-                }
-            }
+            completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
         }
     }
 
@@ -1068,16 +1054,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
      */
 
     void errorOutOutstandingEntries(int rc) {
-        // DO NOT rewrite these using Map.Entry iterations. We want to iterate
-        // on keys and see if we are successfully able to remove the key from
-        // the map. Because the add and the read methods also do the same thing
-        // in case they get a write failure on the socket. The one who
-        // successfully removes the key from the map is the one responsible for
-        // calling the application callback.
-        for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
-            while (completionObjectsV2Conflicts.get(key).size() > 0) {
-                errorOut(key, rc);
-            }
+        Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
+        while (multikey.isPresent()) {
+            multikey.ifPresent(k -> errorOut(k, rc));
+            multikey = completionObjectsV2Conflicts.getAnyKey();
         }
         for (CompletionKey key : completionObjects.keys()) {
             errorOut(key, rc);
@@ -2074,9 +2054,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         CompletionValue existingValue = completionObjects.putIfAbsent(key, value);
         if (existingValue != null) { // will only happen for V2 keys, as V3 have unique txnid
             // There's a pending read request on same ledger/entry. Use the multimap to track all of them
-            synchronized (completionObjectsV2Conflicts) {
-                completionObjectsV2Conflicts.put(key, value);
-            }
+            completionObjectsV2Conflicts.put(key, value);
         }
     }
 
@@ -2084,12 +2062,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         CompletionValue completionValue = completionObjects.remove(key);
         if (completionValue == null) {
             // If there's no completion object here, try in the multimap
-            synchronized (this) {
-                if (completionObjectsV2Conflicts.containsKey(key)) {
-                    completionValue = completionObjectsV2Conflicts.get(key).get(0);
-                    completionObjectsV2Conflicts.remove(key, completionValue);
-                }
-            }
+            completionValue = completionObjectsV2Conflicts.removeAny(key).orElse(null);
         }
         return completionValue;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java
new file mode 100644
index 0000000..6e6e3c1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Simple multimap implementation that only stores each key reference once.
+ *
+ * <p>Implementation is aimed at storing PerChannelBookieClient completions when there
+ * are duplicates. If the key is a pooled object, the map must not keep a reference to it
+ * once its value has been removed; guava multimap implementations may retain such a
+ * reference in their top-level key map.
+ *
+ * <p>This map is implemented with pretty heavy locking (every method is {@code synchronized}),
+ * but this shouldn't be an issue as the multimap only needs to be used in rare cases, i.e.
+ * when a user tries to read or write the same entry twice at the same time. This class
+ * should *NOT* be used in critical path code.
+ *
+ * <p>Pairs are bucketed by {@code key.hashCode()}, and the key reference lives only inside
+ * its pair. Empty buckets are removed eagerly, so no bucket outlives the last pair stored
+ * in it. A unique key-value pair will only be stored once. Keys must be non-null.
+ */
+public class SynchronizedHashMultiMap<K, V> {
+
+    /** Buckets keyed by {@code key.hashCode()}; never holds an empty set. */
+    private final HashMap<Integer, Set<Pair<K, V>>> map = new HashMap<>();
+
+    /**
+     * Adds the pair {@code (k, v)}; adding a pair that is already present is a no-op.
+     */
+    public synchronized void put(K k, V v) {
+        map.computeIfAbsent(k.hashCode(), (ignore) -> new HashSet<>()).add(Pair.of(k, v));
+    }
+
+    /**
+     * Returns some key currently stored in the map, or empty if the map holds no pairs.
+     */
+    public synchronized Optional<K> getAnyKey() {
+        // Look across all buckets rather than into a single one, so any stored pair yields a key.
+        return map.values().stream().flatMap(Set::stream).findAny().map(Pair::getLeft);
+    }
+
+    /**
+     * Removes one value stored under a key equal to {@code k}.
+     *
+     * @return the removed value, or empty if no pair with an equal key is present
+     */
+    public synchronized Optional<V> removeAny(K k) {
+        int hash = k.hashCode();
+        Set<Pair<K, V>> bucket = map.getOrDefault(hash, Collections.emptySet());
+        Optional<Pair<K, V>> pair = bucket.stream().filter(p -> p.getLeft().equals(k)).findAny();
+        pair.ifPresent(bucket::remove);
+        if (bucket.isEmpty()) {
+            // Drop the emptied bucket; otherwise a lookup could land on it and report no keys
+            // while other buckets still hold pairs, and empty buckets would pile up.
+            map.remove(hash);
+        }
+        return pair.map(Pair::getRight);
+    }
+
+    /**
+     * Removes every pair for which {@code predicate} returns true.
+     *
+     * <p>The predicate is invoked while holding this map's monitor.
+     *
+     * @return the number of pairs removed
+     */
+    public synchronized int removeIf(BiPredicate<K, V> predicate) {
+        int removed = 0;
+        Iterator<Set<Pair<K, V>>> buckets = map.values().iterator();
+        while (buckets.hasNext()) {
+            Set<Pair<K, V>> bucket = buckets.next();
+            int sizeBefore = bucket.size();
+            bucket.removeIf(kv -> predicate.test(kv.getLeft(), kv.getRight()));
+            removed += sizeBefore - bucket.size();
+            if (bucket.isEmpty()) {
+                buckets.remove();
+            }
+        }
+        return removed;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java
new file mode 100644
index 0000000..f9ab747
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SynchronizedHashMultiMap}: key lookup, single-value removal and predicate removal.
+ */
+public class SynchronizedHashMultiMapTest {
+    @Test
+    public void testGetAnyKey() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertFalse(map.getAnyKey().isPresent());
+
+        map.put(1, 2);
+        Assert.assertEquals(Integer.valueOf(1), map.getAnyKey().get());
+
+        map.put(1, 3);
+        Assert.assertEquals(Integer.valueOf(1), map.getAnyKey().get());
+
+        map.put(2, 4);
+        int res = map.getAnyKey().get();
+        Assert.assertTrue(res == 1 || res == 2);
+
+        map.removeIf((k, v) -> k == 1);
+        Assert.assertEquals(Integer.valueOf(2), map.getAnyKey().get());
+    }
+
+    @Test
+    public void testRemoveAny() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertFalse(map.removeAny(1).isPresent());
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+        map.put(2, 4); // identical pair: stored only once
+
+        Optional<Integer> v = map.removeAny(1);
+        int firstVal = v.get();
+        Assert.assertTrue(firstVal == 2 || firstVal == 3);
+
+        v = map.removeAny(1);
+        int secondVal = v.get();
+        Assert.assertTrue(secondVal == 2 || secondVal == 3);
+        Assert.assertNotEquals(firstVal, secondVal);
+
+        v = map.removeAny(2);
+        Assert.assertTrue(v.isPresent());
+        Assert.assertEquals(Integer.valueOf(4), v.get());
+
+        Assert.assertFalse(map.removeAny(1).isPresent());
+        Assert.assertFalse(map.removeAny(2).isPresent()); // the duplicate (2, 4) was never stored
+        Assert.assertFalse(map.removeAny(3).isPresent());
+    }
+
+    @Test
+    public void testRemoveIf() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertEquals(0, map.removeIf((k, v) -> true));
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+        map.put(2, 4); // identical pair: stored only once
+
+        Assert.assertEquals(1, map.removeIf((k, v) -> v == 4));
+        Assert.assertEquals(2, map.removeIf((k, v) -> k == 1));
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+
+        Assert.assertEquals(0, map.removeIf((k, v) -> false));
+        Assert.assertEquals(3, map.removeIf((k, v) -> true));
+    }
+}