You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/02/06 15:47:58 UTC
[lucene-solr] 03/03: SOLR-13149 MaintainCategoryRoutedAlias & test
This is an automated email from the ASF dual-hosted git repository.
gus pushed a commit to branch solr-13131
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit d42414d021e38e61d0fdea9a4863e124bb8e3d55
Author: Gus Heck <gu...@apache.org>
AuthorDate: Wed Feb 6 10:46:35 2019 -0500
SOLR-13149 MaintainCategoryRoutedAlias & test
---
.../solr/cloud/api/collections/AliasCmd.java | 26 ++-
.../cloud/api/collections/CategoryRoutedAlias.java | 78 +++----
.../solr/cloud/api/collections/CreateAliasCmd.java | 26 ++-
.../MaintainCategoryRoutedAliasCmd.java | 91 +++++++-
.../collections/MaintainTimeRoutedAliasCmd.java | 14 +-
.../OverseerCollectionMessageHandler.java | 43 ++--
.../solr/cloud/api/collections/RoutedAlias.java | 15 +-
.../cloud/api/collections/TimeRoutedAlias.java | 14 +-
.../CategoryRoutedAliasUpdateProcessorTest.java | 237 +++++++++++++++++++++
.../processor/RoutedAliasUpdateProcessorTest.java | 137 ++++++++++++
.../TimeRoutedAliasUpdateProcessorTest.java | 98 +--------
.../solrj/request/CollectionAdminRequest.java | 57 ++++-
12 files changed, 619 insertions(+), 217 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
index c653569..05cca40 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -17,6 +17,8 @@
package org.apache.solr.cloud.api.collections;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.solr.cloud.Overseer;
@@ -24,14 +26,16 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
-import static org.apache.solr.cloud.api.collections.RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
import static org.apache.solr.cloud.api.collections.RoutedAlias.CREATE_COLLECTION_PREFIX;
+import static org.apache.solr.cloud.api.collections.RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CommonParams.NAME;
@@ -40,14 +44,14 @@ import static org.apache.solr.common.params.CommonParams.NAME;
* means, given the current state of the alias and some information from a routed field in a document that
* may imply a need for changes, create, delete or otherwise modify collections as required.
*/
-public abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
+abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
/**
* Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
* If the collection already exists then this is not an error.<p>
*/
- NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
- String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
+ static NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
+ String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
// Map alias metadata starting with a prefix to a create-collection API request
final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
@@ -87,4 +91,18 @@ public abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
return results;
}
+ void updateAlias(String aliasName, ZkStateReader.AliasesManager aliasesManager, String createCollName) {
+ aliasesManager.applyModificationAndExportToZk(curAliases -> {
+ final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
+ if (curTargetCollections.contains(createCollName)) {
+ return curAliases;
+ } else {
+ List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
+ // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
+ newTargetCollections.add(createCollName);
+ newTargetCollections.addAll(curTargetCollections);
+ return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
+ }
+ });
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
index a20cf63..3de1e41 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
@@ -18,16 +18,12 @@
package org.apache.solr.cloud.api.collections;
import java.lang.invoke.MethodHandles;
-import java.util.AbstractMap;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
@@ -41,30 +37,40 @@ import org.apache.solr.update.AddUpdateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CategoryRoutedAlias implements RoutedAlias<String> {
+public class CategoryRoutedAlias implements RoutedAlias {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String COLLECTION_INFIX = "__CRA__";
+ private static final String COLLECTION_INFIX = "__CRA__";
+
+ // This constant is terribly annoying but a great many things fall apart if we allow an alias with
+ // no collections to be created. So this kludge seems better than reworking every request path that
+ // expects a collection but also works with an alias to handle or error out on empty alias. The
+ // collection with this constant as a suffix is automatically removed after the alias begins to
+ // receive data.
+ public static final String UNINITIALIZED = "NEW_CATEGORY_ROUTED_ALIAS_WAITING_FOR_DATA__TEMP";
/**
* Parameters required for creating a category routed alias
*/
+ @SuppressWarnings("WeakerAccess")
public static final Set<String> REQUIRED_ROUTER_PARAMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
CommonParams.NAME,
ROUTER_TYPE_NAME,
ROUTER_FIELD)));
+ @SuppressWarnings("WeakerAccess")
public static final String ROUTER_MAX_CARDINALITY = "router.maxCardinality";
+ @SuppressWarnings("WeakerAccess")
public static final String ROUTER_MUST_MATCH = "router.mustMatch";
/**
* Optional parameters for creating a category routed alias excluding parameters for collection creation.
*/
+ @SuppressWarnings("WeakerAccess")
public static final Set<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
ROUTER_MAX_CARDINALITY,
ROUTER_MUST_MATCH)));
- private List<String> collectionNames; // List of collections currently in the CRA
- private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
+ private Aliases parsedAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
private final String aliasName;
private final Map<String, String> aliasMetadata;
@@ -76,13 +82,12 @@ public class CategoryRoutedAlias implements RoutedAlias<String> {
@Override
public boolean updateParsedCollectionAliases(ZkController zkController) {
final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
- if (this.parsedCollectionsAliases != aliases) {
- if (this.parsedCollectionsAliases != null) {
+ if (this.parsedAliases != aliases) {
+ if (this.parsedAliases != null) {
log.debug("Observing possibly updated alias: {}", getAliasName());
}
// slightly inefficient, but not easy to make changes to the return value of parseCollections
- this.collectionNames = parseCollections(aliases).stream().map(Map.Entry::getValue).collect(Collectors.toList());
- this.parsedCollectionsAliases = aliases;
+ this.parsedAliases = aliases;
return true;
}
return false;
@@ -99,43 +104,6 @@ public class CategoryRoutedAlias implements RoutedAlias<String> {
}
@Override
- public List<Map.Entry<String, String>> parseCollections(Aliases aliases) {
- final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
- if (collections == null) {
- throw RoutedAlias.newAliasMustExistException(getAliasName());
- }
- List<Map.Entry<String,String>> result = new ArrayList<>(collections.size());
- for (String collection : collections) {
- String collCategory = parseCategoryFromCollectionName(aliasName, collection);
- result.add(new AbstractMap.SimpleImmutableEntry<>(collCategory, collection));
- }
- // TODO Think about this... is order needed? if so perhaps better if insertion maintains order?
- result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
-
- // note that this is also sorted by value since the value corresponds to the key plus a the alias name which
- // is constant within a given alias.
- return result;
- }
-
- /**
- * Pattern for Category Routed Alias is aliasName__CRA__datadrivincategory. The __CRA__ infix is to
- * reduce the chance of inadvertently duplicating (or worse yet, adopting) other collections
- * that are not supposed to be included in the alias. With Time routed aliases the timestamp in
- * the collection name was sufficiently unique, but given that aliasName could be anything and
- * 2 part collection names of the form foo_bar are probably common in the wild, the infix seems
- * necessary.
- *
- */
- private String parseCategoryFromCollectionName(String aliasName, String collection) {
- String prefix = aliasName + COLLECTION_INFIX;
- if (!collection.startsWith(prefix)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Category Routed Alias collection names " +
- "must start with the name of the alias plus " + COLLECTION_INFIX);
- }
- return collection.substring(prefix.length(),collection.length());
- }
-
- @Override
public void validateRouteValue(AddUpdateCommand cmd) throws SolrException {
//Mostly this will be filled out by SOLR-13150 and SOLR-13151
}
@@ -151,7 +119,7 @@ public class CategoryRoutedAlias implements RoutedAlias<String> {
return dataValue.trim().replaceAll("\\W", "_");
}
- private String buildCollectionNameFromValue(String value) {
+ String buildCollectionNameFromValue(String value) {
return aliasName + COLLECTION_INFIX + safeKeyValue(value);
}
@@ -169,7 +137,7 @@ public class CategoryRoutedAlias implements RoutedAlias<String> {
// Note: CRA's have no way to predict values that determine collection so preemptive async creation
// is not possible. We have no choice but to block and wait (to do otherwise would imperil the overseer).
do {
- if (this.collectionNames.contains(candidateCollectionName)) {
+ if (getCollectionList(this.parsedAliases).contains(candidateCollectionName)) {
return candidateCollectionName;
} else {
// this could time out in which case we simply let it throw an error
@@ -196,9 +164,13 @@ public class CategoryRoutedAlias implements RoutedAlias<String> {
}
}
+ private List<String> getCollectionList(Aliases p) {
+ return p.getCollectionAliasListMap().get(this.aliasName);
+ }
+
@Override
- public Optional<String> computeInitialCollectionName() {
- return Optional.empty();
+ public String computeInitialCollectionName() {
+ return buildCollectionNameFromValue(UNINITIALIZED);
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
index 320a4ae..0e41baf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -36,6 +35,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
public class CreateAliasCmd extends AliasCmd {
@@ -45,6 +45,7 @@ public class CreateAliasCmd extends AliasCmd {
return message.keySet().stream().anyMatch(k -> k.startsWith(RoutedAlias.ROUTER_PREFIX));
}
+ @SuppressWarnings("WeakerAccess")
public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}
@@ -79,7 +80,7 @@ public class CreateAliasCmd extends AliasCmd {
private void callCreatePlainAlias(ZkNodeProps message, String aliasName, ZkStateReader zkStateReader) {
final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
- validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
+ validateAllCollectionsExistAndNoDuplicates(canonicalCollectionList, zkStateReader);
zkStateReader.aliasesManager
.applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
}
@@ -97,6 +98,7 @@ public class CreateAliasCmd extends AliasCmd {
.collect(Collectors.toList());
}
+ @SuppressWarnings("unchecked")
private void callCreateRoutedAlias(ZkNodeProps message, String aliasName, ZkStateReader zkStateReader, ClusterState state) throws Exception {
// Validate we got a basic minimum
if (!message.getProperties().keySet().containsAll(RoutedAlias.MINIMAL_REQUIRED_PARAMS)) {
@@ -110,31 +112,27 @@ public class CreateAliasCmd extends AliasCmd {
// Further validation happens here
RoutedAlias routedAlias = RoutedAlias.fromProps(aliasName, props);
+ if (routedAlias == null) {
+ // should never happen here, but keep static analysis in IDEs happy...
+ throw new SolrException(SERVER_ERROR,"Tried to create a routed alias with no type!");
+ }
- // If we can, create the first collection.
- Optional<String> initialCollectionName = routedAlias.computeInitialCollectionName();
- if (initialCollectionName.isPresent()) {
- String initialColl = initialCollectionName.get();
+ // Create the first collection.
+ String initialColl = routedAlias.computeInitialCollectionName();
ensureAliasCollection(aliasName, zkStateReader, state, routedAlias.getAliasMetadata(), initialColl);
// Create/update the alias
zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> aliases
.cloneWithCollectionAlias(aliasName, initialColl)
.cloneWithCollectionAliasProperties(aliasName, routedAlias.getAliasMetadata()));
- return;
- }
-
- // Create/update the alias
- zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> aliases
- .cloneWithCollectionAliasProperties(aliasName, routedAlias.getAliasMetadata()));
}
private void ensureAliasCollection(String aliasName, ZkStateReader zkStateReader, ClusterState state, Map<String, String> aliasProperties, String initialCollectionName) throws Exception {
// Create the collection
createCollectionAndWait(state, aliasName, aliasProperties, initialCollectionName, ocmh);
- validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);
+ validateAllCollectionsExistAndNoDuplicates(Collections.singletonList(initialCollectionName), zkStateReader);
}
- private void validateAllCollectionsExistAndNoDups(List<String> collectionList, ZkStateReader zkStateReader) {
+ private void validateAllCollectionsExistAndNoDuplicates(List<String> collectionList, ZkStateReader zkStateReader) {
final String collectionStr = StrUtils.join(collectionList, ',');
if (new HashSet<>(collectionList).size() != collectionList.size()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
index 93a832b..a0a7199 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
@@ -17,28 +17,49 @@
package org.apache.solr.cloud.api.collections;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+import static org.apache.solr.cloud.api.collections.CategoryRoutedAlias.UNINITIALIZED;
+import static org.apache.solr.common.params.CommonParams.NAME;
public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
+ @SuppressWarnings("WeakerAccess")
public static final String IF_CATEGORY_COLLECTION_NOT_FOUND = "ifCategoryCollectionNotFound";
+ private final OverseerCollectionMessageHandler ocmh;
+
+ MaintainCategoryRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
/**
* Invokes this command from the client. If there's a problem it will throw an exception.
* Please note that is important to never add async to this invocation. This method must
* block (up to the standard OCP timeout) to prevent large batches of add's from sending a message
* to the overseer for every document added in RoutedAliasUpdateProcessor.
*/
- public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String categoryCollection)
+ @SuppressWarnings("WeakerAccess")
+ public static void remoteInvoke(CollectionsHandler collHandler, String aliasName, String categoryCollection)
throws Exception {
final String operation = CollectionParams.CollectionAction.MAINTAINCATEGORYROUTEDALIAS.toLower();
Map<String, Object> msg = new HashMap<>();
@@ -49,11 +70,75 @@ public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
if (rsp.getException() != null) {
throw rsp.getException();
}
- return rsp.getResponse();
}
@Override
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
- //todo
+ //---- PARSE PRIMARY MESSAGE PARAMS
+ // important that we use NAME for the alias as that is what the Overseer will get a lock on before calling us
+ final String aliasName = message.getStr(NAME);
+ // the client believes this collection name should exist. Our goal is to ensure it does.
+ final String categoryRequired = message.getStr(IF_CATEGORY_COLLECTION_NOT_FOUND); // optional
+
+
+ //---- PARSE ALIAS INFO FROM ZK
+ final ZkStateReader.AliasesManager aliasesManager = ocmh.zkStateReader.aliasesManager;
+ final Aliases aliases = aliasesManager.getAliases();
+ final Map<String, String> aliasMetadata = aliases.getCollectionAliasProperties(aliasName);
+ if (aliasMetadata == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Alias " + aliasName + " does not exist."); // if it did exist, we'd have a non-null map
+ }
+ final CategoryRoutedAlias categoryRoutedAlias = (CategoryRoutedAlias) RoutedAlias.fromProps(aliasName, aliasMetadata);
+
+ if (categoryRoutedAlias == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, getClass() + " got alias metadata with an " +
+ "invalid routing type and produced null");
+ }
+
+
+ //---- SEARCH FOR REQUESTED COLL
+ Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
+
+ // if we found it, the collection already exists and we're done (concurrent creation on another request)
+ // so this if does not need an else.
+ if (!collectionAliasListMap.get(aliasName).contains(categoryRequired)) {
+ //---- DETECT and REMOVE the initial place holder collection if it still exists:
+
+ String initialCollection = categoryRoutedAlias.buildCollectionNameFromValue(UNINITIALIZED);
+
+ // important not to delete the place holder collection until after a second collection exists,
+ // otherwise we have a situation where the alias has no collections briefly and concurrent
+ // requests to the alias will fail with internal errors (incl. queries etc).
+ // TODO avoid this contains check?
+ List<String> colList = new ArrayList<>(collectionAliasListMap.get(aliasName));
+ if (colList.contains(initialCollection) && colList.size() > 1) {
+
+ aliasesManager.applyModificationAndExportToZk(curAliases -> {
+ colList.remove(initialCollection);
+ final String collectionsToKeepStr = StrUtils.join(colList, ',');
+ return curAliases.cloneWithCollectionAlias(aliasName, collectionsToKeepStr);
+ });
+ final CollectionsHandler collHandler = ocmh.overseer.getCoreContainer().getCollectionsHandler();
+ final SolrParams reqParams = CollectionAdminRequest
+ .deleteCollection(initialCollection).getParams();
+ SolrQueryResponse rsp = new SolrQueryResponse();
+ collHandler.handleRequestBody(new LocalSolrQueryRequest(null, reqParams), rsp);
+ //noinspection unchecked
+ results.add(UNINITIALIZED, rsp.getValues());
+ }
+
+ //---- CREATE THE COLLECTION
+ NamedList createResults = createCollectionAndWait(state, aliasName, aliasMetadata,
+ categoryRequired, ocmh);
+ if (createResults != null) {
+ //noinspection unchecked
+ results.add("create", createResults);
+ }
+ //---- UPDATE THE ALIAS WITH NEW COLLECTION
+ updateAlias(aliasName, aliasesManager, categoryRequired);
+ }
+
}
+
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
index 4751979..cb95d76 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
@@ -20,7 +20,6 @@ package org.apache.solr.cloud.api.collections;
import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -153,18 +152,7 @@ public class MaintainTimeRoutedAliasCmd extends AliasCmd {
}
//---- UPDATE THE ALIAS WITH NEW COLLECTION
- aliasesManager.applyModificationAndExportToZk(curAliases -> {
- final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
- if (curTargetCollections.contains(createCollName)) {
- return curAliases;
- } else {
- List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
- // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
- newTargetCollections.add(createCollName);
- newTargetCollections.addAll(curTargetCollections);
- return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
- }
- });
+ updateAlias(aliasName, aliasesManager, createCollName);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index c29c711..ccccffd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -16,26 +16,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
-import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
-
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -52,6 +32,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
@@ -111,7 +92,25 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
/**
* A {@link OverseerMessageHandler} that handles Collections API related
@@ -234,7 +233,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETEALIAS, new DeleteAliasCmd(this))
.put(ALIASPROP, new SetAliasPropCmd(this))
.put(MAINTAINTIMEROUTEDALIAS, new MaintainTimeRoutedAliasCmd(this))
- .put(MAINTAINCATEGORYROUTEDALIAS, new MaintainTimeRoutedAliasCmd(this))
+ .put(MAINTAINCATEGORYROUTEDALIAS, new MaintainCategoryRoutedAliasCmd(this))
.put(OVERSEERSTATUS, new OverseerStatusCmd(this))
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
index c083ba2..8bb95cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
@@ -18,22 +18,19 @@
package org.apache.solr.cloud.api.collections;
import java.util.Arrays;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.update.AddUpdateCommand;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
-public interface RoutedAlias<K> {
+public interface RoutedAlias {
/**
* Types supported. Every entry here must have a case in the switch statement in {@link #fromProps(String, Map)}
@@ -68,7 +65,7 @@ public interface RoutedAlias<K> {
String typeStr = props.get(ROUTER_TYPE_NAME);
if (typeStr == null) {
- return null;
+ return null; // non-routed aliases are being created
}
SupportedRouterTypes routerType;
try {
@@ -106,7 +103,7 @@ public interface RoutedAlias<K> {
*
* @return optional string of initial collection name
*/
- Optional<String> computeInitialCollectionName();
+ String computeInitialCollectionName();
/**
@@ -118,11 +115,7 @@ public interface RoutedAlias<K> {
String getRouteField();
- /**
- * Parses the elements of the collection list. Result is returned them in sorted order (desc) if there
- * is a natural order for this type of routed alias
- */
- List<Map.Entry<K, String>> parseCollections(Aliases aliases);
+
/**
* Check that the value we will be routing on is legal for this type of routed alias.
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
index e48d406..dcd4a37 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
@@ -34,7 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
@@ -71,7 +70,7 @@ import static org.apache.solr.common.params.CommonParams.TZ;
* @see MaintainTimeRoutedAliasCmd
* @see RoutedAliasUpdateProcessor
*/
-public class TimeRoutedAlias implements RoutedAlias<Instant> {
+public class TimeRoutedAlias implements RoutedAlias {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// This class is created once per request and the overseer methods prevent duplicate create requests
@@ -199,8 +198,8 @@ public class TimeRoutedAlias implements RoutedAlias<Instant> {
}
@Override
- public Optional<String> computeInitialCollectionName() {
- return Optional.of(formatCollectionNameFromInstant(aliasName, parseStringAsInstant(this.start, timeZone)));
+ public String computeInitialCollectionName() {
+ return formatCollectionNameFromInstant(aliasName, parseStringAsInstant(this.start, timeZone));
}
public static Instant parseInstantFromCollectionName(String aliasName, String collection) {
@@ -289,9 +288,10 @@ public class TimeRoutedAlias implements RoutedAlias<Instant> {
.add("timeZone", timeZone)
.toString();
}
-
- /** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
- public List<Map.Entry<Instant,String>> parseCollections(Aliases aliases) {
+ /**
+ * Parses the elements of the collection list. Results are returned in sorted order (most recent first).
+ */
+ List<Map.Entry<Instant,String>> parseCollections(Aliases aliases) {
final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
if (collections == null) {
throw RoutedAlias.newAliasMustExistException(getAliasName());
diff --git a/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
new file mode 100644
index 0000000..850f4d1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.api.collections.CategoryRoutedAlias;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CategoryRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcessorTest {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // use this for example categories
+ private static final String[] SHIPS = {
+ "Constructor",
+ "Heart of Gold",
+ "Stunt Ship",
+ "B-ark",
+ "Bi$tromath"
+ };
+
+ private static final String categoryField = "ship_name_en";
+ private static final String intField = "integer_i";
+
+ private int lastDocId = 0;
+ private int numDocsDeletedOrFailed = 0;
+ private static CloudSolrClient solrClient;
+
+ @Before
+ public void doBefore() throws Exception {
+ configureCluster(4).configure();
+ solrClient = getCloudSolrClient(cluster);
+ //log this to help debug potential causes of problems
+ log.info("SolrClient: {}", solrClient);
+ log.info("ClusterStateProvider {}", solrClient.getClusterStateProvider());
+ }
+
+ @After
+ public void doAfter() throws Exception {
+ solrClient.close();
+ shutdownCluster();
+ }
+
+ @AfterClass
+ public static void finish() throws Exception {
+ IOUtils.close(solrClient);
+ }
+
+ @Slow
+ @Test
+ public void test() throws Exception {
+ String configName = getSaferTestName();
+ createConfigSet(configName);
+
+ // Start with one collection manually created (and use higher numShards & replicas than we'll use for others)
+ // This tests we may pre-create the collection and it's acceptable.
+ final String colVogon = getAlias() + "__CRA__" + SHIPS[0];
+
+ // we expect changes ensuring a legal collection name.
+ final String colHoG = getAlias() + "__CRA__" + SHIPS[1].replaceAll("\\s", "_");
+ final String colStunt = getAlias() + "__CRA__" + SHIPS[2].replaceAll("\\s", "_");
+ final String colArk = getAlias() + "__CRA__" + SHIPS[3].replaceAll("-","_");
+ final String colBistro = getAlias() + "__CRA__" + SHIPS[4].replaceAll("\\$", "_");
+
+ List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+ List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+ // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+ assertTrue("We expect at least 2 configSets",
+ retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+ assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+ CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField,
+ CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+ .setMaxShardsPerNode(2))
+ .process(solrClient);
+
+ // now we index a document
+ addDocsAndCommit(true, newDoc(SHIPS[0]));
+ //assertDocRoutedToCol(lastDocId, col23rd);
+
+ String uninitialized = getAlias() + "__CRA__" + CategoryRoutedAlias.UNINITIALIZED;
+ assertInvariants(colVogon, uninitialized);
+
+ addDocsAndCommit(true,
+ newDoc(SHIPS[1]),
+ newDoc(SHIPS[2]),
+ newDoc(SHIPS[3]),
+ newDoc(SHIPS[4]));
+
+ assertInvariants(colVogon, colHoG, colStunt, colArk, colBistro);
+ }
+
+ private void createConfigSet(String configName) throws SolrServerException, IOException {
+ // First create a configSet
+ // Then we create a collection with the name of the eventual config.
+ // We configure it, and ultimately delete the collection, leaving a modified config-set behind.
+ // Later we create the "real" collections referencing this modified config-set.
+ assertEquals(0, new ConfigSetAdminRequest.Create()
+ .setConfigSetName(configName)
+ .setBaseConfigSetName("_default")
+ .process(solrClient).getStatus());
+
+ CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
+
+ // TODO: fix SOLR-13059, where this wait isn't working ~0.3% of the time.
+ waitCol(1, configName);
+ // manipulate the config...
+ checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
+ .withMethod(SolrRequest.METHOD.POST)
+ .withPayload("{" +
+ " 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
+ " 'add-updateprocessor' : {" +
+ " 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
+ " }," +
+ " 'add-updateprocessor' : {" + // for testing
+ " 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
+ " 'fieldName':'" + intField + "'" +
+ " }," +
+ "}").build()));
+ // only sometimes test with "tolerant" URP:
+ final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
+ checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
+ .withMethod(SolrRequest.METHOD.POST)
+ .withPayload("{" +
+ " 'set' : {" +
+ " '_UPDATE' : {'processor':'" + urpNames + "'}" +
+ " }" +
+ "}").build()));
+
+ CollectionAdminRequest.deleteCollection(configName).process(solrClient);
+ assertTrue(
+ new ConfigSetAdminRequest.List().process(solrClient).getConfigSets()
+ .contains(configName)
+ );
+ }
+
+ private void checkNoError(NamedList<Object> response) {
+ Object errors = response.get("errorMessages");
+ assertNull("" + errors, errors);
+ }
+
+ private void assertInvariants(String... expectedColls) throws IOException, SolrServerException {
+ final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
+
+ List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(getAlias());
+ cols = new ArrayList<>(cols);
+ cols.sort(String::compareTo); // don't really care about the order here.
+ assert !cols.isEmpty();
+
+ int totalNumFound = 0;
+ for (String col : cols) {
+ final QueryResponse colResponse = solrClient.query(col, params(
+ "q", "*:*",
+ "rows", "0"));
+ long numFound = colResponse.getResults().getNumFound();
+ if (numFound > 0) {
+ totalNumFound += numFound;
+ }
+ }
+ final QueryResponse colResponse = solrClient.query(getAlias(), params(
+ "q", "*:*",
+ "rows", "0"));
+ long aliasNumFound = colResponse.getResults().getNumFound();
+ List<String> actual = Arrays.asList(expectedColls);
+ actual.sort(String::compareTo);
+ assertArrayEquals("Expected " + expectedColls.length + " collections, found " + cols.size() + ":\n" +
+ cols + " vs \n" + actual, expectedColls, cols.toArray());
+ assertEquals("Expected collections and alias to have same number of documents",
+ aliasNumFound, totalNumFound);
+ assertEquals("Expected to find " + expectNumFound + " docs but found " + aliasNumFound,
+ expectNumFound, aliasNumFound);
+ }
+
+ private SolrInputDocument newDoc(String routedValue) {
+ return sdoc("id", Integer.toString(++lastDocId),
+ categoryField, routedValue,
+ intField, "0"); // always 0
+ }
+
+ @Override
+ public String getAlias() {
+ return "myAlias";
+ }
+
+ @Override
+ public CloudSolrClient getSolrClient() {
+ return solrClient;
+ }
+
+
+ public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
+
+ @Override
+ public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ return FieldValueMutatingUpdateProcessor.valueMutator(getSelector(), next,
+ (src) -> Integer.valueOf(src.toString()) + 1);
+ }
+ }
+
+}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
new file mode 100644
index 0000000..0f3bfa2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.junit.Ignore;
+
+@Ignore // don't try to run the abstract base class
+public abstract class RoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+
+ public abstract String getAlias() ;
+
+ public abstract CloudSolrClient getSolrClient() ;
+
+
+ void waitCol(int slices, String collection) {
+ waitForState("waiting for collections to be created", collection,
+ (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ // per predicate javadoc, this is what we get if the collection doesn't exist at all.
+ return false;
+ }
+ Collection<Slice> activeSlices = collectionState.getActiveSlices();
+ int size = activeSlices.size();
+ return size == slices;
+ });
+ }
+
+ /** Adds these documents and commits, returning when they are committed.
+ * We randomly go about this in different ways. */
+ void addDocsAndCommit(boolean aliasOnly, SolrInputDocument... solrInputDocuments) throws Exception {
+ // we assume all docs will be added (none too old/new to cause exception)
+ Collections.shuffle(Arrays.asList(solrInputDocuments), random());
+
+ // this is a list of the collections & the alias name. Use to pick randomly where to send.
+ // (it doesn't matter where we send docs since the alias is honored at the URP level)
+ List<String> collections = new ArrayList<>();
+ collections.add(getAlias());
+ if (!aliasOnly) {
+ collections.addAll(new CollectionAdminRequest.ListAliases().process(getSolrClient()).getAliasesAsLists().get(getAlias()));
+ }
+
+ int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
+
+ if (random().nextBoolean()) {
+ // Send in separate threads. Choose random collection & solrClient
+ try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
+ ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
+ new DefaultSolrThreadFactory(getSaferTestName()));
+ List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
+ for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+ String col = collections.get(random().nextInt(collections.size()));
+ futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
+ }
+ for (Future<UpdateResponse> future : futures) {
+ assertUpdateResponse(future.get());
+ }
+ // at this point there shouldn't be any tasks running
+ assertEquals(0, exec.shutdownNow().size());
+ }
+ } else {
+ // send in a batch.
+ String col = collections.get(random().nextInt(collections.size()));
+ try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
+ assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
+ }
+ }
+ String col = collections.get(random().nextInt(collections.size()));
+ if (commitWithin == -1) {
+ getSolrClient().commit(col);
+ } else {
+ // check that it all got committed eventually
+ String docsQ =
+ "{!terms f=id}"
+ + Arrays.stream(solrInputDocuments).map(d -> d.getFieldValue("id").toString())
+ .collect(Collectors.joining(","));
+ int numDocs = queryNumDocs(docsQ);
+ if (numDocs == solrInputDocuments.length) {
+ System.err.println("Docs committed sooner than expected. Bug or slow test env?");
+ return;
+ }
+ // wait until it's committed
+ Thread.sleep(commitWithin);
+ for (int idx = 0; idx < 100; ++idx) { // Loop for up to 10 seconds waiting for commit to catch up
+ numDocs = queryNumDocs(docsQ);
+ if (numDocs == solrInputDocuments.length) break;
+ Thread.sleep(100);
+ }
+
+ assertEquals("not committed. Bug or a slow test?",
+ solrInputDocuments.length, numDocs);
+ }
+ }
+
+ void assertUpdateResponse(UpdateResponse rsp) {
+ // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
+ List errors = (List) rsp.getResponseHeader().get("errors");
+ assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
+ }
+
+ private int queryNumDocs(String q) throws SolrServerException, IOException {
+ return (int) getSolrClient().query(getAlias(), params("q", q, "rows", "0")).getResults().getNumFound();
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index 4fc1d50..c6722f5 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -17,13 +17,10 @@
package org.apache.solr.update.processor;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -34,8 +31,6 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
@@ -51,7 +46,6 @@ import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.api.collections.RoutedAlias;
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
import org.apache.solr.common.SolrDocumentList;
@@ -61,7 +55,6 @@ import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -72,7 +65,6 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.UpdateCommand;
-import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.AfterClass;
@@ -81,8 +73,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13059")
-public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcessorTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String alias = "myalias";
@@ -731,18 +725,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
return strings.contains(collection);
}
- private void waitCol(int slices, String collection) {
- waitForState("waiting for collections to be created", collection,
- (liveNodes, collectionState) -> {
- if (collectionState == null) {
- // per predicate javadoc, this is what we get if the collection doesn't exist at all.
- return false;
- }
- Collection<Slice> activeSlices = collectionState.getActiveSlices();
- int size = activeSlices.size();
- return size == slices;
- });
- }
+
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
try {
@@ -761,76 +744,15 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
assertNull("" + errors, errors);
}
- /** Adds these documents and commits, returning when they are committed.
- * We randomly go about this in different ways. */
- private void addDocsAndCommit(boolean aliasOnly, SolrInputDocument... solrInputDocuments) throws Exception {
- // we assume all docs will be added (none too old/new to cause exception)
- Collections.shuffle(Arrays.asList(solrInputDocuments), random());
-
- // this is a list of the collections & the alias name. Use to pick randomly where to send.
- // (it doesn't matter where we send docs since the alias is honored at the URP level)
- List<String> collections = new ArrayList<>();
- collections.add(alias);
- if (!aliasOnly) {
- collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
- }
- int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
-
- if (random().nextBoolean()) {
- // Send in separate threads. Choose random collection & solrClient
- try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
- ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
- new DefaultSolrThreadFactory(getSaferTestName()));
- List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
- for (SolrInputDocument solrInputDocument : solrInputDocuments) {
- String col = collections.get(random().nextInt(collections.size()));
- futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
- }
- for (Future<UpdateResponse> future : futures) {
- assertUpdateResponse(future.get());
- }
- // at this point there shouldn't be any tasks running
- assertEquals(0, exec.shutdownNow().size());
- }
- } else {
- // send in a batch.
- String col = collections.get(random().nextInt(collections.size()));
- try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
- assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
- }
- }
- String col = collections.get(random().nextInt(collections.size()));
- if (commitWithin == -1) {
- solrClient.commit(col);
- } else {
- // check that it all got committed eventually
- String docsQ =
- "{!terms f=id}"
- + Arrays.stream(solrInputDocuments).map(d -> d.getFieldValue("id").toString())
- .collect(Collectors.joining(","));
- int numDocs = queryNumDocs(docsQ);
- if (numDocs == solrInputDocuments.length) {
- System.err.println("Docs committed sooner than expected. Bug or slow test env?");
- return;
- }
- // wait until it's committed
- Thread.sleep(commitWithin);
- for (int idx = 0; idx < 100; ++idx) { // Loop for up to 10 seconds waiting for commit to catch up
- numDocs = queryNumDocs(docsQ);
- if (numDocs == solrInputDocuments.length) break;
- Thread.sleep(100);
- }
-
- assertEquals("not committed. Bug or a slow test?",
- solrInputDocuments.length, numDocs);
- }
+ @Override
+ public String getAlias() {
+ return alias;
}
- private void assertUpdateResponse(UpdateResponse rsp) {
- // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
- List errors = (List) rsp.getResponseHeader().get("errors");
- assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
+ @Override
+ public CloudSolrClient getSolrClient() {
+ return solrClient;
}
private int queryNumDocs(String q) throws SolrServerException, IOException {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 6f7af617..d0fc34b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1515,8 +1515,8 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
/**
- * Returns a SolrRequest to create a routed alias. Only time based routing is supported presently,
- * For time based routing, the start a standard Solr timestamp string (possibly with "date math").
+ * Returns a SolrRequest to create a time routed alias. For time based routing, the start
+ * should be a standard Solr timestamp string (possibly with "date math").
*
* @param aliasName the name of the alias to create.
* @param start the start of the routing. A standard Solr date: ISO-8601 or NOW with date math.
@@ -1614,6 +1614,59 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
}
+ /**
+ * Returns a SolrRequest to create a category routed alias.
+ *
+ * @param aliasName the name of the alias to create.
+ * @param routerField the document field containing the category value to route on
+ * @param createCollTemplate Holds options to create a collection. The "name" is ignored.
+ */
+ public static CreateCategoryRoutedAlias createCategoryRoutedAlias(String aliasName,
+ String routerField,
+ Create createCollTemplate) {
+
+ return new CreateCategoryRoutedAlias(aliasName, routerField, createCollTemplate);
+ }
+
+ public static class CreateCategoryRoutedAlias extends AsyncCollectionAdminRequest {
+
+ public static final String ROUTER_TYPE_NAME = "router.name";
+ public static final String ROUTER_FIELD = "router.field";
+
+ private final String aliasName;
+ private final String routerField;
+
+ private final Create createCollTemplate;
+
+ public CreateCategoryRoutedAlias(String aliasName, String routerField, Create createCollTemplate) {
+ super(CollectionAction.CREATEALIAS);
+ this.aliasName = aliasName;
+ this.routerField = routerField;
+ this.createCollTemplate = createCollTemplate;
+ }
+
+ @Override
+ public SolrParams getParams() {
+ ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+ params.add(CommonParams.NAME, aliasName);
+ params.add(ROUTER_TYPE_NAME, "category");
+ params.add(ROUTER_FIELD, routerField);
+
+ // merge the above with collectionParams. Above takes precedence.
+ ModifiableSolrParams createCollParams = new ModifiableSolrParams(); // output target
+ final SolrParams collParams = createCollTemplate.getParams();
+ final Iterator<String> pIter = collParams.getParameterNamesIterator();
+ while (pIter.hasNext()) {
+ String key = pIter.next();
+ if (key.equals(CollectionParams.ACTION) || key.equals("name")) {
+ continue;
+ }
+ createCollParams.set("create-collection." + key, collParams.getParams(key));
+ }
+ return SolrParams.wrapDefaults(params, createCollParams);
+ }
+
+ }
/**
* Returns a SolrRequest to delete an alias