You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/01/11 20:01:04 UTC
[lucene-solr] branch branch_8x updated: SOLR-13051 improve TRA
update processor test - remove need to Thread.sleep() - better async
mechanism linked to SolrCore lifecycle - add some additional tests to be a
bit more thorough
This is an automated email from the ASF dual-hosted git repository.
gus pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 0f1da2b SOLR-13051 improve TRA update processor test - remove need to Thread.sleep() - better async mechanism linked to SolrCore lifecycle - add some additional tests to be a bit more thorough
0f1da2b is described below
commit 0f1da2bc14ef1bb79e21558247ef3c72e802924e
Author: Gus Heck <gu...@apache.org>
AuthorDate: Fri Jan 11 15:00:35 2019 -0500
SOLR-13051 improve TRA update processor test
- remove need to Thread.sleep()
- better async mechanism linked to SolrCore lifecycle
- add some additional tests to be a bit more thorough
---
.../src/java/org/apache/solr/core/SolrCore.java | 25 ++
.../processor/TimeRoutedAliasUpdateProcessor.java | 35 +-
.../TimeRoutedAliasUpdateProcessorTest.java | 373 ++++++++++++++-------
3 files changed, 298 insertions(+), 135 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 95342c3..142ccab 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -260,6 +260,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
static int boolean_query_max_clause_count = Integer.MIN_VALUE;
+ private ExecutorService coreAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Async Task");
/**
* The SolrResourceLoader used to load all resources for this core.
@@ -1532,6 +1533,8 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
}
log.info("{} CLOSING SolrCore {}", logid, this);
+ ExecutorUtil.shutdownAndAwaitTermination(coreAsyncTaskExecutor);
+
// stop reporting metrics
try {
coreMetricManager.close();
@@ -3166,4 +3169,26 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
});
return blobRef;
}
+
+ /**
+ * Run an arbitrary task in its own thread. This is an expert option and is
+ * a method you should use with great care. It would be bad to run something that never stopped
+ * or run something that took a very long time. Typically this is intended for actions that take
+ * a few seconds, and therefore would be bad to wait for within a request, but would not pose
+ * a significant hindrance to server shut down times. It is not intended for long running tasks
+ * and if you are using a Runnable with a loop in it, you are almost certainly doing it wrong.
+ * <p>
+ * WARNING: Solr will not be able to shut down gracefully until this task completes!
+ * <p>
+ * A significant upside of using this method vs creating your own ExecutorService is that your code
+ * does not have to properly shutdown executors which typically is risky from a unit testing
+ * perspective since the test framework will complain if you don't carefully ensure the executor
+ * shuts down before the end of the test. Also the threads running this task are sure to have
+ * a proper MDC for logging.
+ *
+ * @param r the task to run
+ */
+ public void runAsync(Runnable r) {
+ coreAsyncTaskExecutor.submit(r);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index cc1ddb8..c28ac44 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.solr.cloud.ZkController;
@@ -52,12 +51,10 @@ import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.apache.solr.util.DateMathParser;
-import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.solr.common.util.ExecutorUtil.newMDCAwareSingleThreadExecutor;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE;
@@ -97,8 +94,10 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
- // This will be updated out in async creation threads see preemptiveAsync(Runnable r) for details
- private volatile ExecutorService preemptiveCreationExecutor;
+ // This class is created once per request and the overseer methods prevent duplicate create requests
+ // from creating extra copies. All we need to track here is that we don't spam preemptive creates to
+ // the overseer multiple times from *this* request.
+ private volatile boolean preemptiveCreateOnceAlready = false;
public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
//TODO get from "Collection property"
@@ -215,7 +214,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
// This next line blocks until all collections required by the current document have been created
return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
case ASYNC_PREEMPTIVE:
- if (preemptiveCreationExecutor == null) {
+ if (!preemptiveCreateOnceAlready) {
+ log.info("EXECUTING preemptive creation for {}", timeRoutedAlias.getAliasName());
// It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
// in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
// and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
@@ -242,18 +242,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
}
private void preemptiveAsync(Runnable r) {
- // Note: creating an executor and throwing it away is slightly expensive, but this is only likely to happen
- // once per hour/day/week (depending on time slice size for the TRA). If the executor were retained, it
- // would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
- // more complicated from a code maintenance and readability stand point. An executor must used instead of a
- // thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
- DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
- preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
- preemptiveCreationExecutor.execute(() -> {
- r.run();
- preemptiveCreationExecutor.shutdown();
- preemptiveCreationExecutor = null;
- });
+ preemptiveCreateOnceAlready = true;
+ req.getCore().runAsync(r);
}
/**
@@ -479,9 +469,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
return targetCollectionDesc.getValue(); // we don't need another collection
case ASYNC_PREEMPTIVE:
// can happen when preemptive interval is longer than one time slice
- String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
- preemptiveAsync(() -> createNextCollection(mostRecentCollName));
- return targetCollectionDesc.getValue();
+ if (!preemptiveCreateOnceAlready) {
+ String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+ preemptiveAsync(() -> createNextCollection(mostRecentCollName));
+ return targetCollectionDesc.getValue();
+ }
case SYNCHRONOUS:
createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but...
if (!updateParsedCollectionAliases()) { // thus we didn't make progress...
@@ -495,7 +487,6 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
break;
default:
throw unknownCreateType();
-
}
} while (true);
}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index e592654..7eeff5a 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -29,14 +29,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -54,14 +56,17 @@ import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -75,13 +80,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// This feature has a leak
-@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12801")
+@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13059")
public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final String configName = "timeConfig";
private static final String alias = "myalias";
+ private static final String alias2 = "myalias2";
private static final String timeField = "timestamp_dt";
private static final String intField = "integer_i";
@@ -125,7 +129,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
.setMaxShardsPerNode(2)
.withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
.process(solrClient);
-
+
cluster.waitForActiveCollection(col23rd, 2, 4);
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
@@ -228,6 +232,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
+ // TODO: fix SOLR-13059, a case where this wait isn't working ~0.3% of the time.
+ waitCol(1,configName);
// manipulate the config...
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
.withMethod(SolrRequest.METHOD.POST)
@@ -289,9 +295,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
assertUpdateResponse(solrClient.commit(alias));
// wait for all the collections to exist...
- waitCol("2017-10-23", numShards);
- waitCol("2017-10-24", numShards);
- waitCol("2017-10-25", numShards);
+ waitColAndAlias("2017-10-23", numShards, alias);
+ waitColAndAlias("2017-10-24", numShards, alias);
+ waitColAndAlias("2017-10-25", numShards, alias);
// at this point we now have 3 collections with 4 shards each, and 3 replicas per shard for a total of
// 36 total replicas, 1/3 of which are leaders. We will add 3 docs and each has a 33% chance of hitting a
@@ -347,75 +353,108 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
.process(solrClient);
- // cause some collections to be created
- assertUpdateResponse(solrClient.add(alias,
- sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z")
- ));
- assertUpdateResponse(solrClient.commit(alias));
-
- // wait for all the collections to exist...
- waitCol("2017-10-23", numShards); // This one should have already existed from the alias creation
- waitCol("2017-10-24", numShards); // Create 1
- waitCol("2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken)
+ // needed to verify that preemptive creation in one alias doesn't inhibit preemptive creation in another
+ CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
+ .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
+ .process(solrClient);
- // normal update, nothing special, no collection creation required.
- List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
- assertEquals(3,cols.size());
+ addOneDocSynchCreation(numShards, alias);
+ addOneDocSynchCreation(numShards, alias2);
- assertNumDocs("2017-10-23", 0);
- assertNumDocs("2017-10-24", 0);
- assertNumDocs("2017-10-25", 1);
+ List<String> cols;
+ ModifiableSolrParams params = params();
- // cause some collections to be created
+ // Using threads to ensure that two TRA's are simultaneously preemptively creating and don't
+ // interfere with each other
+ ExecutorService executorService = ExecutorUtil.newMDCAwareCachedThreadPool("TimeRoutedAliasProcessorTestx-testPreemptiveCreation");
- ModifiableSolrParams params = params();
+ Exception[] threadExceptions = new Exception[2];
+ boolean[] threadStarted = new boolean[2];
+ boolean[] threadFinished = new boolean[2];
+ try {
+ CountDownLatch starter = new CountDownLatch(1);
+ executorService.submit(() -> {
+ threadStarted[0] = true;
+ try {
+ starter.await();
+ concurrentUpdates(params, alias);
+ } catch (Exception e) {
+ threadExceptions[0] = e;
+ }
+ threadFinished[0] = true;
+ });
+
+ executorService.submit(() -> {
+ threadStarted[1] = true;
+ try {
+ starter.await();
+ concurrentUpdates(params, alias2);
+ } catch (Exception e) {
+ threadExceptions[1] = e;
+ }
+ threadFinished[1] = true;
+ });
+ starter.countDown();
+ } finally {
+ ExecutorUtil.shutdownAndAwaitTermination(executorService);
+ }
- // TIME SENSITIVE SECTION BEGINS
+ // threads are known to be terminated by now, check for exceptions
+ for (Exception threadException : threadExceptions) {
+ if (threadException != null) {
+ Thread.sleep(5000); // avoid spurious fails due to TRA thread not done yet
+ //noinspection ThrowFromFinallyBlock
+ throw threadException;
+ }
+ }
- // In this section we intentionally rely on timing of a race condition but the gap in collection creation time vs
- // requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we
- // should always win the race) This is necessary because we are testing that we can guard against specific race
- // conditions that happen while a collection is being created. To test this without timing sensitivity we would
- // need a means to pass a semaphore to the server that it can use to delay collection creation
+ // just for confidence that there's nothing dodgy about how the threads executed.
+ assertTrue(threadStarted[0]);
+ assertTrue(threadStarted[1]);
+ assertTrue(threadFinished[0]);
+ assertTrue(threadFinished[1]);
+
+ // if one of these times out then the test has failed due to interference between aliases
+ waitColAndAlias("2017-10-26", numShards, alias);
+ waitColAndAlias("2017-10-26", numShards, alias2);
+
+ // after this we can ignore alias2
+ checkPreemptiveCase1(alias);
+ checkPreemptiveCase1(alias2);
+
+ // Some designs contemplated with close hooks were not properly restricted to the core and would have
+ // failed after other cores with other TRAs were stopped. Make sure that we don't fall into that trap in
+ // the future. The basic problem with a close hook solution is that one either winds up putting the
+ // executor on the TRAUP where it's duplicated/initiated for every request, or putting it at the class level
+ // in which case the hook will remove it for all TRA's which can pass a single TRA test nicely but is not safe
+ // where multiple TRA's might come and go.
//
- // This section must NOT gain any Thread.sleep() statements, nor should it gain any long running operations
+ // Start and stop some cores that have TRA's... 2x2 used to ensure every jetty gets at least one
- assertUpdateResponse(add(alias, Arrays.asList(
- sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
- sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
- sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"),
- sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation
- params));
- assertUpdateResponse(solrClient.commit(alias));
-
- cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
- assertEquals(3, cols.size());
- assertTrue("Preemptive creation appears to not be asynchronous anymore",!cols.contains("myalias_2017-10-26"));
- assertNumDocs("2017-10-23", 1);
- assertNumDocs("2017-10-24", 1);
- assertNumDocs("2017-10-25", 3);
+ CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
+ .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
+ .process(solrClient);
- // Here we quickly add another doc in a separate request, before the collection creation has completed.
- // This has the potential to incorrectly cause preemptive collection creation to run twice and create a
- // second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
- assertUpdateResponse(add(alias, Collections.singletonList(
- sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
- params));
- assertUpdateResponse(solrClient.commit(alias));
+ waitColAndAlias("2017-10-23",2, "foo");
+ waitCoreCount("foo_2017-10-23", 1); // prove this works, for confidence in deletion checking below.
+ assertUpdateResponse(solrClient.add("foo",
+ sdoc("id","1","timestamp_dt", "2017-10-23T00:00:00Z") // no extra collections should be created
+ ));
+ assertUpdateResponse(solrClient.commit("foo"));
- // TIME SENSITIVE SECTION ENDS
+ List<String> foo = solrClient.getClusterStateProvider().resolveAlias("foo");
- waitCol("2017-10-26", numShards);
+ CollectionAdminRequest.deleteAlias("foo").process(solrClient);
- cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
- assertTrue("Preemptive creation happened twice and created a collection " +
- "further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27"));
+ for (String colName : foo) {
+ CollectionAdminRequest.deleteCollection(colName).process(solrClient);
+ waitCoreCount(colName, 0);
+ }
- assertEquals(4, cols.size());
- assertNumDocs("2017-10-23", 1);
- assertNumDocs("2017-10-24", 1);
- assertNumDocs("2017-10-25", 4);
- assertNumDocs("2017-10-26", 0);
+ // if the design for terminating our executor is correct create/delete above will not cause failures below
+ // continue testing...
// now test with pre-create window longer than time slice, and forcing multiple creations.
CollectionAdminRequest.setAliasProperty(alias)
@@ -425,31 +464,31 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-27 now
params));
assertUpdateResponse(solrClient.commit(alias));
- waitCol("2017-10-27", numShards);
+ waitColAndAlias("2017-10-27", numShards, alias);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(5,cols.size()); // only one created in async case
- assertNumDocs("2017-10-23", 1);
- assertNumDocs("2017-10-24", 1);
- assertNumDocs("2017-10-25", 5);
- assertNumDocs("2017-10-26", 0);
- assertNumDocs("2017-10-27", 0);
+ assertNumDocs("2017-10-23", 1, alias);
+ assertNumDocs("2017-10-24", 1, alias);
+ assertNumDocs("2017-10-25", 5, alias);
+ assertNumDocs("2017-10-26", 0, alias);
+ assertNumDocs("2017-10-27", 0, alias);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-28 now
params));
assertUpdateResponse(solrClient.commit(alias));
- waitCol("2017-10-27", numShards);
- waitCol("2017-10-28", numShards);
+ waitColAndAlias("2017-10-27", numShards, alias);
+ waitColAndAlias("2017-10-28", numShards, alias);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
- assertNumDocs("2017-10-23", 1);
- assertNumDocs("2017-10-24", 1);
- assertNumDocs("2017-10-25", 6);
- assertNumDocs("2017-10-26", 0);
- assertNumDocs("2017-10-27", 0);
- assertNumDocs("2017-10-28", 0);
+ assertNumDocs("2017-10-23", 1, alias);
+ assertNumDocs("2017-10-24", 1, alias);
+ assertNumDocs("2017-10-25", 6, alias);
+ assertNumDocs("2017-10-26", 0, alias);
+ assertNumDocs("2017-10-27", 0, alias);
+ assertNumDocs("2017-10-28", 0, alias);
QueryResponse resp;
resp = solrClient.query(alias, params(
@@ -472,17 +511,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation
params));
assertUpdateResponse(solrClient.commit(alias));
- waitCol("2017-10-29", numShards);
+ waitColAndAlias("2017-10-29", numShards, alias);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(7,cols.size());
- assertNumDocs("2017-10-23", 1);
- assertNumDocs("2017-10-24", 1);
- assertNumDocs("2017-10-25", 6);
- assertNumDocs("2017-10-26", 0);
- assertNumDocs("2017-10-27", 1);
- assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
- assertNumDocs("2017-10-29", 0);
+ assertNumDocs("2017-10-23", 1, alias);
+ assertNumDocs("2017-10-24", 1, alias);
+ assertNumDocs("2017-10-25", 6, alias);
+ assertNumDocs("2017-10-26", 0, alias);
+ assertNumDocs("2017-10-27", 1, alias);
+ assertNumDocs("2017-10-28", 3, alias); // should get through even though preemptive creation ignored it.
+ assertNumDocs("2017-10-29", 0, alias);
resp = solrClient.query(alias, params(
"q", "*:*",
@@ -494,20 +533,20 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky?
params));
assertUpdateResponse(solrClient.commit(alias));
- waitCol("2017-10-30", numShards);
- waitCol("2017-10-31", numShards); // spooky! async case arising in middle of sync creation!!
+ waitColAndAlias("2017-10-30", numShards, alias);
+ waitColAndAlias("2017-10-31", numShards, alias); // spooky! async case arising in middle of sync creation!!
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(9,cols.size());
- assertNumDocs("2017-10-23", 1);
- assertNumDocs("2017-10-24", 1);
- assertNumDocs("2017-10-25", 6);
- assertNumDocs("2017-10-26", 0);
- assertNumDocs("2017-10-27", 1);
- assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
- assertNumDocs("2017-10-29", 0);
- assertNumDocs("2017-10-30", 1);
- assertNumDocs("2017-10-31", 0);
+ assertNumDocs("2017-10-23", 1, alias);
+ assertNumDocs("2017-10-24", 1, alias);
+ assertNumDocs("2017-10-25", 6, alias);
+ assertNumDocs("2017-10-26", 0, alias);
+ assertNumDocs("2017-10-27", 1, alias);
+ assertNumDocs("2017-10-28", 3, alias); // should get through even though preemptive creation ignored it.
+ assertNumDocs("2017-10-29", 0, alias);
+ assertNumDocs("2017-10-30", 1, alias);
+ assertNumDocs("2017-10-31", 0, alias);
resp = solrClient.query(alias, params(
"q", "*:*",
@@ -517,17 +556,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "14", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-01
params));
- waitCol("2017-11-01", numShards);
+ waitColAndAlias("2017-11-01", numShards, alias);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "15", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-02
params));
- waitCol("2017-11-02", numShards);
+ waitColAndAlias("2017-11-02", numShards, alias);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "16", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-03
params));
- waitCol("2017-11-03", numShards);
+ waitColAndAlias("2017-11-03", numShards, alias);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "17", "timestamp_dt", "2017-10-31T23:01:00Z")), // should NOT cause preemptive creation 11-04
@@ -539,13 +578,106 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "18", "timestamp_dt", "2017-11-01T23:01:00Z")), // should cause preemptive creation 11-04
params));
- waitCol("2017-11-04",numShards);
+ waitColAndAlias("2017-11-04",numShards, alias);
- Thread.sleep(2000); // allow the executor used in preemptive creation time to shut down.
+ }
+ // used to verify a core has been deleted (count = 0)
+ private void waitCoreCount(String collection, int count) {
+ long start = System.nanoTime();
+ CoreContainer coreContainer = cluster.getRandomJetty(random()).getCoreContainer();
+ int coreFooCount;
+ do {
+ coreFooCount = 0;
+ List<CoreDescriptor> coreDescriptors = coreContainer.getCoreDescriptors();
+ for (CoreDescriptor coreDescriptor : coreDescriptors) {
+ String collectionName = coreDescriptor.getCollectionName();
+ if (collection.equals(collectionName)) {
+ coreFooCount ++;
+ }
+ }
+ if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
+ fail("took over 10 seconds after collection creation to update aliases");
+ } else {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ } while(coreFooCount != count);
}
- private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException {
+ private void concurrentUpdates(ModifiableSolrParams params, String alias) throws SolrServerException, IOException {
+ // In this method we intentionally rely on timing of a race condition but the gap in collection creation time vs
+ // requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we
+ // should always win the race) This is necessary because we are testing that we can guard against specific race
+ // conditions that happen while a collection is being created. To test this without timing sensitivity we would
+ // need a means to pass a semaphore to the server that it can use to delay collection creation
+ //
+ // This method must NOT gain any Thread.sleep() statements, nor should it gain any long running operations
+ assertUpdateResponse(add(alias, Arrays.asList(
+ sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
+ sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
+ sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"),
+ sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation
+ params));
+ assertUpdateResponse(solrClient.commit(alias));
+
+ List<String> colsT1;
+ colsT1 = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+ assertEquals(3, colsT1.size());
+ assertTrue("Preemptive creation appears to not be asynchronous anymore", !colsT1.contains("myalias_2017-10-26"));
+ assertNumDocs("2017-10-23", 1, alias);
+ assertNumDocs("2017-10-24", 1, alias);
+ assertNumDocs("2017-10-25", 3, alias);
+
+ // Here we quickly add another doc in a separate request, before the collection creation has completed.
+ // This has the potential to incorrectly cause preemptive collection creation to run twice and create a
+ // second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
+ assertUpdateResponse(add(alias, Collections.singletonList(
+ sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
+ params));
+ assertUpdateResponse(solrClient.commit(alias));
+ }
+
+ private void checkPreemptiveCase1(String alias) throws SolrServerException, IOException {
+ List<String> cols;
+ cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+ assertTrue("Preemptive creation happened twice and created a collection " +
+ "further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27"));
+
+ assertEquals(4, cols.size());
+ assertNumDocs("2017-10-23", 1, alias);
+ assertNumDocs("2017-10-24", 1, alias);
+ assertNumDocs("2017-10-25", 4, alias);
+ assertNumDocs("2017-10-26", 0, alias);
+ }
+
+ private void addOneDocSynchCreation(int numShards, String alias) throws SolrServerException, IOException, InterruptedException {
+ // cause some collections to be created
+ assertUpdateResponse(solrClient.add(alias,
+ sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z")
+ ));
+ assertUpdateResponse(solrClient.commit(alias));
+
+ // wait for all the collections to exist...
+ waitColAndAlias("2017-10-23", numShards, alias); // This one should have already existed from the alias creation
+ waitColAndAlias("2017-10-24", numShards, alias); // Create 1
+ waitColAndAlias("2017-10-25", numShards, alias); // Create 2nd synchronously (ensure this is not broken)
+
+ // normal update, nothing special, no collection creation required.
+ List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+ assertEquals(3,cols.size());
+
+ assertNumDocs("2017-10-23", 0, alias);
+ assertNumDocs("2017-10-24", 0, alias);
+ assertNumDocs("2017-10-25", 1, alias);
+ }
+
+ private void assertNumDocs(final String datePart, int expected, String alias) throws SolrServerException, IOException {
QueryResponse resp = solrClient.query(alias + "_" + datePart, params(
"q", "*:*",
"rows", "10"));
@@ -572,9 +704,33 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
return leaders;
}
- private void waitCol(final String datePart, int slices) throws InterruptedException {
+ private void waitColAndAlias(final String datePart, int slices, String alias) throws InterruptedException {
// collection to exist
String collection = alias + "_" + datePart;
+ waitCol(slices, collection);
+ // and alias to be aware of collection
+ long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
+ while (!haveCollection(alias, collection)) {
+ if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
+ fail("took over 10 seconds after collection creation to update aliases");
+ } else {
+ Thread.sleep(500);
+ }
+ }
+ }
+
+ private boolean haveCollection(String alias, String collection) {
+ // separated into separate lines to make it easier to track down an NPE that occurred once
+ // in 3000 runs, if it shows up again...
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ ZkStateReader zkStateReader = solrClient.getZkStateReader();
+ Aliases aliases = zkStateReader.getAliases();
+ Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
+ List<String> strings = collectionAliasListMap.get(alias);
+ return strings.contains(collection);
+ }
+
+ private void waitCol(int slices, String collection) {
waitForState("waiting for collections to be created", collection,
(liveNodes, collectionState) -> {
if (collectionState == null) {
@@ -585,15 +741,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
int size = activeSlices.size();
return size == slices;
});
- // and alias to be aware of collection
- long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
- while (!cluster.getSolrClient().getZkStateReader().getAliases().getCollectionAliasListMap().get(alias).contains(collection)) {
- if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
- fail("took over 10 seconds after collection creation to update aliases");
- } else {
- Thread.sleep(500);
- }
- }
}
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {