You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2022/05/04 03:55:35 UTC

[solr] 01/01: Fix race condition in SizeLimitedDistributedMap cleanup

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr16175
in repository https://gitbox.apache.org/repos/asf/solr.git

commit c1ae998e7ad3650229449ff7b2a55ef222ec8b8c
Author: Noble Paul <no...@gmail.com>
AuthorDate: Wed May 4 13:55:24 2022 +1000

    Fix race condition in SizeLimitedDistributedMap cleanup
---
 .../solr/cloud/SizeLimitedDistributedMap.java      | 11 ++--
 .../solr/cloud/TestSizeLimitedDistributedMap.java  | 61 ++++++++++++++++++++--
 2 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
index db57ea4980b..2e021a54c53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
@@ -91,9 +91,14 @@ public class SizeLimitedDistributedMap extends DistributedMap {
       for (String child : children) {
         Long id = childToModificationZxid.get(child);
         if (id != null && id <= topElementMzxId) {
-          zookeeper.delete(dir + "/" + child, -1, true);
-          if (onOverflowObserver != null)
-            onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
+          try {
+            zookeeper.delete(dir + "/" + child, -1, true);
+            if (onOverflowObserver != null) {
+              onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
+            }
+          } catch (KeeperException.NoNodeException e) {
+            //Ignore. It's already deleted by another thread
+          }
         }
       }
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
index a086656d13c..5b8f3c3d5cb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
@@ -17,10 +17,13 @@
 
 package org.apache.solr.cloud;
 
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.Callable;
 import org.apache.solr.common.cloud.SolrZkClient;
 
 public class TestSizeLimitedDistributedMap extends TestDistributedMap {
@@ -67,6 +70,56 @@ public class TestSizeLimitedDistributedMap extends TestDistributedMap {
     }
   }
 
+  /**
+   * Fills the map to its size limit, then has many threads add new keys at once so that several
+   * of them run the overflow cleanup concurrently. None of the puts may fail, every newly added
+   * key must be present afterwards, and at least one child delete must have been reported.
+   */
+  public void testConcurrentCleanup() throws Exception {
+    final Set<String> expectedKeys = new HashSet<>();
+    // the observer is called from map.put on pool threads, so the list must be thread-safe
+    final List<String> deletedItems = Collections.synchronizedList(new ArrayList<>());
+    int numResponsesToStore = TEST_NIGHTLY ? Overseer.NUM_RESPONSES_TO_STORE : 100;
+
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      final DistributedMap map =
+          new SizeLimitedDistributedMap(zkClient, path, numResponsesToStore, deletedItems::add);
+      // fill the map to its limit first
+      for (int i = 0; i < numResponsesToStore; i++) {
+        map.put("xyz_" + i, new byte[0]);
+      }
+
+      // add more elements concurrently to trigger cleanup; each task writes its own new key
+      final int threadCount = Math.min(100, numResponsesToStore);
+      List<java.util.concurrent.Callable<Object>> tasks = new ArrayList<>(threadCount);
+      for (int i = 0; i < threadCount; i++) {
+        final String key = "xyz_" + (numResponsesToStore + i);
+        expectedKeys.add(key);
+        tasks.add(
+            () -> {
+              map.put(key, new byte[0]);
+              return null;
+            });
+      }
+      ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+      try {
+        // invokeAll waits for every task; get() rethrows any task failure as ExecutionException
+        for (Future<Object> future : executorService.invokeAll(tasks)) {
+          future.get();
+        }
+        for (String expectedKey : expectedKeys) {
+          assertTrue("missing key " + expectedKey, map.contains(expectedKey));
+        }
+        // how many entries get removed is not deterministic, but at least one must be reported
+        assertFalse(deletedItems.isEmpty());
+      } finally {
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+      }
+    }
+  }
+
   protected DistributedMap createMap(SolrZkClient zkClient, String path) {
     return new SizeLimitedDistributedMap(zkClient, path, Overseer.NUM_RESPONSES_TO_STORE, null);
   }