You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/07/27 08:16:16 UTC

[GitHub] eolivelli closed pull request #1569: Replace guava multimap in PCBC with custom impl

eolivelli closed pull request #1569: Replace guava multimap in PCBC with custom impl
URL: https://github.com/apache/bookkeeper/pull/1569
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index ba704e4be7..860079737f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,8 +21,6 @@
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
@@ -71,8 +69,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -131,6 +129,7 @@
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
+import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,8 +168,8 @@
 
     // Map that hold duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
     // read request for the same ledgerId/entryId
-    private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
-        LinkedListMultimap.create();
+    private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
+        new SynchronizedHashMultiMap<>();
 
     private final StatsLogger statsLogger;
     private final OpStatsLogger readEntryOpLogger;
@@ -867,16 +866,7 @@ public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object
     public void checkTimeoutOnPendingOperations() {
         int timedOutOperations = completionObjects.removeIf(timeoutCheck);
 
-        synchronized (this) {
-            Iterator<CompletionValue> iterator = completionObjectsV2Conflicts.values().iterator();
-            while (iterator.hasNext()) {
-                CompletionValue value = iterator.next();
-                if (value.maybeTimeout()) {
-                    ++timedOutOperations;
-                    iterator.remove();
-                }
-            }
-        }
+        timedOutOperations += completionObjectsV2Conflicts.removeIf(timeoutCheck);
 
         if (timedOutOperations > 0) {
             LOG.info("Timed-out {} operations to channel {} for {}",
@@ -1020,6 +1010,9 @@ void errorOut(final CompletionKey key) {
         CompletionValue completion = completionObjects.remove(key);
         if (completion != null) {
             completion.errorOut();
+        } else {
+            // If there's no completion object here, try in the multimap
+            completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
         }
     }
 
@@ -1032,14 +1025,7 @@ void errorOut(final CompletionKey key, final int rc) {
             completion.errorOut(rc);
         } else {
             // If there's no completion object here, try in the multimap
-            synchronized (completionObjectsV2Conflicts) {
-                if (completionObjectsV2Conflicts.containsKey(key)) {
-                    completion = completionObjectsV2Conflicts.get(key).get(0);
-                    completionObjectsV2Conflicts.remove(key, completion);
-
-                    completion.errorOut(rc);
-                }
-            }
+            completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
         }
     }
 
@@ -1068,16 +1054,10 @@ void errorOutPendingOps(int rc) {
      */
 
     void errorOutOutstandingEntries(int rc) {
-        // DO NOT rewrite these using Map.Entry iterations. We want to iterate
-        // on keys and see if we are successfully able to remove the key from
-        // the map. Because the add and the read methods also do the same thing
-        // in case they get a write failure on the socket. The one who
-        // successfully removes the key from the map is the one responsible for
-        // calling the application callback.
-        for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
-            while (completionObjectsV2Conflicts.get(key).size() > 0) {
-                errorOut(key, rc);
-            }
+        Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
+        while (multikey.isPresent()) {
+            multikey.ifPresent(k -> errorOut(k, rc));
+            multikey = completionObjectsV2Conflicts.getAnyKey();
         }
         for (CompletionKey key : completionObjects.keys()) {
             errorOut(key, rc);
@@ -2074,9 +2054,7 @@ private void putCompletionKeyValue(CompletionKey key, CompletionValue value) {
         CompletionValue existingValue = completionObjects.putIfAbsent(key, value);
         if (existingValue != null) { // will only happen for V2 keys, as V3 have unique txnid
             // There's a pending read request on same ledger/entry. Use the multimap to track all of them
-            synchronized (completionObjectsV2Conflicts) {
-                completionObjectsV2Conflicts.put(key, value);
-            }
+            completionObjectsV2Conflicts.put(key, value);
         }
     }
 
@@ -2084,12 +2062,7 @@ private CompletionValue getCompletionValue(CompletionKey key) {
         CompletionValue completionValue = completionObjects.remove(key);
         if (completionValue == null) {
             // If there's no completion object here, try in the multimap
-            synchronized (this) {
-                if (completionObjectsV2Conflicts.containsKey(key)) {
-                    completionValue = completionObjectsV2Conflicts.get(key).get(0);
-                    completionObjectsV2Conflicts.remove(key, completionValue);
-                }
-            }
+            completionValue = completionObjectsV2Conflicts.removeAny(key).orElse(null);
         }
         return completionValue;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java
new file mode 100644
index 0000000000..6e6e3c189a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMap.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Simple multimap implementation that only stores key reference once.
+ *
+ * <p>Implementation is aimed at storing PerChannelBookieClient completions when there
+ * are duplicates. If the key is a pooled object, it must not exist once the value
+ * has been removed from the map, which can happen with guava multimap implementations.
+ *
+ * <p>This map is implemented with pretty heavy locking, but this shouldn't be an
+ * issue as the multimap only needs to be used in rare cases, i.e. when a user tries
+ * to read the same entry twice at the same time. This class should *NOT* be used
+ * in critical path code.
+ *
+ * <p>A unique key-value pair will only be stored once.
+ */
+public class SynchronizedHashMultiMap<K, V> {
+
+    HashMap<Integer, Set<Pair<K, V>>> map = new HashMap<>();
+
+    public synchronized void put(K k, V v) {
+        map.computeIfAbsent(k.hashCode(), (ignore) -> new HashSet<>()).add(Pair.of(k, v));
+    }
+
+    public synchronized Optional<K> getAnyKey() {
+        return map.values().stream().findAny().flatMap(pairs -> pairs.stream().findAny().map(p -> p.getLeft()));
+    }
+
+    public synchronized Optional<V> removeAny(K k) {
+        Set<Pair<K, V>> set = map.getOrDefault(k.hashCode(), Collections.emptySet());
+        Optional<Pair<K, V>> pair = set.stream().filter(p -> p.getLeft().equals(k)).findAny();
+        pair.ifPresent(p -> set.remove(p));
+        return pair.map(p -> p.getRight());
+    }
+
+    public synchronized int removeIf(BiPredicate<K, V> predicate) {
+        int removedSum = map.values().stream().mapToInt(
+                pairs -> {
+                    int removed = 0;
+                    // Can't use removeIf because we need the count
+                    Iterator<Pair<K, V>> iter = pairs.iterator();
+                    while (iter.hasNext()) {
+                        Pair<K, V> kv = iter.next();
+                        if (predicate.test(kv.getLeft(), kv.getRight())) {
+                            iter.remove();
+                            removed++;
+                        }
+                    }
+                    return removed;
+                }).sum();
+        map.values().removeIf((s) -> s.isEmpty());
+        return removedSum;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java
new file mode 100644
index 0000000000..f9ab747ee6
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/SynchronizedHashMultiMapTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for SynchronizedHashMultiMap.
+ */
+public class SynchronizedHashMultiMapTest {
+    @Test
+    public void testGetAnyKey() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertFalse(map.getAnyKey().isPresent());
+
+        map.put(1, 2);
+        Assert.assertEquals(map.getAnyKey().get(), Integer.valueOf(1));
+
+        map.put(1, 3);
+        Assert.assertEquals(map.getAnyKey().get(), Integer.valueOf(1));
+
+        map.put(2, 4);
+        int res = map.getAnyKey().get();
+        Assert.assertTrue(res == 1 || res == 2);
+
+        map.removeIf((k, v) -> k == 1);
+        Assert.assertEquals(map.getAnyKey().get(), Integer.valueOf(2));
+    }
+
+    @Test
+    public void testRemoveAny() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertFalse(map.removeAny(1).isPresent());
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+        map.put(2, 4);
+
+        Optional<Integer> v = map.removeAny(1);
+        int firstVal = v.get();
+        Assert.assertTrue(firstVal == 2 || firstVal == 3);
+
+        v = map.removeAny(1);
+        int secondVal = v.get();
+        Assert.assertTrue(secondVal == 2 || secondVal == 3);
+        Assert.assertNotEquals(secondVal, firstVal);
+
+        v = map.removeAny(2);
+        Assert.assertTrue(v.isPresent());
+        Assert.assertEquals(v.get(), Integer.valueOf(4));
+
+        Assert.assertFalse(map.removeAny(1).isPresent());
+        Assert.assertFalse(map.removeAny(2).isPresent());
+        Assert.assertFalse(map.removeAny(3).isPresent());
+    }
+
+    @Test
+    public void testRemoveIf() {
+        SynchronizedHashMultiMap<Integer, Integer> map = new SynchronizedHashMultiMap<>();
+        Assert.assertEquals(map.removeIf((k, v) -> true), 0);
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+        map.put(2, 4);
+
+        Assert.assertEquals(map.removeIf((k, v) -> v == 4), 1);
+        Assert.assertEquals(map.removeIf((k, v) -> k == 1), 2);
+
+        map.put(1, 2);
+        map.put(1, 3);
+        map.put(2, 4);
+
+        Assert.assertEquals(map.removeIf((k, v) -> false), 0);
+        Assert.assertEquals(map.removeIf((k, v) -> true), 3);
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services