You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by kr...@apache.org on 2022/05/02 19:10:49 UTC

[solr] branch branch_9x updated: SOLR-16174: Modernize TestBulkSchemaConcurrent (#829)

This is an automated email from the ASF dual-hosted git repository.

krisden pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 23dccff5b57 SOLR-16174: Modernize TestBulkSchemaConcurrent (#829)
23dccff5b57 is described below

commit 23dccff5b579b93bf367d6ad26ca82306e20b37f
Author: Kevin Risden <ri...@users.noreply.github.com>
AuthorDate: Mon May 2 15:05:59 2022 -0400

    SOLR-16174: Modernize TestBulkSchemaConcurrent (#829)
---
 .../solr/schema/TestBulkSchemaConcurrent.java      | 289 +++++++++++----------
 1 file changed, 157 insertions(+), 132 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java b/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java
index 7aaed143e3e..e2d5b5a4257 100644
--- a/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java
+++ b/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java
@@ -27,12 +27,20 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.RestTestHarness;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -41,6 +49,9 @@ import org.slf4j.LoggerFactory;
 public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private static final long TIMEOUT = TEST_NIGHTLY ? 10 : 1; // in seconds
+  private static final int THREAD_COUNT = TEST_NIGHTLY ? 5 : 2;
+
   @BeforeClass
   public static void initSysProperties() {
     System.setProperty("managed.schema.mutable", "true");
@@ -51,43 +62,56 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
     return "solrconfig-managed-schema.xml";
   }
 
+  @Before
+  public void setupTest() {
+    setupRestTestHarnesses();
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    closeRestTestHarnesses();
+  }
+
   @Test
-  @SuppressWarnings({"unchecked"})
   public void test() throws Exception {
+    final List<List<String>> collectErrors = Collections.synchronizedList(new ArrayList<>());
 
-    final int threadCount = 5;
-    setupRestTestHarnesses();
-    Thread[] threads = new Thread[threadCount];
-    @SuppressWarnings({"rawtypes"})
-    final List<List> collectErrors = Collections.synchronizedList(new ArrayList<>());
+    final ExecutorService executorService =
+        ExecutorUtil.newMDCAwareFixedThreadPool(
+            THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName()));
 
-    for (int i = 0; i < threadCount; i++) {
+    List<Callable<Void>> callees = new ArrayList<>(THREAD_COUNT);
+    for (int i = 0; i < THREAD_COUNT; i++) {
       final int finalI = i;
-      threads[i] =
-          new Thread() {
-            @Override
-            public void run() {
-              @SuppressWarnings({"rawtypes"})
-              ArrayList errs = new ArrayList();
-              collectErrors.add(errs);
-              try {
-                invokeBulkAddCall(finalI, errs);
-                invokeBulkReplaceCall(finalI, errs);
-                invokeBulkDeleteCall(finalI, errs);
-              } catch (Exception e) {
-                e.printStackTrace();
-              }
+      Callable<Void> call =
+          () -> {
+            List<String> errs = new ArrayList<>();
+            collectErrors.add(errs);
+            try {
+              invokeBulkAddCall(finalI, errs);
+              invokeBulkReplaceCall(finalI, errs);
+              invokeBulkDeleteCall(finalI, errs);
+            } catch (InterruptedException interruptedException) {
+              Thread.currentThread().interrupt();
+            } catch (Exception e) {
+              // TODO this might be double logged, but safer to log here anyway
+              log.error("Exception from thread {}", finalI, e);
             }
+            return null;
           };
-
-      threads[i].start();
+      callees.add(call);
     }
 
-    for (Thread thread : threads) thread.join();
+    executorService.invokeAll(callees);
+    executorService.shutdown();
+
+    // TIMEOUT * 3: there are 3 phases (add, replace, delete), each of which may run for up to TIMEOUT
+    assertTrue(
+        "Running for too long...", executorService.awaitTermination(TIMEOUT * 3, TimeUnit.SECONDS));
 
     boolean success = true;
 
-    for (@SuppressWarnings({"rawtypes"}) List e : collectErrors) {
+    for (List<String> e : collectErrors) {
       if (e != null && !e.isEmpty()) {
         success = false;
         log.error("{}", e);
@@ -98,7 +122,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
   }
 
   @SuppressWarnings({"unchecked"})
-  private void invokeBulkAddCall(int seed, ArrayList<String> errs) throws Exception {
+  private void invokeBulkAddCall(int seed, List<String> errs) throws Exception {
     String payload =
         "{\n"
             + "          'add-field' : {\n"
@@ -133,10 +157,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
     payload = payload.replace("replaceDynamicCopyFieldDest", dynamicCopyFldDest);
     payload = payload.replace("myNewFieldTypeName", newFieldTypeName);
 
+    // don't close publisher - gets closed at teardown
     RestTestHarness publisher = randomRestTestHarness(r);
     String response = publisher.post("/schema", SolrTestCaseJ4.json(payload));
-    @SuppressWarnings({"rawtypes"})
-    Map map = (Map) Utils.fromJSONString(response);
+    Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response);
     Object errors = map.get("errors");
     if (errors != null) {
       errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8));
@@ -145,38 +169,38 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
 
     // get another node
     Set<String> errmessages = new HashSet<>();
+    // don't close harness - gets closed at teardown
     RestTestHarness harness = randomRestTestHarness(r);
-    try {
-      long startTime = System.nanoTime();
-      long maxTimeoutMillis = 100000;
-      while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)
-          < maxTimeoutMillis) {
-        errmessages.clear();
-        @SuppressWarnings({"rawtypes"})
-        Map m = getObj(harness, aField, "fields");
-        if (m == null) errmessages.add(StrUtils.formatString("field {0} not created", aField));
-
-        m = getObj(harness, dynamicFldName, "dynamicFields");
-        if (m == null)
-          errmessages.add(StrUtils.formatString("dynamic field {0} not created", dynamicFldName));
-
-        @SuppressWarnings({"rawtypes"})
-        List l = getSourceCopyFields(harness, aField);
-        if (!checkCopyField(l, aField, dynamicCopyFldDest))
-          errmessages.add(
-              StrUtils.formatString(
-                  "CopyField source={0},dest={1} not created", aField, dynamicCopyFldDest));
-
-        m = getObj(harness, newFieldTypeName, "fieldTypes");
-        if (m == null)
-          errmessages.add(StrUtils.formatString("new type {0}  not created", newFieldTypeName));
-
-        if (errmessages.isEmpty()) break;
-
-        Thread.sleep(10);
+    TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      errmessages.clear();
+      Map<?, ?> m = getObj(harness, aField, "fields");
+      if (m == null) {
+        errmessages.add(StrUtils.formatString("field {0} not created", aField));
       }
-    } finally {
-      harness.close();
+
+      m = getObj(harness, dynamicFldName, "dynamicFields");
+      if (m == null) {
+        errmessages.add(StrUtils.formatString("dynamic field {0} not created", dynamicFldName));
+      }
+
+      List<Map<String, String>> l = getSourceCopyFields(harness, aField);
+      if (!checkCopyField(l, aField, dynamicCopyFldDest)) {
+        errmessages.add(
+            StrUtils.formatString(
+                "CopyField source={0},dest={1} not created", aField, dynamicCopyFldDest));
+      }
+
+      m = getObj(harness, newFieldTypeName, "fieldTypes");
+      if (m == null) {
+        errmessages.add(StrUtils.formatString("new type {0}  not created", newFieldTypeName));
+      }
+
+      if (errmessages.isEmpty()) {
+        break;
+      }
+
+      timeout.sleep(10);
     }
     if (!errmessages.isEmpty()) {
       errs.addAll(errmessages);
@@ -184,7 +208,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
   }
 
   @SuppressWarnings({"unchecked"})
-  private void invokeBulkReplaceCall(int seed, ArrayList<String> errs) throws Exception {
+  private void invokeBulkReplaceCall(int seed, List<String> errs) throws Exception {
     String payload =
         "{\n"
             + "          'replace-field' : {\n"
@@ -213,10 +237,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
     payload = payload.replace("replaceDynamicField", dynamicFldName);
     payload = payload.replace("myNewFieldTypeName", newFieldTypeName);
 
+    // don't close publisher - gets closed at teardown
     RestTestHarness publisher = randomRestTestHarness(r);
     String response = publisher.post("/schema", SolrTestCaseJ4.json(payload));
-    @SuppressWarnings({"rawtypes"})
-    Map map = (Map) Utils.fromJSONString(response);
+    Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response);
     Object errors = map.get("errors");
     if (errors != null) {
       errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8));
@@ -225,41 +249,39 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
 
     // get another node
     Set<String> errmessages = new HashSet<>();
+    // don't close harness - gets closed at teardown
     RestTestHarness harness = randomRestTestHarness(r);
-    try {
-      long startTime = System.nanoTime();
-      long maxTimeoutMillis = 100000;
-      while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)
-          < maxTimeoutMillis) {
-        errmessages.clear();
-        @SuppressWarnings({"rawtypes"})
-        Map m = getObj(harness, aField, "fields");
-        if (m == null)
-          errmessages.add(StrUtils.formatString("field {0} no longer present", aField));
-
-        m = getObj(harness, dynamicFldName, "dynamicFields");
-        if (m == null)
-          errmessages.add(
-              StrUtils.formatString("dynamic field {0} no longer present", dynamicFldName));
-
-        @SuppressWarnings({"rawtypes"})
-        List l = getSourceCopyFields(harness, aField);
-        if (!checkCopyField(l, aField, dynamicCopyFldDest))
-          errmessages.add(
-              StrUtils.formatString(
-                  "CopyField source={0},dest={1} no longer present", aField, dynamicCopyFldDest));
-
-        m = getObj(harness, newFieldTypeName, "fieldTypes");
-        if (m == null)
-          errmessages.add(
-              StrUtils.formatString("new type {0} no longer present", newFieldTypeName));
-
-        if (errmessages.isEmpty()) break;
-
-        Thread.sleep(10);
+    TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      errmessages.clear();
+      Map<?, ?> m = getObj(harness, aField, "fields");
+      if (m == null) {
+        errmessages.add(StrUtils.formatString("field {0} no longer present", aField));
       }
-    } finally {
-      harness.close();
+
+      m = getObj(harness, dynamicFldName, "dynamicFields");
+      if (m == null) {
+        errmessages.add(
+            StrUtils.formatString("dynamic field {0} no longer present", dynamicFldName));
+      }
+
+      List<Map<String, String>> l = getSourceCopyFields(harness, aField);
+      if (!checkCopyField(l, aField, dynamicCopyFldDest)) {
+        errmessages.add(
+            StrUtils.formatString(
+                "CopyField source={0},dest={1} no longer present", aField, dynamicCopyFldDest));
+      }
+
+      m = getObj(harness, newFieldTypeName, "fieldTypes");
+      if (m == null) {
+        errmessages.add(StrUtils.formatString("new type {0} no longer present", newFieldTypeName));
+      }
+
+      if (errmessages.isEmpty()) {
+        break;
+      }
+
+      timeout.sleep(10);
     }
     if (!errmessages.isEmpty()) {
       errs.addAll(errmessages);
@@ -267,7 +289,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
   }
 
   @SuppressWarnings({"unchecked"})
-  private void invokeBulkDeleteCall(int seed, ArrayList<String> errs) throws Exception {
+  private void invokeBulkDeleteCall(int seed, List<String> errs) throws Exception {
     String payload =
         "{\n"
             + "          'delete-copy-field' : {\n"
@@ -288,10 +310,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
     payload = payload.replace("replaceDynamicCopyFieldDest", dynamicCopyFldDest);
     payload = payload.replace("myNewFieldTypeName", newFieldTypeName);
 
+    // don't close publisher - gets closed at teardown
     RestTestHarness publisher = randomRestTestHarness(r);
     String response = publisher.post("/schema", SolrTestCaseJ4.json(payload));
-    @SuppressWarnings({"rawtypes"})
-    Map map = (Map) Utils.fromJSONString(response);
+    Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response);
     Object errors = map.get("errors");
     if (errors != null) {
       errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8));
@@ -300,49 +322,52 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
 
     // get another node
     Set<String> errmessages = new HashSet<>();
+    // don't close harness - gets closed at teardown
     RestTestHarness harness = randomRestTestHarness(r);
-    try {
-      long startTime = System.nanoTime();
-      long maxTimeoutMillis = 100000;
-      while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)
-          < maxTimeoutMillis) {
-        errmessages.clear();
-        @SuppressWarnings({"rawtypes"})
-        Map m = getObj(harness, aField, "fields");
-        if (m != null) errmessages.add(StrUtils.formatString("field {0} still exists", aField));
-
-        m = getObj(harness, dynamicFldName, "dynamicFields");
-        if (m != null)
-          errmessages.add(StrUtils.formatString("dynamic field {0} still exists", dynamicFldName));
-
-        @SuppressWarnings({"rawtypes"})
-        List l = getSourceCopyFields(harness, aField);
-        if (checkCopyField(l, aField, dynamicCopyFldDest))
-          errmessages.add(
-              StrUtils.formatString(
-                  "CopyField source={0},dest={1} still exists", aField, dynamicCopyFldDest));
-
-        m = getObj(harness, newFieldTypeName, "fieldTypes");
-        if (m != null)
-          errmessages.add(StrUtils.formatString("new type {0} still exists", newFieldTypeName));
-
-        if (errmessages.isEmpty()) break;
-
-        Thread.sleep(10);
+    TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      errmessages.clear();
+      Map<?, ?> m = getObj(harness, aField, "fields");
+      if (m != null) {
+        errmessages.add(StrUtils.formatString("field {0} still exists", aField));
+      }
+
+      m = getObj(harness, dynamicFldName, "dynamicFields");
+      if (m != null) {
+        errmessages.add(StrUtils.formatString("dynamic field {0} still exists", dynamicFldName));
       }
-    } finally {
-      harness.close();
+
+      List<Map<String, String>> l = getSourceCopyFields(harness, aField);
+      if (checkCopyField(l, aField, dynamicCopyFldDest)) {
+        errmessages.add(
+            StrUtils.formatString(
+                "CopyField source={0},dest={1} still exists", aField, dynamicCopyFldDest));
+      }
+
+      m = getObj(harness, newFieldTypeName, "fieldTypes");
+      if (m != null) {
+        errmessages.add(StrUtils.formatString("new type {0} still exists", newFieldTypeName));
+      }
+
+      if (errmessages.isEmpty()) {
+        break;
+      }
+
+      timeout.sleep(10);
     }
     if (!errmessages.isEmpty()) {
       errs.addAll(errmessages);
     }
   }
 
-  private boolean checkCopyField(
-      @SuppressWarnings({"rawtypes"}) List<Map> l, String src, String dest) {
-    if (l == null) return false;
-    for (@SuppressWarnings({"rawtypes"}) Map map : l) {
-      if (src.equals(map.get("source")) && dest.equals(map.get("dest"))) return true;
+  private boolean checkCopyField(List<Map<String, String>> l, String src, String dest) {
+    if (l == null) {
+      return false;
+    }
+    for (Map<String, String> map : l) {
+      if (src.equals(map.get("source")) && dest.equals(map.get("dest"))) {
+        return true;
+      }
     }
     return false;
   }