You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/23 14:57:28 UTC

[pulsar] branch branch-2.10 updated: [improve][broker] Make MessageRedeliveryController work more efficiently (#17804)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 19eb842177f [improve][broker] Make MessageRedeliveryController work more efficiently (#17804)
19eb842177f is described below

commit 19eb842177f9e770ab59c4556188e8fa7468eed5
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Sep 23 22:45:08 2022 +0800

    [improve][broker] Make MessageRedeliveryController work more efficiently (#17804)
    
    (cherry picked from commit c60f895cae6aa3a51a69603c6215c92e358a0e47)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |   8 +
 pulsar-broker/pom.xml                              |   5 +
 .../persistent/MessageRedeliveryController.java    |  41 ++--
 .../utils/ConcurrentBitmapSortedLongPairSet.java   | 143 ++++++++++++++
 .../MessageRedeliveryControllerTest.java           |  22 +--
 .../ConcurrentBitmapSortedLongPairSetTest.java     | 211 +++++++++++++++++++++
 7 files changed, 391 insertions(+), 41 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index f5209320dd7..4783c17c83b 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -538,6 +538,8 @@ The Apache Software License, Version 2.0
     - io.etcd-jetcd-core-0.5.11.jar
   * IPAddress
     - com.github.seancfoley-ipaddress-5.3.3.jar
+  * RoaringBitmap
+    - org.roaringbitmap-RoaringBitmap-0.9.15.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
diff --git a/pom.xml b/pom.xml
index edf1c604a43..41d153c78b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,7 @@ flexible messaging model and an intuitive client API.</description>
     <j2objc-annotations.version>1.3</j2objc-annotations.version>
     <lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
     <dependency-check-maven.version>7.1.0</dependency-check-maven.version>
+    <roaringbitmap.version>0.9.15</roaringbitmap.version>
 
     <!-- Used to configure rename.netty.native. Libs -->
     <rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
@@ -1276,6 +1277,13 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>reload4j</artifactId>
         <version>${reload4j.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>org.roaringbitmap</groupId>
+        <artifactId>RoaringBitmap</artifactId>
+        <version>${roaringbitmap.version}</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index cd331ab4b56..6c41a545d80 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -347,6 +347,11 @@
       <artifactId>hppc</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-functions-api-examples</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 46fa1b2b050..7aaba9a4e10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -22,46 +22,43 @@ import com.google.common.collect.ComparisonChain;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
-import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
-import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
 
 public class MessageRedeliveryController {
-    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentBitmapSortedLongPairSet messagesToRedeliver;
     private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
 
     public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
-        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true);
+        this.messagesToRedeliver = new ConcurrentBitmapSortedLongPairSet();
         this.hashesToBeBlocked = allowOutOfOrderDelivery
                 ? null
                 : ConcurrentLongLongPairHashMap
                     .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
     }
 
-    public boolean add(long ledgerId, long entryId) {
-        return messagesToRedeliver.add(ledgerId, entryId);
+    public void add(long ledgerId, long entryId) {
+        messagesToRedeliver.add(ledgerId, entryId);
     }
 
-    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+    public void add(long ledgerId, long entryId, long stickyKeyHash) {
         if (hashesToBeBlocked != null) {
             hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
         }
-        return messagesToRedeliver.add(ledgerId, entryId);
+        messagesToRedeliver.add(ledgerId, entryId);
     }
 
-    public boolean remove(long ledgerId, long entryId) {
+    public void remove(long ledgerId, long entryId) {
         if (hashesToBeBlocked != null) {
             hashesToBeBlocked.remove(ledgerId, entryId);
         }
-        return messagesToRedeliver.remove(ledgerId, entryId);
+        messagesToRedeliver.remove(ledgerId, entryId);
     }
 
-    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+    public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
         if (hashesToBeBlocked != null) {
             List<LongPair> keysToRemove = new ArrayList<>();
             hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> {
@@ -73,10 +70,7 @@ public class MessageRedeliveryController {
             keysToRemove.forEach(longPair -> hashesToBeBlocked.remove(longPair.first, longPair.second));
             keysToRemove.clear();
         }
-        return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
-            return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
-                    .result() <= 0;
-        });
+        messagesToRedeliver.removeUpTo(markDeleteLedgerId, markDeleteEntryId + 1);
     }
 
     public boolean isEmpty() {
@@ -107,17 +101,6 @@ public class MessageRedeliveryController {
     }
 
     public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
-        if (hashesToBeBlocked != null) {
-            // allowOutOfOrderDelivery is false
-            return messagesToRedeliver.items().stream()
-                    .sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first)
-                            .compare(l1.second, l2.second).result())
-                    .limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second))
-                    .collect(Collectors.toCollection(TreeSet::new));
-        } else {
-            // allowOutOfOrderDelivery is true
-            return messagesToRedeliver.items(maxMessagesToRead,
-                    (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
-        }
+        return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
new file mode 100644
index 00000000000..c9f1c65daca
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class ConcurrentBitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void remove(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            if (bitSet != null) {
+                bitSet.remove(item2, item2 + 1);
+                if (bitSet.isEmpty()) {
+                    map.remove(item1, bitSet);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean contains(long item1, long item2) {
+        lock.readLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            return bitSet != null && bitSet.contains(item2, item2 + 1);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public void removeUpTo(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
+            while (firstEntry != null && firstEntry.getKey() <= item1) {
+                if (firstEntry.getKey() < item1) {
+                    map.remove(firstEntry.getKey(), firstEntry.getValue());
+                } else {
+                    RoaringBitmap bitSet = firstEntry.getValue();
+                    if (bitSet != null) {
+                        bitSet.remove(0, item2);
+                        if (bitSet.isEmpty()) {
+                            map.remove(firstEntry.getKey(), bitSet);
+                        }
+                    }
+                    break;
+                }
+                firstEntry = map.firstEntry();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+
+    public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
+        NavigableSet<T> items = new TreeSet<>();
+        lock.readLock().lock();
+        try {
+            for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
+                Iterator<Integer> iterator = entry.getValue().stream().iterator();
+                while (iterator.hasNext() && items.size() < numberOfItems) {
+                    items.add(longPairConverter.apply(entry.getKey(), iterator.next()));
+                }
+                if (items.size() == numberOfItems) {
+                    break;
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+        return items;
+    }
+
+    public boolean isEmpty() {
+        lock.readLock().lock();
+        try {
+            return map.isEmpty() || map.values().stream().allMatch(RoaringBitmap::isEmpty);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public void clear() {
+        lock.writeLock().lock();
+        try {
+            map.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public int size() {
+        lock.readLock().lock();
+        try {
+            return map.isEmpty() ? 0 : map.values().stream().mapToInt(RoaringBitmap::getCardinality).sum();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 478677a25e4..3cd6fc23de7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,8 +30,8 @@ import java.lang.reflect.Field;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
 import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.pulsar.common.util.collections.LongPairSet;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -48,7 +48,8 @@ public class MessageRedeliveryControllerTest {
 
         Field messagesToRedeliverField = MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver");
         messagesToRedeliverField.setAccessible(true);
-        LongPairSet messagesToRedeliver = (LongPairSet) messagesToRedeliverField.get(controller);
+        ConcurrentBitmapSortedLongPairSet messagesToRedeliver =
+                (ConcurrentBitmapSortedLongPairSet) messagesToRedeliverField.get(controller);
 
         Field hashesToBeBlockedField = MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked");
         hashesToBeBlockedField.setAccessible(true);
@@ -67,9 +68,8 @@ public class MessageRedeliveryControllerTest {
             assertEquals(hashesToBeBlocked.size(), 0);
         }
 
-        assertTrue(controller.add(1, 1));
-        assertTrue(controller.add(1, 2));
-        assertFalse(controller.add(1, 1));
+        controller.add(1, 1);
+        controller.add(1, 2);
 
         assertFalse(controller.isEmpty());
         assertEquals(messagesToRedeliver.size(), 2);
@@ -81,9 +81,8 @@ public class MessageRedeliveryControllerTest {
             assertFalse(hashesToBeBlocked.containsKey(1, 2));
         }
 
-        assertTrue(controller.remove(1, 1));
-        assertTrue(controller.remove(1, 2));
-        assertFalse(controller.remove(1, 1));
+        controller.remove(1, 1);
+        controller.remove(1, 2);
 
         assertTrue(controller.isEmpty());
         assertEquals(messagesToRedeliver.size(), 0);
@@ -93,10 +92,9 @@ public class MessageRedeliveryControllerTest {
             assertEquals(hashesToBeBlocked.size(), 0);
         }
 
-        assertTrue(controller.add(2, 1, 100));
-        assertTrue(controller.add(2, 2, 101));
-        assertTrue(controller.add(2, 3, 101));
-        assertFalse(controller.add(2, 1, 100));
+        controller.add(2, 1, 100);
+        controller.add(2, 2, 101);
+        controller.add(2, 3, 101);
 
         assertFalse(controller.isEmpty());
         assertEquals(messagesToRedeliver.size(), 3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
new file mode 100644
index 00000000000..3c53fc159d0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+@Test(groups = "utils")
+public class ConcurrentBitmapSortedLongPairSetTest {
+
+    @Test
+    public void testAdd() {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+        int items = 10;
+        for (int i = 0; i < items; i++) {
+            set.add(1, i);
+        }
+        assertEquals(set.size(), items);
+
+        for (int i = 0; i < items; i++) {
+            set.add(2, i);
+        }
+        assertEquals(set.size(), items * 2);
+
+        for (int i = 0; i < items; i++) {
+            set.add(2, i);
+        }
+        assertEquals(set.size(), items * 2);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+        int items = 10;
+        for (int i = 0; i < items; i++) {
+            set.add(1, i);
+        }
+
+        for (int i = 0; i < items / 2; i++) {
+            set.remove(1, i);
+        }
+        assertEquals(set.size(), items / 2);
+
+        for (int i = 0; i < items / 2; i++) {
+            set.remove(2, i);
+        }
+        assertEquals(set.size(), items / 2);
+
+        for (int i = 0; i < items / 2; i++) {
+            set.remove(1, i + 10000);
+        }
+        assertEquals(set.size(), items / 2);
+
+        for (int i = 0; i < items / 2; i++) {
+            set.remove(1, i + items / 2);
+        }
+        assertEquals(set.size(), 0);
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testContains() {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+        assertFalse(set.contains(1, 1));
+
+        int items = 10;
+        for (int i = 0; i < items; i++) {
+            set.add(1, i);
+        }
+
+        for (int i = 0; i < items; i++) {
+            assertTrue(set.contains(1, i));
+        }
+
+        assertFalse(set.contains(1, 10000));
+    }
+
+    @Test
+    public void testRemoveUpTo() {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+        set.removeUpTo(0, 1000);
+        set.removeUpTo(10, 10000);
+        assertTrue(set.isEmpty());
+
+        set.add(1, 0);
+
+        int items = 10;
+        for (int i = 0; i < items; i++) {
+            set.add(1, i);
+        }
+
+        set.removeUpTo(1, 5);
+        assertFalse(set.isEmpty());
+        assertEquals(set.size(), 5);
+
+        for (int i = 5; i < items; i++) {
+            assertTrue(set.contains(1, i));
+        }
+
+        set.removeUpTo(2, 0);
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testItems() {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+        Set<ConcurrentLongPairSet.LongPair> items = set.items(10, ConcurrentLongPairSet.LongPair::new);
+        assertEquals(items.size(), 0);
+        for (int i = 0; i < 100; i++) {
+            set.add(1, i);
+            set.add(2, i);
+            set.add(5, i);
+        }
+        for (int i = 0; i < 100; i++) {
+            set.add(1, i + 1000);
+            set.add(2, i + 1000);
+            set.add(5, i + 1000);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            set.add(1, i + 500);
+            set.add(2, i + 500);
+            set.add(5, i + 500);
+        }
+        assertEquals(set.size(), 900);
+        assertFalse(set.isEmpty());
+        items = set.items(10, ConcurrentLongPairSet.LongPair::new);
+        assertEquals(items.size(), 10);
+        ConcurrentLongPairSet.LongPair last = null;
+        for (ConcurrentLongPairSet.LongPair item : items) {
+            if (last != null) {
+                assertTrue(item.compareTo(last) > 0);
+            }
+            last = item;
+        }
+
+        items = set.items(900, ConcurrentLongPairSet.LongPair::new);
+        assertEquals(items.size(), 900);
+        last = null;
+        for (ConcurrentLongPairSet.LongPair item : items) {
+            if (last != null) {
+                assertTrue(item.compareTo(last) > 0);
+            }
+            last = item;
+        }
+
+        items = set.items(1000, ConcurrentLongPairSet.LongPair::new);
+        assertEquals(items.size(), 900);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 8;
+        final int N = 1000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    int key = random.nextInt();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+                    key = Math.abs(key);
+                    set.add(key, key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(set.size(), N * nThreads);
+    }
+}