You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by kr...@apache.org on 2022/05/02 19:06:05 UTC
[solr] branch main updated: SOLR-16174: Modernize TestBulkSchemaConcurrent (#829)
This is an automated email from the ASF dual-hosted git repository.
krisden pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 24b871392db SOLR-16174: Modernize TestBulkSchemaConcurrent (#829)
24b871392db is described below
commit 24b871392db01941ad53eb13197844a02cd9ca61
Author: Kevin Risden <ri...@users.noreply.github.com>
AuthorDate: Mon May 2 15:05:59 2022 -0400
SOLR-16174: Modernize TestBulkSchemaConcurrent (#829)
---
.../solr/schema/TestBulkSchemaConcurrent.java | 289 +++++++++++----------
1 file changed, 157 insertions(+), 132 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java b/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java
index 7aaed143e3e..e2d5b5a4257 100644
--- a/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java
+++ b/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java
@@ -27,12 +27,20 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.RestTestHarness;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -41,6 +49,9 @@ import org.slf4j.LoggerFactory;
public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final long TIMEOUT = TEST_NIGHTLY ? 10 : 1; // in seconds
+ private static final int THREAD_COUNT = TEST_NIGHTLY ? 5 : 2;
+
@BeforeClass
public static void initSysProperties() {
System.setProperty("managed.schema.mutable", "true");
@@ -51,43 +62,56 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
return "solrconfig-managed-schema.xml";
}
+ @Before
+ public void setupTest() {
+ setupRestTestHarnesses();
+ }
+
+ @After
+ public void teardownTest() throws Exception {
+ closeRestTestHarnesses();
+ }
+
@Test
- @SuppressWarnings({"unchecked"})
public void test() throws Exception {
+ final List<List<String>> collectErrors = Collections.synchronizedList(new ArrayList<>());
- final int threadCount = 5;
- setupRestTestHarnesses();
- Thread[] threads = new Thread[threadCount];
- @SuppressWarnings({"rawtypes"})
- final List<List> collectErrors = Collections.synchronizedList(new ArrayList<>());
+ final ExecutorService executorService =
+ ExecutorUtil.newMDCAwareFixedThreadPool(
+ THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName()));
- for (int i = 0; i < threadCount; i++) {
+ List<Callable<Void>> callees = new ArrayList<>(THREAD_COUNT);
+ for (int i = 0; i < THREAD_COUNT; i++) {
final int finalI = i;
- threads[i] =
- new Thread() {
- @Override
- public void run() {
- @SuppressWarnings({"rawtypes"})
- ArrayList errs = new ArrayList();
- collectErrors.add(errs);
- try {
- invokeBulkAddCall(finalI, errs);
- invokeBulkReplaceCall(finalI, errs);
- invokeBulkDeleteCall(finalI, errs);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ Callable<Void> call =
+ () -> {
+ List<String> errs = new ArrayList<>();
+ collectErrors.add(errs);
+ try {
+ invokeBulkAddCall(finalI, errs);
+ invokeBulkReplaceCall(finalI, errs);
+ invokeBulkDeleteCall(finalI, errs);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ // TODO this might be double logged, but safer to log here anyway
+ log.error("Exception from thread {}", finalI, e);
}
+ return null;
};
-
- threads[i].start();
+ callees.add(call);
}
- for (Thread thread : threads) thread.join();
+ executorService.invokeAll(callees);
+ executorService.shutdown();
+
+ // TIMEOUT * 3: each task runs three phases (add, replace, delete), each bounded by TIMEOUT
+ assertTrue(
+ "Running for too long...", executorService.awaitTermination(TIMEOUT * 3, TimeUnit.SECONDS));
boolean success = true;
- for (@SuppressWarnings({"rawtypes"}) List e : collectErrors) {
+ for (List<String> e : collectErrors) {
if (e != null && !e.isEmpty()) {
success = false;
log.error("{}", e);
@@ -98,7 +122,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
}
@SuppressWarnings({"unchecked"})
- private void invokeBulkAddCall(int seed, ArrayList<String> errs) throws Exception {
+ private void invokeBulkAddCall(int seed, List<String> errs) throws Exception {
String payload =
"{\n"
+ " 'add-field' : {\n"
@@ -133,10 +157,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
payload = payload.replace("replaceDynamicCopyFieldDest", dynamicCopyFldDest);
payload = payload.replace("myNewFieldTypeName", newFieldTypeName);
+ // don't close publisher - gets closed at teardown
RestTestHarness publisher = randomRestTestHarness(r);
String response = publisher.post("/schema", SolrTestCaseJ4.json(payload));
- @SuppressWarnings({"rawtypes"})
- Map map = (Map) Utils.fromJSONString(response);
+ Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response);
Object errors = map.get("errors");
if (errors != null) {
errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8));
@@ -145,38 +169,38 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
// get another node
Set<String> errmessages = new HashSet<>();
+ // don't close harness - gets closed at teardown
RestTestHarness harness = randomRestTestHarness(r);
- try {
- long startTime = System.nanoTime();
- long maxTimeoutMillis = 100000;
- while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)
- < maxTimeoutMillis) {
- errmessages.clear();
- @SuppressWarnings({"rawtypes"})
- Map m = getObj(harness, aField, "fields");
- if (m == null) errmessages.add(StrUtils.formatString("field {0} not created", aField));
-
- m = getObj(harness, dynamicFldName, "dynamicFields");
- if (m == null)
- errmessages.add(StrUtils.formatString("dynamic field {0} not created", dynamicFldName));
-
- @SuppressWarnings({"rawtypes"})
- List l = getSourceCopyFields(harness, aField);
- if (!checkCopyField(l, aField, dynamicCopyFldDest))
- errmessages.add(
- StrUtils.formatString(
- "CopyField source={0},dest={1} not created", aField, dynamicCopyFldDest));
-
- m = getObj(harness, newFieldTypeName, "fieldTypes");
- if (m == null)
- errmessages.add(StrUtils.formatString("new type {0} not created", newFieldTypeName));
-
- if (errmessages.isEmpty()) break;
-
- Thread.sleep(10);
+ TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut()) {
+ errmessages.clear();
+ Map<?, ?> m = getObj(harness, aField, "fields");
+ if (m == null) {
+ errmessages.add(StrUtils.formatString("field {0} not created", aField));
}
- } finally {
- harness.close();
+
+ m = getObj(harness, dynamicFldName, "dynamicFields");
+ if (m == null) {
+ errmessages.add(StrUtils.formatString("dynamic field {0} not created", dynamicFldName));
+ }
+
+ List<Map<String, String>> l = getSourceCopyFields(harness, aField);
+ if (!checkCopyField(l, aField, dynamicCopyFldDest)) {
+ errmessages.add(
+ StrUtils.formatString(
+ "CopyField source={0},dest={1} not created", aField, dynamicCopyFldDest));
+ }
+
+ m = getObj(harness, newFieldTypeName, "fieldTypes");
+ if (m == null) {
+ errmessages.add(StrUtils.formatString("new type {0} not created", newFieldTypeName));
+ }
+
+ if (errmessages.isEmpty()) {
+ break;
+ }
+
+ timeout.sleep(10);
}
if (!errmessages.isEmpty()) {
errs.addAll(errmessages);
@@ -184,7 +208,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
}
@SuppressWarnings({"unchecked"})
- private void invokeBulkReplaceCall(int seed, ArrayList<String> errs) throws Exception {
+ private void invokeBulkReplaceCall(int seed, List<String> errs) throws Exception {
String payload =
"{\n"
+ " 'replace-field' : {\n"
@@ -213,10 +237,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
payload = payload.replace("replaceDynamicField", dynamicFldName);
payload = payload.replace("myNewFieldTypeName", newFieldTypeName);
+ // don't close publisher - gets closed at teardown
RestTestHarness publisher = randomRestTestHarness(r);
String response = publisher.post("/schema", SolrTestCaseJ4.json(payload));
- @SuppressWarnings({"rawtypes"})
- Map map = (Map) Utils.fromJSONString(response);
+ Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response);
Object errors = map.get("errors");
if (errors != null) {
errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8));
@@ -225,41 +249,39 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
// get another node
Set<String> errmessages = new HashSet<>();
+ // don't close harness - gets closed at teardown
RestTestHarness harness = randomRestTestHarness(r);
- try {
- long startTime = System.nanoTime();
- long maxTimeoutMillis = 100000;
- while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)
- < maxTimeoutMillis) {
- errmessages.clear();
- @SuppressWarnings({"rawtypes"})
- Map m = getObj(harness, aField, "fields");
- if (m == null)
- errmessages.add(StrUtils.formatString("field {0} no longer present", aField));
-
- m = getObj(harness, dynamicFldName, "dynamicFields");
- if (m == null)
- errmessages.add(
- StrUtils.formatString("dynamic field {0} no longer present", dynamicFldName));
-
- @SuppressWarnings({"rawtypes"})
- List l = getSourceCopyFields(harness, aField);
- if (!checkCopyField(l, aField, dynamicCopyFldDest))
- errmessages.add(
- StrUtils.formatString(
- "CopyField source={0},dest={1} no longer present", aField, dynamicCopyFldDest));
-
- m = getObj(harness, newFieldTypeName, "fieldTypes");
- if (m == null)
- errmessages.add(
- StrUtils.formatString("new type {0} no longer present", newFieldTypeName));
-
- if (errmessages.isEmpty()) break;
-
- Thread.sleep(10);
+ TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut()) {
+ errmessages.clear();
+ Map<?, ?> m = getObj(harness, aField, "fields");
+ if (m == null) {
+ errmessages.add(StrUtils.formatString("field {0} no longer present", aField));
}
- } finally {
- harness.close();
+
+ m = getObj(harness, dynamicFldName, "dynamicFields");
+ if (m == null) {
+ errmessages.add(
+ StrUtils.formatString("dynamic field {0} no longer present", dynamicFldName));
+ }
+
+ List<Map<String, String>> l = getSourceCopyFields(harness, aField);
+ if (!checkCopyField(l, aField, dynamicCopyFldDest)) {
+ errmessages.add(
+ StrUtils.formatString(
+ "CopyField source={0},dest={1} no longer present", aField, dynamicCopyFldDest));
+ }
+
+ m = getObj(harness, newFieldTypeName, "fieldTypes");
+ if (m == null) {
+ errmessages.add(StrUtils.formatString("new type {0} no longer present", newFieldTypeName));
+ }
+
+ if (errmessages.isEmpty()) {
+ break;
+ }
+
+ timeout.sleep(10);
}
if (!errmessages.isEmpty()) {
errs.addAll(errmessages);
@@ -267,7 +289,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
}
@SuppressWarnings({"unchecked"})
- private void invokeBulkDeleteCall(int seed, ArrayList<String> errs) throws Exception {
+ private void invokeBulkDeleteCall(int seed, List<String> errs) throws Exception {
String payload =
"{\n"
+ " 'delete-copy-field' : {\n"
@@ -288,10 +310,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
payload = payload.replace("replaceDynamicCopyFieldDest", dynamicCopyFldDest);
payload = payload.replace("myNewFieldTypeName", newFieldTypeName);
+ // don't close publisher - gets closed at teardown
RestTestHarness publisher = randomRestTestHarness(r);
String response = publisher.post("/schema", SolrTestCaseJ4.json(payload));
- @SuppressWarnings({"rawtypes"})
- Map map = (Map) Utils.fromJSONString(response);
+ Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response);
Object errors = map.get("errors");
if (errors != null) {
errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8));
@@ -300,49 +322,52 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase {
// get another node
Set<String> errmessages = new HashSet<>();
+ // don't close harness - gets closed at teardown
RestTestHarness harness = randomRestTestHarness(r);
- try {
- long startTime = System.nanoTime();
- long maxTimeoutMillis = 100000;
- while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS)
- < maxTimeoutMillis) {
- errmessages.clear();
- @SuppressWarnings({"rawtypes"})
- Map m = getObj(harness, aField, "fields");
- if (m != null) errmessages.add(StrUtils.formatString("field {0} still exists", aField));
-
- m = getObj(harness, dynamicFldName, "dynamicFields");
- if (m != null)
- errmessages.add(StrUtils.formatString("dynamic field {0} still exists", dynamicFldName));
-
- @SuppressWarnings({"rawtypes"})
- List l = getSourceCopyFields(harness, aField);
- if (checkCopyField(l, aField, dynamicCopyFldDest))
- errmessages.add(
- StrUtils.formatString(
- "CopyField source={0},dest={1} still exists", aField, dynamicCopyFldDest));
-
- m = getObj(harness, newFieldTypeName, "fieldTypes");
- if (m != null)
- errmessages.add(StrUtils.formatString("new type {0} still exists", newFieldTypeName));
-
- if (errmessages.isEmpty()) break;
-
- Thread.sleep(10);
+ TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut()) {
+ errmessages.clear();
+ Map<?, ?> m = getObj(harness, aField, "fields");
+ if (m != null) {
+ errmessages.add(StrUtils.formatString("field {0} still exists", aField));
+ }
+
+ m = getObj(harness, dynamicFldName, "dynamicFields");
+ if (m != null) {
+ errmessages.add(StrUtils.formatString("dynamic field {0} still exists", dynamicFldName));
}
- } finally {
- harness.close();
+
+ List<Map<String, String>> l = getSourceCopyFields(harness, aField);
+ if (checkCopyField(l, aField, dynamicCopyFldDest)) {
+ errmessages.add(
+ StrUtils.formatString(
+ "CopyField source={0},dest={1} still exists", aField, dynamicCopyFldDest));
+ }
+
+ m = getObj(harness, newFieldTypeName, "fieldTypes");
+ if (m != null) {
+ errmessages.add(StrUtils.formatString("new type {0} still exists", newFieldTypeName));
+ }
+
+ if (errmessages.isEmpty()) {
+ break;
+ }
+
+ timeout.sleep(10);
}
if (!errmessages.isEmpty()) {
errs.addAll(errmessages);
}
}
- private boolean checkCopyField(
- @SuppressWarnings({"rawtypes"}) List<Map> l, String src, String dest) {
- if (l == null) return false;
- for (@SuppressWarnings({"rawtypes"}) Map map : l) {
- if (src.equals(map.get("source")) && dest.equals(map.get("dest"))) return true;
+ private boolean checkCopyField(List<Map<String, String>> l, String src, String dest) {
+ if (l == null) {
+ return false;
+ }
+ for (Map<String, String> map : l) {
+ if (src.equals(map.get("source")) && dest.equals(map.get("dest"))) {
+ return true;
+ }
}
return false;
}