You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/02/19 02:57:24 UTC
[lucene-solr] branch solr-13131 updated: SOLR-13149 Tests,
fix deadlock exposed by test
This is an automated email from the ASF dual-hosted git repository.
gus pushed a commit to branch solr-13131
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/solr-13131 by this push:
new 4168f0b SOLR-13149 Tests, fix deadlock exposed by test
4168f0b is described below
commit 4168f0b43ca1d8128f0605db1c355ef538b50b24
Author: Gus Heck <gu...@apache.org>
AuthorDate: Mon Feb 18 21:56:57 2019 -0500
SOLR-13149 Tests, fix deadlock exposed by test
---
.../MaintainCategoryRoutedAliasCmd.java | 69 ++++--
.../java/org/apache/solr/core/CoreContainer.java | 121 ++++++-----
.../CategoryRoutedAliasUpdateProcessorTest.java | 207 +++++++++++++-----
.../processor/RoutedAliasUpdateProcessorTest.java | 171 +++++++++++++++
.../TimeRoutedAliasUpdateProcessorTest.java | 231 ++++-----------------
5 files changed, 491 insertions(+), 308 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
index a0a7199..ccd80f3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
@@ -17,10 +17,12 @@
package org.apache.solr.cloud.api.collections;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -37,15 +39,20 @@ import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.CategoryRoutedAlias.UNINITIALIZED;
import static org.apache.solr.common.params.CommonParams.NAME;
public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@SuppressWarnings("WeakerAccess")
public static final String IF_CATEGORY_COLLECTION_NOT_FOUND = "ifCategoryCollectionNotFound";
+  // Per-alias guard so that at most one asynchronous delete of the initial (UNINITIALIZED) collection
+  // is in flight at a time; final because it is a process-wide singleton shared by all command instances.
+  private static final NamedSimpleSemaphore DELETE_LOCK = new NamedSimpleSemaphore();
+
private final OverseerCollectionMessageHandler ocmh;
MaintainCategoryRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
@@ -112,20 +119,38 @@ public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
// requests to the alias will fail with internal errors (incl. queries etc).
// TODO avoid this contains check?
List<String> colList = new ArrayList<>(collectionAliasListMap.get(aliasName));
- if (colList.contains(initialCollection) && colList.size() > 1) {
-
- aliasesManager.applyModificationAndExportToZk(curAliases -> {
- colList.remove(initialCollection);
- final String collectionsToKeepStr = StrUtils.join(colList, ',');
- return curAliases.cloneWithCollectionAlias(aliasName, collectionsToKeepStr);
- });
- final CollectionsHandler collHandler = ocmh.overseer.getCoreContainer().getCollectionsHandler();
- final SolrParams reqParams = CollectionAdminRequest
- .deleteCollection(initialCollection).getParams();
- SolrQueryResponse rsp = new SolrQueryResponse();
- collHandler.handleRequestBody(new LocalSolrQueryRequest(null, reqParams), rsp);
- //noinspection unchecked
- results.add(UNINITIALIZED, rsp.getValues());
+ if (colList.contains(initialCollection) && colList.size() > 1 ) {
+
+ // need to run the delete async, otherwise we may deadlock with incoming updates that are attempting
+ // to create collections (they will have called getCore() but may be waiting on the overseer alias lock
+ // we hold and we will be waiting for the Core reference count to reach zero). By deleting asynchronously
+ // we allow this request to complete and the alias lock to be released, which allows the update to complete
+ // so that we can do the delete. Additionally we don't want to cause multiple delete operations during
+ // the time the delete is in progress, since that just wastes overseer cycles.
+ // TODO: check TRA's are protected against this
+
+ if (DELETE_LOCK.tryAcquire(aliasName)) {
+ // note that the overseer might not have any cores (and the unit test occasionally catches this)
+ ocmh.overseer.getCoreContainer().runAsync(() -> {
+ aliasesManager.applyModificationAndExportToZk(curAliases -> {
+ colList.remove(initialCollection);
+ final String collectionsToKeepStr = StrUtils.join(colList, ',');
+ return curAliases.cloneWithCollectionAlias(aliasName, collectionsToKeepStr);
+ });
+ final CollectionsHandler collHandler = ocmh.overseer.getCoreContainer().getCollectionsHandler();
+ final SolrParams reqParams = CollectionAdminRequest
+ .deleteCollection(initialCollection).getParams();
+ SolrQueryResponse rsp = new SolrQueryResponse();
+ try {
+ collHandler.handleRequestBody(new LocalSolrQueryRequest(null, reqParams), rsp);
+ } catch (Exception e) {
+ log.error("Could not delete initial collection from CRA", e);
+ }
+ //noinspection unchecked
+ results.add(UNINITIALIZED, rsp.getValues());
+ DELETE_LOCK.release(aliasName);
+ });
+ }
}
//---- CREATE THE COLLECTION
@@ -138,7 +163,21 @@ public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
//---- UPDATE THE ALIAS WITH NEW COLLECTION
updateAlias(aliasName, aliasesManager, categoryRequired);
}
-
}
+ private static class NamedSimpleSemaphore {
+
+ private final HashMap<String, Semaphore> semaphores = new HashMap<>();
+
+ NamedSimpleSemaphore() {
+ }
+
+ boolean tryAcquire(String name) {
+ return semaphores.computeIfAbsent(name, s -> new Semaphore(1)).tryAcquire();
+ }
+
+ public void release(String name) {
+ semaphores.get(name).release();
+ }
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 3dc1bb5..803e0cb 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,21 +16,6 @@
*/
package org.apache.solr.core;
-import static java.util.Objects.requireNonNull;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_HISTORY_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
-import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
-
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -53,6 +38,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.config.Lookup;
@@ -128,8 +115,20 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
+import static java.util.Objects.requireNonNull;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_HISTORY_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
+import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
/**
*
@@ -224,6 +223,8 @@ public class CoreContainer {
protected volatile AutoScalingHandler autoScalingHandler;
+ private ExecutorService coreContainerAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Container Async Task");
+
private enum CoreInitFailedAction { fromleader, none }
/**
@@ -417,7 +418,7 @@ public class CoreContainer {
SolrHttpClientContextBuilder httpClientBuilder = new SolrHttpClientContextBuilder();
if (builder.getCredentialsProviderProvider() != null) {
httpClientBuilder.setDefaultCredentialsProvider(new CredentialsProviderProvider() {
-
+
@Override
public CredentialsProvider getCredentialsProvider() {
return builder.getCredentialsProviderProvider().getCredentialsProvider();
@@ -832,9 +833,9 @@ public class CoreContainer {
}
public void shutdown() {
- log.info("Shutting down CoreContainer instance="
- + System.identityHashCode(this));
+ log.info("Shutting down CoreContainer instance=" + System.identityHashCode(this));
+ ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor);
ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
isShutDown = true;
@@ -864,7 +865,7 @@ public class CoreContainer {
}
ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
-
+
// First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (solrCores.getModifyLock()) {
solrCores.getModifyLock().notifyAll(); // wake up anyone waiting
@@ -896,7 +897,7 @@ public class CoreContainer {
synchronized (solrCores.getModifyLock()) {
solrCores.getModifyLock().notifyAll(); // wake up the thread
}
-
+
customThreadPool.submit(() -> {
replayUpdatesExecutor.shutdownAndAwaitTermination();
});
@@ -1104,8 +1105,8 @@ public class CoreContainer {
return core;
} catch (Exception ex) {
- // First clean up any core descriptor, there should never be an existing core.properties file for any core that
- // failed to be created on-the-fly.
+ // First clean up any core descriptor, there should never be an existing core.properties file for any core that
+ // failed to be created on-the-fly.
coresLocator.delete(this, cd);
if (isZooKeeperAware() && !preExisitingZkEntry) {
try {
@@ -1237,7 +1238,7 @@ public class CoreContainer {
private ConfigSet getConfigSet(CoreDescriptor cd) {
return coreConfigService.getConfig(cd);
}
-
+
/**
* Take action when we failed to create a SolrCore. If error is due to corrupt index, try to recover. Various recovery
* strategies can be specified via system properties "-DCoreInitFailedAction={fromleader, none}"
@@ -1263,13 +1264,13 @@ public class CoreContainer {
break;
}
}
-
+
// If no CorruptIndexException, nothing we can try here
if (cause == null) throw original;
-
+
CoreInitFailedAction action = CoreInitFailedAction.valueOf(System.getProperty(CoreInitFailedAction.class.getSimpleName(), "none"));
log.debug("CorruptIndexException while creating core, will attempt to repair via {}", action);
-
+
switch (action) {
case fromleader: // Recovery from leader on a CorruptedIndexException
if (isZooKeeperAware()) {
@@ -1369,13 +1370,13 @@ public class CoreContainer {
}
/**
- * Returns an immutable Map of Exceptions that occurred when initializing
- * SolrCores (either at startup, or do to runtime requests to create cores)
- * keyed off of the name (String) of the SolrCore that had the Exception
+ * Returns an immutable Map of Exceptions that occurred when initializing
+ * SolrCores (either at startup, or do to runtime requests to create cores)
+ * keyed off of the name (String) of the SolrCore that had the Exception
* during initialization.
* <p>
- * While the Map returned by this method is immutable and will not change
- * once returned to the client, the source data used to generate this Map
+ * While the Map returned by this method is immutable and will not change
+ * once returned to the client, the source data used to generate this Map
* can be changed as various SolrCore operations are performed:
* </p>
* <ul>
@@ -1426,7 +1427,7 @@ public class CoreContainer {
* Recreates a SolrCore.
* While the new core is loading, requests will continue to be dispatched to
* and processed by the old core
- *
+ *
* @param name the name of the SolrCore to reload
*/
public void reload(String name) {
@@ -1526,7 +1527,7 @@ public class CoreContainer {
public void unload(String name, boolean deleteIndexDir, boolean deleteDataDir, boolean deleteInstanceDir) {
CoreDescriptor cd = solrCores.getCoreDescriptor(name);
-
+
if (name != null) {
// check for core-init errors first
CoreLoadFailure loadFailure = coreInitFailures.remove(name);
@@ -1543,7 +1544,7 @@ public class CoreContainer {
return;
}
}
-
+
if (cd == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot unload non-existent core [" + name + "]");
}
@@ -1571,7 +1572,7 @@ public class CoreContainer {
zkSys.getZkController().stopReplicationFromLeader(name);
}
}
-
+
core.unloadOnClose(cd, deleteIndexDir, deleteDataDir, deleteInstanceDir);
if (close)
core.closeAndWait();
@@ -1686,17 +1687,17 @@ public class CoreContainer {
public BlobRepository getBlobRepository(){
return blobRepository;
}
-
+
/**
* If using asyncSolrCoreLoad=true, calling this after {@link #load()} will
* not return until all cores have finished loading.
- *
+ *
* @param timeoutMs timeout, upon which method simply returns
*/
public void waitForLoadingCoresToFinish(long timeoutMs) {
solrCores.waitForLoadingCoresToFinish(timeoutMs);
}
-
+
public void waitForLoadingCore(String name, long timeoutMs) {
solrCores.waitForLoadingCoreToFinish(name, timeoutMs);
}
@@ -1781,11 +1782,11 @@ public class CoreContainer {
public boolean isZooKeeperAware() {
return zkSys.getZkController() != null;
}
-
+
public ZkController getZkController() {
return zkSys.getZkController();
}
-
+
public NodeConfig getConfig() {
return cfg;
}
@@ -1794,7 +1795,7 @@ public class CoreContainer {
public ShardHandlerFactory getShardHandlerFactory() {
return shardHandlerFactory;
}
-
+
public UpdateShardHandler getUpdateShardHandler() {
return updateShardHandler;
}
@@ -1802,7 +1803,7 @@ public class CoreContainer {
public SolrResourceLoader getResourceLoader() {
return loader;
}
-
+
public boolean isCoreLoading(String name) {
return solrCores.isCoreLoading(name);
}
@@ -1822,8 +1823,8 @@ public class CoreContainer {
public long getStatus() {
return status;
}
-
- // Occasaionally we need to access the transient cache handler in places other than coreContainer.
+
+ // Occasionally we need to access the transient cache handler in places other than coreContainer.
public TransientSolrCoreCache getTransientCache() {
return solrCores.getTransientCacheHandler();
}
@@ -1883,7 +1884,7 @@ public class CoreContainer {
if (tragicException != null && isZooKeeperAware()) {
getZkController().giveupLeadership(solrCore.getCoreDescriptor(), tragicException);
}
-
+
return tragicException != null;
}
@@ -1891,6 +1892,32 @@ public class CoreContainer {
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
}
+ /**
+ * Run an arbitrary task in it's own thread. This is an expert option and is
+ * a method you should use with great care. It would be bad to run something that never stopped
+ * or run something that took a very long time. Typically this is intended for actions that take
+ * a few seconds, and therefore would be bad to wait for within a request, or actions that need to happen
+ * when a core has zero references, but but would not pose a significant hindrance to server shut down times.
+ * It is not intended for long running tasks and if you are using a Runnable with a loop in it, you are
+ * almost certainly doing it wrong.
+ * <p><br>
+ * WARNING: Solr wil not be able to shut down gracefully until this task completes!
+ * <p><br>
+ * A significant upside of using this method vs creating your own ExecutorService is that your code
+ * does not have to properly shutdown executors which typically is risky from a unit testing
+ * perspective since the test framework will complain if you don't carefully ensure the executor
+ * shuts down before the end of the test. Also the threads running this task are sure to have
+ * a proper MDC for logging.
+ * <p><br>
+ * Normally, one uses {@link SolrCore#runAsync(Runnable)} if possible, but in some cases
+ * you might need to execute a task asynchronously when you could be running on a node with no
+ * cores, and then use of this method is indicated.
+ *
+ * @param r the task to run
+ */
+ public void runAsync(Runnable r) {
+ coreContainerAsyncTaskExecutor.submit(r);
+ }
}
class CloserThread extends Thread {
diff --git a/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
index 850f4d1..566e27d 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
@@ -21,21 +21,22 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.lucene.util.IOUtils;
-import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
-import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.api.collections.CategoryRoutedAlias;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -59,7 +60,6 @@ public class CategoryRoutedAliasUpdateProcessorTest extends RoutedAliasUpdatePro
private static final String intField = "integer_i";
private int lastDocId = 0;
- private int numDocsDeletedOrFailed = 0;
private static CloudSolrClient solrClient;
@Before
@@ -82,6 +82,86 @@ public class CategoryRoutedAliasUpdateProcessorTest extends RoutedAliasUpdatePro
IOUtils.close(solrClient);
}
+  @Test
+  public void testNonEnglish() throws Exception {
+    // Test to document the expected behavior with non-English text for categories.
+    // The present expectation is that non-Latin text and many accented Latin characters
+    // will get replaced with '_'. This is necessary to maintain collection naming
+    // conventions. The net effect is that documents get sorted by the number of characters
+    // in the category rather than by the actual categories.
+
+    // This should be changed in an enhancement (wherein the category is RFC-4648 url-safe encoded).
+    // For now document it as an expected limitation.
+
+    String somethingInChinese = "中文的东西"; // 5 chars
+    String somethingInHebrew = "משהו בסינית"; // 11 chars
+    String somethingInThai = "บางอย่างในภาษาจีน"; // 17 chars
+    String somethingInArabic = "شيء في الصينية"; // 14 chars
+    String somethingInGreek = "κάτι κινεζικό"; // 13 chars
+    String somethingInGujarati = "િનીમાં કંઈક"; // 11 chars (same as hebrew)
+
+    String ONE_ = "_";
+    String TWO_ = "__";
+    String THREE_ = "___";
+    String FOUR_ = "____";
+    String FIVE_ = "_____";
+
+    String collectionChinese = getAlias() + "__CRA__" + FIVE_;
+    String collectionHebrew = getAlias() + "__CRA__" + FIVE_ + FIVE_ + ONE_;
+    String collectionThai = getAlias() + "__CRA__" + FIVE_ + FIVE_ + FIVE_ + TWO_;
+    String collectionArabic = getAlias() + "__CRA__" + FIVE_ + FIVE_ + FOUR_;
+    String collectionGreek = getAlias() + "__CRA__" + FIVE_ + FIVE_ + THREE_;
+    // Note Gujarati not listed, because it duplicates hebrew.
+
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+
+    List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+    List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+    // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+    assertTrue("We expect at least 2 configSets",
+        retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+    assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField,
+        CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+            .setMaxShardsPerNode(2))
+        .process(solrClient);
+
+    addDocsAndCommit(true,
+        newDoc(somethingInChinese),
+        newDoc(somethingInHebrew),
+        newDoc(somethingInThai),
+        newDoc(somethingInArabic),
+        newDoc(somethingInGreek),
+        newDoc(somethingInGujarati));
+
+    // Note Gujarati not listed, because it duplicates hebrew.
+    assertInvariants(collectionChinese, collectionHebrew, collectionThai, collectionArabic, collectionGreek);
+
+    assertColHasDocCount(collectionChinese, 1);
+    // Hebrew and Gujarati categories both have 11 chars, so both docs land in the same collection.
+    assertColHasDocCount(collectionHebrew, 2);
+    assertColHasDocCount(collectionThai, 1);
+    assertColHasDocCount(collectionArabic, 1);
+    assertColHasDocCount(collectionGreek, 1);
+  }
+
+  /**
+   * Asserts that querying {@code collection} directly (not via the alias) finds exactly
+   * {@code expected} documents.
+   */
+  private void assertColHasDocCount(String collection, int expected) throws SolrServerException, IOException {
+    final QueryResponse colResponse = solrClient.query(collection, params(
+        "q", "*:*",
+        "rows", "0"));
+    long numFound = colResponse.getResults().getNumFound();
+    assertEquals(expected, numFound);
+  }
+
@Slow
@Test
public void test() throws Exception {
@@ -93,10 +173,10 @@ public class CategoryRoutedAliasUpdateProcessorTest extends RoutedAliasUpdatePro
final String colVogon = getAlias() + "__CRA__" + SHIPS[0];
// we expect changes ensuring a legal collection name.
- final String colHoG = getAlias() + "__CRA__" + SHIPS[1].replaceAll("\\s", "_");
- final String colStunt = getAlias() + "__CRA__" + SHIPS[2].replaceAll("\\s", "_");
- final String colArk = getAlias() + "__CRA__" + SHIPS[3].replaceAll("-","_");
- final String colBistro = getAlias() + "__CRA__" + SHIPS[4].replaceAll("\\$", "_");
+ final String colHoG = getAlias() + "__CRA__" + noSpaces(SHIPS[1]);
+ final String colStunt = getAlias() + "__CRA__" + noSpaces(SHIPS[2]);
+ final String colArk = getAlias() + "__CRA__" + noDashes(SHIPS[3]);
+ final String colBistro = getAlias() + "__CRA__" + noDollar(SHIPS[4]);
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
@@ -127,56 +207,77 @@ public class CategoryRoutedAliasUpdateProcessorTest extends RoutedAliasUpdatePro
assertInvariants(colVogon, colHoG, colStunt, colArk, colBistro);
}
- private void createConfigSet(String configName) throws SolrServerException, IOException {
- // First create a configSet
- // Then we create a collection with the name of the eventual config.
- // We configure it, and ultimately delete the collection, leaving a modified config-set behind.
- // Later we create the "real" collections referencing this modified config-set.
- assertEquals(0, new ConfigSetAdminRequest.Create()
- .setConfigSetName(configName)
- .setBaseConfigSetName("_default")
- .process(solrClient).getStatus());
-
- CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
-
- // TODO: fix SOLR-13059, a where this wait isn't working ~0.3% of the time.
- waitCol(1, configName);
- // manipulate the config...
- checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
- .withMethod(SolrRequest.METHOD.POST)
- .withPayload("{" +
- " 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
- " 'add-updateprocessor' : {" +
- " 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
- " }," +
- " 'add-updateprocessor' : {" + // for testing
- " 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
- " 'fieldName':'" + intField + "'" +
- " }," +
- "}").build()));
- // only sometimes test with "tolerant" URP:
- final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
- checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
- .withMethod(SolrRequest.METHOD.POST)
- .withPayload("{" +
- " 'set' : {" +
- " '_UPDATE' : {'processor':'" + urpNames + "'}" +
- " }" +
- "}").build()));
-
- CollectionAdminRequest.deleteCollection(configName).process(solrClient);
- assertTrue(
- new ConfigSetAdminRequest.List().process(solrClient).getConfigSets()
- .contains(configName)
- );
+ private String noSpaces(String ship) {
+ return ship.replaceAll("\\s", "_");
+ }
+ private String noDashes(String ship) {
+ return ship.replaceAll("-", "_");
+ }
+ private String noDollar(String ship) {
+ return ship.replaceAll("\\$", "_");
}
- private void checkNoError(NamedList<Object> response) {
- Object errors = response.get("errorMessages");
- assertNull("" + errors, errors);
+  /**
+   * Test that the Update Processor Factory routes documents to leader shards and thus
+   * avoids the possibility of introducing an extra hop to find the leader.
+   *
+   * @throws Exception when it blows up unexpectedly :)
+   */
+  @Slow
+  @Test
+  @LogLevel("org.apache.solr.update.processor.TrackingUpdateProcessorFactory=DEBUG")
+  public void testSliceRouting() throws Exception {
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+
+    // Each collection gets a random number of shards (1-4) and replicas per shard (1-3).
+    // Only the leader of each shard is a correct destination; any other replica should fail this test.
+    final int numShards = 1 + random().nextInt(4);
+    final int numReplicas = 1 + random().nextInt(3);
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField,
+        CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
+            .setMaxShardsPerNode(numReplicas))
+        .process(solrClient);
+
+    // cause some collections to be created
+    assertUpdateResponse(solrClient.add(getAlias(), new SolrInputDocument("id","1",categoryField, SHIPS[0])));
+    assertUpdateResponse(solrClient.add(getAlias(), new SolrInputDocument("id","2",categoryField, SHIPS[1])));
+    assertUpdateResponse(solrClient.add(getAlias(), new SolrInputDocument("id","3",categoryField, SHIPS[2])));
+    assertUpdateResponse(solrClient.commit(getAlias()));
+
+    // wait for all the collections to exist...
+
+    waitColAndAlias(getAlias(), "__CRA__", SHIPS[0], numShards);
+    waitColAndAlias(getAlias(), "__CRA__", noSpaces(SHIPS[1]), numShards);
+    waitColAndAlias(getAlias(), "__CRA__", noSpaces(SHIPS[2]), numShards);
+
+    // At this point we have 3 collections, each with numShards shards and numReplicas replicas per shard,
+    // so 1/numReplicas of all replicas are leaders. If replicas were chosen at random, each of the 3 docs below
+    // would hit a leader (and thus not expose broken routing) with probability 1/numReplicas, giving a
+    // false positive rate of (1/numReplicas)^3: ~3.7% with 3 replicas, 12.5% with 2, and 100% with 1
+    // (every replica is then a leader, so broken routing cannot be detected on that run).
+
+    final String trackGroupName = getTrackUpdatesGroupName();
+    final List<UpdateCommand> updateCommands;
+    try {
+      TrackingUpdateProcessorFactory.startRecording(trackGroupName);
+
+      ModifiableSolrParams params = params("post-processor", "tracking-" + trackGroupName);
+      List<SolrInputDocument> list = Arrays.asList(
+          sdoc("id", "4", categoryField, SHIPS[0]),
+          sdoc("id", "5", categoryField, SHIPS[1]),
+          sdoc("id", "6", categoryField, SHIPS[2]));
+      Collections.shuffle(list, random()); // order should not matter here
+      assertUpdateResponse(add(getAlias(), list,
+          params));
+    } finally {
+      updateCommands = TrackingUpdateProcessorFactory.stopRecording(trackGroupName);
+    }
+    assertRouting(numShards, updateCommands);
}
+
private void assertInvariants(String... expectedColls) throws IOException, SolrServerException {
+ int numDocsDeletedOrFailed = 0;
final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(getAlias());
diff --git a/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
index 0f3bfa2..a15f8c0 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
@@ -23,29 +23,180 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Ignore;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
@Ignore // don't try too run abstract base class
public abstract class RoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+ private static final String intField = "integer_i";
+
+  void waitColAndAlias(String alias, String separator, final String suffix, int slices) throws InterruptedException {
+    // Block until the collection alias + separator + suffix exists (waitCol checks it against `slices`)...
+    final String targetCollection = alias + separator + suffix;
+    waitCol(slices, targetCollection);
+    // ...then poll every 500ms, for roughly 10 seconds at most, until the alias lists that collection.
+    final long startNanos = System.nanoTime(); // nanoTime-based timing keeps precommit happy
+    while (!haveCollection(alias, targetCollection)) {
+      if (NANOSECONDS.toSeconds(System.nanoTime() - startNanos) <= 10) {
+        Thread.sleep(500);
+      } else {
+        fail("took over 10 seconds after collection creation to update aliases");
+      }
+    }
+  }
+
+  private boolean haveCollection(String alias, String collection) {
+    // Kept step-by-step so any NPE is easy to locate (one was seen once in ~3000 runs). An alias with no
+    // entry yet in the aliases map yields false instead of an NPE, so waitColAndAlias keeps polling.
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    ZkStateReader zkStateReader = solrClient.getZkStateReader();
+    Aliases aliases = zkStateReader.getAliases();
+    Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
+    List<String> strings = collectionAliasListMap.get(alias);
+    return strings != null && strings.contains(collection);
+  }
+
+  /** Group name given to the {@link TrackingUpdateProcessorFactory} configured by createConfigSet. */
+  String getTrackUpdatesGroupName() {
+    return getSaferTestName();
+  }
+
+  void createConfigSet(String configName) throws SolrServerException, IOException {
+    // First create a configSet (based on _default)
+    // Then we create a collection with the name of the eventual config.
+    // We configure it, and ultimately delete the collection, leaving a modified config-set behind.
+    // Later we create the "real" collections referencing this modified config-set.
+    assertEquals(0, new ConfigSetAdminRequest.Create()
+        .setConfigSetName(configName)
+        .setBaseConfigSetName("_default")
+        .process(getSolrClient()).getStatus());
+
+    CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(getSolrClient());
+
+    // TODO: fix SOLR-13059, a case where this wait isn't working ~0.3% of the time.
+    waitCol(1,configName);
+    // manipulate the config...
+    checkNoError(getSolrClient().request(new V2Request.Builder("/collections/" + configName + "/config")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
+            "  'add-updateprocessor' : {" +
+            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
+            "  }," +
+            // See TrackingUpdateProcessorFactory javadocs for details...
+            "  'add-updateprocessor' : {" +
+            "    'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
+            "  }," +
+            "  'add-updateprocessor' : {" + // for testing
+            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
+            "    'fieldName':'" + getIntField() + "'" +
+            "  }," +
+            "}").build()));
+    // only sometimes test with "tolerant" URP:
+    final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
+    checkNoError(getSolrClient().request(new V2Request.Builder("/collections/" + configName + "/config/params")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set' : {" +
+            "    '_UPDATE' : {'processor':'" + urpNames + "'}" +
+            "  }" +
+            "}").build()));
+
+    CollectionAdminRequest.deleteCollection(configName).process(getSolrClient());
+    assertTrue(
+        new ConfigSetAdminRequest.List().process(getSolrClient()).getConfigSets()
+            .contains(configName)
+    );
+  }
+
+  String getIntField() { // the field that IncrementURPFactory ('inc' in createConfigSet) increments
+    return intField;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  void checkNoError(NamedList<Object> response) {
+    final Object errorMessages = response.get("errorMessages"); // the response must carry no errorMessages
+    assertNull(String.valueOf(errorMessages), errorMessages);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  Set<String> getLeaderCoreNames(ClusterState clusterState) { // leader core names for cores on the cluster's jettys
+    Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
+    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+    for (JettySolrRunner jettySolrRunner : jettySolrRunners) {
+      List<CoreDescriptor> coreDescriptors = jettySolrRunner.getCoreContainer().getCoreDescriptors();
+      String nodeName = jettySolrRunner.getNodeName(); // same for every core on this jetty
+      for (CoreDescriptor core : coreDescriptors) {
+        String collectionName = core.getCollectionName();
+        DocCollection collectionOrNull = clusterState.getCollectionOrNull(collectionName);
+        List<Replica> leaderReplicas = collectionOrNull == null ? null : collectionOrNull.getLeaderReplicas(nodeName); // collection may be absent from clusterState
+        if (leaderReplicas != null) {
+          for (Replica leaderReplica : leaderReplicas) {
+            leaders.add(leaderReplica.getCoreName());
+          }
+        }
+      }
+    }
+    return leaders;
+  }
+
+  void assertRouting(int numShards, List<UpdateCommand> updateCommands) throws IOException { // expects 3 collections, 3 updates
+    try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
+      ClusterStateProvider clusterStateProvider = cloudSolrClient.getClusterStateProvider();
+      clusterStateProvider.connect();
+      Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
+      assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
+
+      assertEquals(3, updateCommands.size());
+      for (UpdateCommand updateCommand : updateCommands) { // each tracked update must have landed on a leader core
+        String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
+        assertTrue("Update was not routed to a leader (" + node + " not in list of leaders " + leaders + ")", leaders.contains(node));
+      }
+    }
+  }
+
public abstract String getAlias() ;
public abstract CloudSolrClient getSolrClient() ;
+ @SuppressWarnings("WeakerAccess")
void waitCol(int slices, String collection) {
waitForState("waiting for collections to be created", collection,
(liveNodes, collectionState) -> {
@@ -134,4 +285,24 @@ public abstract class RoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
private int queryNumDocs(String q) throws SolrServerException, IOException {
return (int) getSolrClient().query(getAlias(), params("q", q, "rows", "0")).getResults().getNumFound();
}
+
+  /** Adds the docs to Solr via {@link #getSolrClient()} with the params */
+  @SuppressWarnings("SameParameterValue")
+  protected UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
+    final UpdateRequest request = new UpdateRequest();
+    if (params != null) {
+      request.setParams(new ModifiableSolrParams(params)); // defensive copy: the request's params get modified
+    }
+    request.add(docs);
+    return request.process(getSolrClient(), collection);
+  }
+
+  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
+
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      // replaces each selected field value with (its toString() parsed as an int) + 1
+      return FieldValueMutatingUpdateProcessor.valueMutator(getSelector(), next, src -> Integer.parseInt(src.toString()) + 1);
+    }
+  }
}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index 1c47b61..c1d25a6 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -22,27 +22,17 @@ import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
@@ -51,23 +41,13 @@ import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.LogLevel;
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -82,9 +62,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
private static final String alias = "myalias";
private static final String alias2 = "myalias2";
private static final String timeField = "timestamp_dt";
- private static final String intField = "integer_i";
- private static CloudSolrClient solrClient;
+ private CloudSolrClient solrClient;
private int lastDocId = 0;
private int numDocsDeletedOrFailed = 0;
@@ -104,10 +83,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
shutdownCluster();
}
- @AfterClass
- public static void finish() throws Exception {
- IOUtils.close(solrClient);
- }
+
@Slow
@Test
@@ -124,7 +100,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
.setMaxShardsPerNode(2)
.withProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
.process(solrClient);
-
+
cluster.waitForActiveCollection(col23rd, 2, 4);
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
@@ -135,7 +111,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
- CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
.setMaxShardsPerNode(2))
.process(solrClient);
@@ -168,7 +144,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertInvariants(col24th, col23rd);
// assert that the IncrementURP has updated all '0' to '1'
- final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
+ final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + getIntField() + ":1")).getResults();
assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
//delete a random document id; ensure we don't find it
@@ -206,7 +182,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
.process(solrClient);
// add more docs, creating one new collection, but trigger ones prior to
- int numDocsToBeAutoDeleted = queryNumDocs(timeField+":[* TO \"2017-10-26T00:00:00Z\"}");
+ int numDocsToBeAutoDeleted = queryNumDocs(getTimeField() +":[* TO \"2017-10-26T00:00:00Z\"}");
addDocsAndCommit(true, // send these to alias only
newDoc(Instant.parse("2017-10-26T07:00:00Z")), // existing
newDoc(Instant.parse("2017-10-27T08:00:00Z")) // new
@@ -215,56 +191,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertInvariants(alias + "_2017-10-27", alias + "_2017-10-26");
}
- private void createConfigSet(String configName) throws SolrServerException, IOException {
- // First create a configSet
- // Then we create a collection with the name of the eventual config.
- // We configure it, and ultimately delete the collection, leaving a modified config-set behind.
- // Later we create the "real" collections referencing this modified config-set.
- assertEquals(0, new ConfigSetAdminRequest.Create()
- .setConfigSetName(configName)
- .setBaseConfigSetName("_default")
- .process(solrClient).getStatus());
-
- CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
-
- // TODO: fix SOLR-13059, a where this wait isn't working ~0.3% of the time.
- waitCol(1,configName);
- // manipulate the config...
- checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
- .withMethod(SolrRequest.METHOD.POST)
- .withPayload("{" +
- " 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
- " 'add-updateprocessor' : {" +
- " 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
- " }," +
- // See TrackingUpdateProcessorFactory javadocs for details...
- " 'add-updateprocessor' : {" +
- " 'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
- " }," +
- " 'add-updateprocessor' : {" + // for testing
- " 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
- " 'fieldName':'" + intField + "'" +
- " }," +
- "}").build()));
- // only sometimes test with "tolerant" URP:
- final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
- checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
- .withMethod(SolrRequest.METHOD.POST)
- .withPayload("{" +
- " 'set' : {" +
- " '_UPDATE' : {'processor':'" + urpNames + "'}" +
- " }" +
- "}").build()));
-
- CollectionAdminRequest.deleteCollection(configName).process(solrClient);
- assertTrue(
- new ConfigSetAdminRequest.List().process(solrClient).getConfigSets()
- .contains(configName)
- );
- }
-
/**
- * Test that the Tracking Update Processor Factory routes documents to leader shards and thus
+ * Test that the Update Processor Factory routes documents to leader shards and thus
* avoids the possibility of introducing an extra hop to find the leader.
*
* @throws Exception when it blows up unexpectedly :)
@@ -280,7 +208,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
// 4 of which are leaders, and 8 of which should fail this test.
final int numShards = 1 + random().nextInt(4);
final int numReplicas = 1 + random().nextInt(3);
- CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
.setMaxShardsPerNode(numReplicas))
.process(solrClient);
@@ -290,9 +218,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertUpdateResponse(solrClient.commit(alias));
// wait for all the collections to exist...
- waitColAndAlias("2017-10-23", numShards, alias);
- waitColAndAlias("2017-10-24", numShards, alias);
- waitColAndAlias("2017-10-25", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-10-23", numShards);
+ waitColAndAlias(alias, "_", "2017-10-24", numShards);
+ waitColAndAlias(alias, "_", "2017-10-25", numShards);
// at this point we now have 3 collections with 4 shards each, and 3 replicas per shard for a total of
// 36 total replicas, 1/3 of which are leaders. We will add 3 docs and each has a 33% chance of hitting a
@@ -316,23 +244,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
updateCommands = TrackingUpdateProcessorFactory.stopRecording(trackGroupName);
}
- try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
- ClusterStateProvider clusterStateProvider = cloudSolrClient.getClusterStateProvider();
- clusterStateProvider.connect();
- Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
- assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
-
- assertEquals(3, updateCommands.size());
- for (UpdateCommand updateCommand : updateCommands) {
- String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
- assertTrue("Update was not routed to a leader (" + node + " not in list of leaders" + leaders, leaders.contains(node));
- }
- }
- }
-
- /** @see TrackingUpdateProcessorFactory */
- private String getTrackUpdatesGroupName() {
- return getSaferTestName();
+ assertRouting(numShards, updateCommands);
}
@Test
@@ -343,13 +255,13 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
final int numShards = 1 ;
final int numReplicas = 1 ;
- CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
.process(solrClient);
// needed to verify that preemptive creation in one alias doesn't inhibit preemptive creation in another
- CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
.process(solrClient);
@@ -411,8 +323,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertTrue(threadFinished[1]);
// if one of these times out then the test has failed due to interference between aliases
- waitColAndAlias("2017-10-26", numShards, alias);
- waitColAndAlias("2017-10-26", numShards, alias2);
+ waitColAndAlias(alias, "_", "2017-10-26", numShards);
+ waitColAndAlias(alias2, "_", "2017-10-26", numShards);
// after this we can ignore alias2
checkPreemptiveCase1(alias);
@@ -427,12 +339,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
//
// Start and stop some cores that have TRA's... 2x2 used to ensure every jetty gets at least one
- CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", timeField,
+ CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
.process(solrClient);
- waitColAndAlias("2017-10-23",2, "foo");
+ waitColAndAlias("foo", "_", "2017-10-23",2);
waitCoreCount("foo_2017-10-23", 1); // prove this works, for confidence in deletion checking below.
assertUpdateResponse(solrClient.add("foo",
sdoc("id","1","timestamp_dt", "2017-10-23T00:00:00Z") // no extra collections should be created
@@ -459,7 +371,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-27 now
params));
assertUpdateResponse(solrClient.commit(alias));
- waitColAndAlias("2017-10-27", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-10-27", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(5,cols.size()); // only one created in async case
@@ -473,8 +385,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-28 now
params));
assertUpdateResponse(solrClient.commit(alias));
- waitColAndAlias("2017-10-27", numShards, alias);
- waitColAndAlias("2017-10-28", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-10-27", numShards);
+ waitColAndAlias(alias, "_", "2017-10-28", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
@@ -506,7 +418,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation
params));
assertUpdateResponse(solrClient.commit(alias));
- waitColAndAlias("2017-10-29", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-10-29", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(7,cols.size());
@@ -528,8 +440,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky?
params));
assertUpdateResponse(solrClient.commit(alias));
- waitColAndAlias("2017-10-30", numShards, alias);
- waitColAndAlias("2017-10-31", numShards, alias); // spooky! async case arising in middle of sync creation!!
+ waitColAndAlias(alias, "_", "2017-10-30", numShards);
+ waitColAndAlias(alias, "_", "2017-10-31", numShards); // spooky! async case arising in middle of sync creation!!
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(9,cols.size());
@@ -551,17 +463,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "14", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-01
params));
- waitColAndAlias("2017-11-01", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-11-01", numShards);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "15", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-02
params));
- waitColAndAlias("2017-11-02", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-11-02", numShards);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "16", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-03
params));
- waitColAndAlias("2017-11-03", numShards, alias);
+ waitColAndAlias(alias, "_", "2017-11-03", numShards);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "17", "timestamp_dt", "2017-10-31T23:01:00Z")), // should NOT cause preemptive creation 11-04
@@ -573,7 +485,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "18", "timestamp_dt", "2017-11-01T23:01:00Z")), // should cause preemptive creation 11-04
params));
- waitColAndAlias("2017-11-04",numShards, alias);
+ waitColAndAlias(alias, "_", "2017-11-04",numShards);
}
@@ -651,6 +563,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertNumDocs("2017-10-26", 0, alias);
}
+ @SuppressWarnings("SameParameterValue")
private void addOneDocSynchCreation(int numShards, String alias) throws SolrServerException, IOException, InterruptedException {
// cause some collections to be created
assertUpdateResponse(solrClient.add(alias,
@@ -659,9 +572,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertUpdateResponse(solrClient.commit(alias));
// wait for all the collections to exist...
- waitColAndAlias("2017-10-23", numShards, alias); // This one should have already existed from the alias creation
- waitColAndAlias("2017-10-24", numShards, alias); // Create 1
- waitColAndAlias("2017-10-25", numShards, alias); // Create 2nd synchronously (ensure this is not broken)
+ waitColAndAlias(alias, "_", "2017-10-23", numShards); // This one should have already existed from the alias creation
+ waitColAndAlias(alias, "_", "2017-10-24", numShards); // Create 1
+ waitColAndAlias(alias, "_", "2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken)
// normal update, nothing special, no collection creation required.
List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
@@ -679,53 +592,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
assertEquals(expected, resp.getResults().getNumFound());
}
- private Set<String> getLeaderCoreNames(ClusterState clusterState) {
- Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
- List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
- for (JettySolrRunner jettySolrRunner : jettySolrRunners) {
- List<CoreDescriptor> coreDescriptors = jettySolrRunner.getCoreContainer().getCoreDescriptors();
- for (CoreDescriptor core : coreDescriptors) {
- String nodeName = jettySolrRunner.getNodeName();
- String collectionName = core.getCollectionName();
- DocCollection collectionOrNull = clusterState.getCollectionOrNull(collectionName);
- List<Replica> leaderReplicas = collectionOrNull.getLeaderReplicas(nodeName);
- if (leaderReplicas != null) {
- for (Replica leaderReplica : leaderReplicas) {
- leaders.add(leaderReplica.getCoreName());
- }
- }
- }
- }
- return leaders;
- }
-
- private void waitColAndAlias(final String datePart, int slices, String alias) throws InterruptedException {
- // collection to exist
- String collection = alias + "_" + datePart;
- waitCol(slices, collection);
- // and alias to be aware of collection
- long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
- while (!haveCollection(alias, collection)) {
- if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
- fail("took over 10 seconds after collection creation to update aliases");
- } else {
- Thread.sleep(500);
- }
- }
- }
-
- private boolean haveCollection(String alias, String collection) {
- // separated into separate lines to make it easier to track down an NPE that occurred once
- // 3000 runs if it shows up again...
- CloudSolrClient solrClient = cluster.getSolrClient();
- ZkStateReader zkStateReader = solrClient.getZkStateReader();
- Aliases aliases = zkStateReader.getAliases();
- Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
- List<String> strings = collectionAliasListMap.get(alias);
- return strings.contains(collection);
- }
-
-
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
try {
@@ -739,11 +605,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
numDocsDeletedOrFailed++;
}
- private void checkNoError(NamedList<Object> response) { //TODO rename
- Object errors = response.get("errorMessages");
- assertNull("" + errors, errors);
- }
-
@Override
public String getAlias() {
@@ -777,11 +638,11 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
"q", "*:*",
"rows", "0",
"stats", "true",
- "stats.field", timeField));
+ "stats.field", getTimeField()));
long numFound = colStatsResp.getResults().getNumFound();
if (numFound > 0) {
totalNumFound += numFound;
- final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
+ final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(getTimeField());
assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime());
if (colEndInstant != null) {
assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime());
@@ -796,19 +657,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
private SolrInputDocument newDoc(Instant timestamp) {
return sdoc("id", Integer.toString(++lastDocId),
- timeField, timestamp.toString(),
- intField, "0"); // always 0
+ getTimeField(), timestamp.toString(),
+ getIntField(), "0"); // always 0
}
- /** Adds the docs to Solr via {@link #solrClient} with the params */
- @SuppressWarnings("SameParameterValue")
- private static UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
- UpdateRequest req = new UpdateRequest();
- if (params != null) {
- req.setParams(new ModifiableSolrParams(params));// copy because will be modified
- }
- req.add(docs);
- return req.process(solrClient, collection);
+ private String getTimeField() {
+ return timeField;
}
@Test
@@ -823,13 +677,4 @@ public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcess
TimeRoutedAlias.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
}
- public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
-
- @Override
- public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
- return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
- (src) -> Integer.valueOf(src.toString()) + 1);
- }
- }
-
}