You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/22 16:26:34 UTC

[bookkeeper] branch branch-4.7 updated: Replace guava multimap in PCBC with custom impl (Cherry-pick)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 83d3abe  Replace guava multimap in PCBC with custom impl (Cherry-pick)
83d3abe is described below

commit 83d3abe56146b3000aa3444d30c5af2b25184d86
Author: Ivan Kelly <iv...@ivankelly.net>
AuthorDate: Wed Aug 22 09:26:24 2018 -0700

    Replace guava multimap in PCBC with custom impl (Cherry-pick)
    
    Descriptions of the changes in this PR:
    
    (cherry-pick #1569)
    
    For a long time PerChannelBookieClient has used guava
    LinkedListMultimap to store conflicting V2 completion keys and
    values. This is problematic though. Completion keys are pooled
    objects. When a key-value pair is stored in a LinkedListMultimap, if
    it is the first value for that key, a collection is created for the
    values and added to a top-level map under that key, and then the key
    and the value are added to the collection. When a second value is
    added for the same key, the key and value are simply added to the
    collection. The problem occurs when the first key is removed: PCBC
    will recycle the key object, but this object is still being used as
    the key of the multimap's top-level map. This causes all sorts of
    problems, such as NullPointerException and IllegalStateException.
    
    Because of this, this patch introduces a very simple multimap
    implementation that only stores the key one time (in the collection)
    and uses the hashCode of the key to separate the collections into
    buckets. It's pretty inefficient, but this code is only hit in the
    rare case where a client is trying to read or write the same entry
    from the same ledger more than once at the same time.
    
    Author: Ivan Kelly <ivanivankelly.net>
    
    Reviewers: Enrico Olivelli <eolivelligmail.com>
    
    This closes #1569 from ivankelly/conc-test-flake
    
    Master Issue: #1569
    
    Author: Ivan Kelly <iv...@ivankelly.net>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #1618 from sijie/cherry-pick-pcbc
---
 .../bookkeeper/proto/PerChannelBookieClient.java   | 57 ++++---------
 .../util/collections/SynchronizedHashMultiMap.java | 83 ++++++++++++++++++
 .../collections/SynchronizedHashMultiMapTest.java  | 99 ++++++++++++++++++++++
 3 files changed, 197 insertions(+), 42 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 10fc253..b03b77a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,8 +21,6 @@ package org.apache.bookkeeper.proto;
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
@@ -70,8 +68,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -125,6 +123,7 @@ import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
+import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,8 +162,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     // Map that hold duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
     // read request for the same ledgerId/entryId
-    private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
-        LinkedListMultimap.create();
+    private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
+        new SynchronizedHashMultiMap<>();
 
     private final StatsLogger statsLogger;
     private final OpStatsLogger readEntryOpLogger;
@@ -760,9 +759,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
         if (existingValue != null) {
             // There's a pending read request on same ledger/entry. Use the multimap to track all of them
-            synchronized (completionObjectsV2Conflicts) {
-                completionObjectsV2Conflicts.put(completionKey, readCompletion);
-            }
+            completionObjectsV2Conflicts.put(completionKey, readCompletion);
         }
 
         writeAndFlush(channel, completionKey, request);
@@ -799,16 +796,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     public void checkTimeoutOnPendingOperations() {
         int timedOutOperations = completionObjects.removeIf(timeoutCheck);
 
-        synchronized (this) {
-            Iterator<CompletionValue> iterator = completionObjectsV2Conflicts.values().iterator();
-            while (iterator.hasNext()) {
-                CompletionValue value = iterator.next();
-                if (value.maybeTimeout()) {
-                    ++timedOutOperations;
-                    iterator.remove();
-                }
-            }
-        }
+        timedOutOperations += completionObjectsV2Conflicts.removeIf(timeoutCheck);
 
         if (timedOutOperations > 0) {
             LOG.info("Timed-out {} operations to channel {} for {}",
@@ -932,6 +920,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         CompletionValue completion = completionObjects.remove(key);
         if (completion != null) {
             completion.errorOut();
+        } else {
+            // If there's no completion object here, try in the multimap
+            completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
         }
     }
 
@@ -944,14 +935,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             completion.errorOut(rc);
         } else {
             // If there's no completion object here, try in the multimap
-            synchronized (completionObjectsV2Conflicts) {
-                if (completionObjectsV2Conflicts.containsKey(key)) {
-                    completion = completionObjectsV2Conflicts.get(key).get(0);
-                    completionObjectsV2Conflicts.remove(key, completion);
-
-                    completion.errorOut(rc);
-                }
-            }
+            completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
         }
     }
 
@@ -980,16 +964,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
      */
 
     void errorOutOutstandingEntries(int rc) {
-        // DO NOT rewrite these using Map.Entry iterations. We want to iterate
-        // on keys and see if we are successfully able to remove the key from
-        // the map. Because the add and the read methods also do the same thing
-        // in case they get a write failure on the socket. The one who
-        // successfully removes the key from the map is the one responsible for
-        // calling the application callback.
-        for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
-            while (completionObjectsV2Conflicts.get(key).size() > 0) {
-                errorOut(key, rc);
-            }
+        Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
+        while (multikey.isPresent()) {
+            multikey.ifPresent(k -> errorOut(k, rc));
+            multikey = completionObjectsV2Conflicts.getAnyKey();
         }
         for (CompletionKey key : completionObjects.keys()) {
             errorOut(key, rc);
@@ -1110,12 +1088,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         key.release();
         if (completionValue == null) {
             // If there's no completion object here, try in the multimap
-            synchronized (this) {
-                if (completionObjectsV2Conflicts.containsKey(key)) {
-                    completionValue = completionObjectsV2Conflicts.get(key).get(0);
-                    completionObjectsV2Conflicts.remove(key, completionValue);
-                }
-            }
+            completionValue = completionObjectsV2Conflicts.removeAny(key).orElse(null);
         }
 
         if (null == completionValue) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java
new file mode 100644
index 0000000..6e6e3c1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Simple multimap implementation that only stores each key reference once, next to its value.
+ *
+ * <p>Implementation is aimed at storing PerChannelBookieClient completions when there
+ * are duplicates. If the key is a pooled object, the map must not keep a reference to it
+ * once its value has been removed, which can happen with guava multimap implementations.
+ *
+ * <p>Every method is synchronized; this heavy locking is acceptable because the multimap is
+ * only used in rare cases, i.e. when a user tries to read or write the same entry twice at
+ * the same time. This class should *NOT* be used in critical path code. Pairs are bucketed
+ * by key hashCode and a bucket is dropped as soon as it becomes empty.
+ *
+ * <p>A unique key-value pair will only be stored once.
+ */
+public class SynchronizedHashMultiMap<K, V> {
+    HashMap<Integer, Set<Pair<K, V>>> map = new HashMap<>();
+
+    /** Stores the pair (k, v); a pair equal to one already stored is not added again. */
+    public synchronized void put(K k, V v) {
+        map.computeIfAbsent(k.hashCode(), (ignore) -> new HashSet<>()).add(Pair.of(k, v));
+    }
+
+    /** Returns the key of some stored pair; empty only when the map holds no pairs at all. */
+    public synchronized Optional<K> getAnyKey() {
+        return map.values().stream().flatMap(Set::stream).findAny().map(Pair::getLeft);
+    }
+
+    /** Removes one pair whose key equals {@code k} and returns its value, or empty if none. */
+    public synchronized Optional<V> removeAny(K k) {
+        Set<Pair<K, V>> set = map.getOrDefault(k.hashCode(), Collections.emptySet());
+        Optional<Pair<K, V>> pair = set.stream().filter(p -> p.getLeft().equals(k)).findAny();
+        pair.ifPresent(set::remove);
+        map.remove(k.hashCode(), Collections.emptySet()); // drops the bucket only if now empty
+        return pair.map(Pair::getRight);
+    }
+
+    /** Removes every pair matching {@code predicate} and returns how many were removed. */
+    public synchronized int removeIf(BiPredicate<K, V> predicate) {
+        int removed = 0;
+        Iterator<Set<Pair<K, V>>> buckets = map.values().iterator();
+        while (buckets.hasNext()) {
+            Set<Pair<K, V>> pairs = buckets.next();
+            int before = pairs.size();
+            pairs.removeIf(p -> predicate.test(p.getLeft(), p.getRight()));
+            removed += before - pairs.size();
+            if (pairs.isEmpty()) {
+                buckets.remove();
+            }
+        }
+        return removed;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java
new file mode 100644
index 0000000..f9ab747
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for SynchronizedHashMultiMap.
+ */
+public class SynchronizedHashMultiMapTest {
+    @Test
+    public void testGetAnyKey() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertFalse(map.getAnyKey().isPresent());
+
+        map.put(1, 2);
+        Assert.assertEquals(Integer.valueOf(1), map.getAnyKey().get());
+
+        map.put(1, 3);
+        Assert.assertEquals(Integer.valueOf(1), map.getAnyKey().get());
+
+        map.put(2, 4);
+        int res = map.getAnyKey().get();
+        Assert.assertTrue(res == 1 || res == 2);
+
+        map.removeIf((k, v) -> k == 1);
+        Assert.assertEquals(Integer.valueOf(2), map.getAnyKey().get());
+
+        // Regression: a bucket emptied by removeAny must not hide the keys that remain.
+        map.put(1, 5);
+        Assert.assertEquals(Optional.of(5), map.removeAny(1));
+        Assert.assertEquals(Optional.of(2), map.getAnyKey());
+    }
+
+    @Test
+    public void testRemoveAny() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertFalse(map.removeAny(1).isPresent());
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+        map.put(2, 4);
+
+        int firstVal = map.removeAny(1).get();
+        Assert.assertTrue(firstVal == 2 || firstVal == 3);
+        int secondVal = map.removeAny(1).get();
+        Assert.assertTrue(secondVal == 2 || secondVal == 3);
+        Assert.assertNotEquals(firstVal, secondVal);
+
+        // The duplicate (2, 4) pair was stored only once.
+        Assert.assertEquals(Optional.of(4), map.removeAny(2));
+        Assert.assertFalse(map.removeAny(1).isPresent());
+        Assert.assertFalse(map.removeAny(2).isPresent());
+        Assert.assertFalse(map.removeAny(3).isPresent());
+    }
+
+    @Test
+    public void testRemoveIf() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertEquals(0, map.removeIf((k, v) -> true));
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+        map.put(2, 4);
+
+        Assert.assertEquals(1, map.removeIf((k, v) -> v == 4));
+        Assert.assertEquals(2, map.removeIf((k, v) -> k == 1));
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+
+        Assert.assertEquals(0, map.removeIf((k, v) -> false));
+        Assert.assertEquals(3, map.removeIf((k, v) -> true));
+    }
+}