You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/03/13 05:33:35 UTC

[lucene-solr] branch master updated: SOLR-13131 Category Routed Aliases

This is an automated email from the ASF dual-hosted git repository.

gus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f2a02  SOLR-13131 Category Routed Aliases
d8f2a02 is described below

commit d8f2a02fdb11a484425f9fddfa7061711d2f0034
Author: Gus Heck <gu...@apache.org>
AuthorDate: Wed Mar 13 00:54:32 2019 -0400

    SOLR-13131 Category Routed Aliases
---
 solr/CHANGES.txt                                   |   6 +-
 .../solr/cloud/api/collections/AliasCmd.java       | 108 +++++
 .../cloud/api/collections/CategoryRoutedAlias.java | 265 +++++++++++
 .../solr/cloud/api/collections/CreateAliasCmd.java |  76 ++-
 .../MaintainCategoryRoutedAliasCmd.java            | 183 ++++++++
 ...iasCmd.java => MaintainTimeRoutedAliasCmd.java} |  87 +---
 .../OverseerCollectionMessageHandler.java          |  76 +--
 .../solr/cloud/api/collections/RoutedAlias.java    | 145 ++++++
 .../cloud/api/collections/TimeRoutedAlias.java     | 393 ++++++++++++++--
 .../java/org/apache/solr/core/CoreContainer.java   | 121 +++--
 .../solr/handler/admin/CollectionsHandler.java     |  57 ++-
 .../DistributedUpdateProcessorFactory.java         |   2 +-
 .../processor/RoutedAliasUpdateProcessor.java      | 252 ++++++++++
 .../processor/TimeRoutedAliasUpdateProcessor.java  | 507 ---------------------
 .../apache/solr/cloud/CreateRoutedAliasTest.java   |   2 +-
 .../CategoryRoutedAliasUpdateProcessorTest.java    | 476 +++++++++++++++++++
 .../processor/RoutedAliasUpdateProcessorTest.java  | 308 +++++++++++++
 .../TimeRoutedAliasUpdateProcessorTest.java        | 332 ++------------
 solr/solr-ref-guide/src/aliases.adoc               | 267 +++++++++++
 solr/solr-ref-guide/src/collections-api.adoc       |  60 ++-
 solr/solr-ref-guide/src/distributed-requests.adoc  |   1 +
 solr/solr-ref-guide/src/how-solrcloud-works.adoc   |   4 +-
 solr/solr-ref-guide/src/time-routed-aliases.adoc   | 119 -----
 .../solrj/request/CollectionAdminRequest.java      |  74 ++-
 .../solr/common/params/CollectionParams.java       |   3 +-
 .../resources/apispec/collections.Commands.json    |   8 +
 26 files changed, 2711 insertions(+), 1221 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5afd706..0206d44 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -38,10 +38,14 @@ Upgrade Notes
 * SOLR-12891: MacroExpander will no longer will expand URL parameters inside of the 'expr' parameter (used by streaming
   expressions) Additionally, users are advised to use the 'InjectionDefense' class when constructing streaming
   expressions that include user supplied data to avoid risks similar to SQL injection. The legacy behavior of
-  expanding the 'expr' parameter can be reinstated with -DStreamingExpressionMacros=true passed to the JVM at startup.
+  expanding the 'expr' parameter can be reinstated with -DStreamingExpressionMacros=true passed to the JVM at startup
+  (Gus Heck).
 
 New Features
 ----------------------
+* SOLR-13131: Category Routed Aliases are now available for data-driven assignment of documents to collections based on
+  values of a field. The Ref Guide now has a page dedicated to explaining the different types of aliases. (Gus Heck,
+  Moshe Bla)
 
 ==================  8.1.0 ==================
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
new file mode 100644
index 0000000..05cca40
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.OverseerSolrResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+
+import static org.apache.solr.cloud.api.collections.RoutedAlias.CREATE_COLLECTION_PREFIX;
+import static org.apache.solr.cloud.api.collections.RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+/**
+ * Common superclass for commands that maintain or manipulate aliases. In the routed alias parlance, "maintain"
+ * means, given the current state of the alias and some information from a routed field in a document that
+ * may imply a need for changes, create, delete or otherwise modify collections as required.
+ */
+abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
+
+  /**
+   * Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
+   * If the collection already exists then this is not an error.
+   */
+  static NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
+                                    String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
+    // Map alias metadata starting with a prefix to a create-collection API request
+    final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
+    for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
+      if (e.getKey().startsWith(CREATE_COLLECTION_PREFIX)) {
+        createReqParams.set(e.getKey().substring(CREATE_COLLECTION_PREFIX.length()), e.getValue());
+      }
+    }
+    if (createReqParams.get(COLL_CONF) == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "We require an explicit " + COLL_CONF);
+    }
+    createReqParams.set(NAME, createCollName);
+    createReqParams.set("property." + ROUTED_ALIAS_NAME_CORE_PROP, aliasName);
+    // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
+    //   Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
+    final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
+        new LocalSolrQueryRequest(null, createReqParams),
+        null,
+        ocmh.overseer.getCoreContainer().getCollectionsHandler());
+    createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
+
+    NamedList results = new NamedList();
+    try {
+      // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
+      // note: there doesn't seem to be any point in locking on the collection name, so we don't. We currently should
+      //   already have a lock on the alias name which should be sufficient.
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
+    } catch (SolrException e) {
+      // The collection might already exist, and that's okay -- we can adopt it.
+      if (!e.getMessage().contains("collection already exists")) {
+        throw e;
+      }
+    }
+
+    CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
+        new OverseerSolrResponse(results));
+    return results;
+  }
+
+  void updateAlias(String aliasName, ZkStateReader.AliasesManager aliasesManager, String createCollName) {
+    aliasesManager.applyModificationAndExportToZk(curAliases -> {
+      final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
+      if (curTargetCollections.contains(createCollName)) {
+        return curAliases;
+      } else {
+        List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
+        // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
+        newTargetCollections.add(createCollName);
+        newTargetCollections.addAll(curTargetCollections);
+        return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
+      }
+    });
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
new file mode 100644
index 0000000..7d73149
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.AddUpdateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
+
+public class CategoryRoutedAlias implements RoutedAlias {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String COLLECTION_INFIX = "__CRA__";
+
+  // This constant is terribly annoying but a great many things fall apart if we allow an alias with
+  // no collections to be created. So this kludge seems better than reworking every request path that
+  // expects a collection but also works with an alias to handle or error out on empty alias. The
+  // collection with this constant as a suffix is automatically removed after the alias begins to
+  // receive data.
+  public static final String UNINITIALIZED = "NEW_CATEGORY_ROUTED_ALIAS_WAITING_FOR_DATA__TEMP";
+
+  @SuppressWarnings("WeakerAccess")
+  public static final String ROUTER_MAX_CARDINALITY = "router.maxCardinality";
+
+  /**
+   * Parameters required for creating a category routed alias
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static final Set<String> REQUIRED_ROUTER_PARAMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+      CommonParams.NAME,
+      ROUTER_TYPE_NAME,
+      ROUTER_FIELD,
+      ROUTER_MAX_CARDINALITY
+  )));
+
+  @SuppressWarnings("WeakerAccess")
+  public static final String ROUTER_MUST_MATCH = "router.mustMatch";
+
+  /**
+   * Optional parameters for creating a category routed alias excluding parameters for collection creation.
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static final Set<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+      ROUTER_MAX_CARDINALITY,
+      ROUTER_MUST_MATCH)));
+
+  private Aliases parsedAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
+  private final String aliasName;
+  private final Map<String, String> aliasMetadata;
+  private final Integer maxCardinality;
+  private final Pattern mustMatch;
+
+  CategoryRoutedAlias(String aliasName, Map<String, String> aliasMetadata) {
+    this.aliasName = aliasName;
+    this.aliasMetadata = aliasMetadata;
+    this.maxCardinality = parseMaxCardinality(aliasMetadata.get(ROUTER_MAX_CARDINALITY));
+    final String mustMatch = this.aliasMetadata.get(ROUTER_MUST_MATCH);
+    this.mustMatch = mustMatch == null? null: compileMustMatch(mustMatch);
+  }
+
+  @Override
+  public boolean updateParsedCollectionAliases(ZkController zkController) {
+    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
+    if (this.parsedAliases != aliases) {
+      if (this.parsedAliases != null) {
+        log.debug("Observing possibly updated alias: {}", getAliasName());
+      }
+      // slightly inefficient, but not easy to make changes to the return value of parseCollections
+      this.parsedAliases = aliases;
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public String getAliasName() {
+    return aliasName;
+  }
+
+  @Override
+  public String getRouteField() {
+    return aliasMetadata.get(ROUTER_FIELD);
+  }
+
+  @Override
+  public void validateRouteValue(AddUpdateCommand cmd) throws SolrException {
+    if (this.parsedAliases == null) {
+      updateParsedCollectionAliases(cmd.getReq().getCore().getCoreContainer().getZkController());
+    }
+
+    Object fieldValue = cmd.getSolrInputDocument().getFieldValue(getRouteField());
+    // possible future enhancement: allow specification of an "unknown" category name to which we can send
+    // docs that are uncategorized.
+    if (fieldValue == null) {
+      throw new SolrException(BAD_REQUEST,"Route value is null");
+    }
+
+    String dataValue = String.valueOf(fieldValue);
+
+    String candidateCollectionName = buildCollectionNameFromValue(dataValue);
+    List<String> cols = getCollectionList(this.parsedAliases);
+
+    if (cols.contains(candidateCollectionName)) {
+      return;
+    }
+
+    // this check will become very important for future work
+    int infix = candidateCollectionName.indexOf(COLLECTION_INFIX);
+    int valueStart = infix + COLLECTION_INFIX.length();
+    if (candidateCollectionName.substring(valueStart).contains(COLLECTION_INFIX)) {
+      throw new SolrException(BAD_REQUEST, "No portion of the route value may resolve to the 7 character sequence " +
+          "__CRA__");
+    }
+
+    if (mustMatch != null && !mustMatch.matcher(dataValue).matches()) {
+      throw new SolrException(BAD_REQUEST, "Route value " + dataValue
+          + " does not match " + ROUTER_MUST_MATCH + ": " + mustMatch);
+    }
+
+    if (cols.stream()
+        .filter(x -> !x.contains(UNINITIALIZED)).count() >= maxCardinality) {
+      throw new SolrException(BAD_REQUEST, "Max cardinality " + maxCardinality
+          + " reached for Category Routed Alias: " + getAliasName());
+    }
+  }
+
+  /**
+   * Calculate a safe collection name from a data value. Any non-word character is
+   * replaced with an underscore.
+   *
+   * @param dataValue a value from the route field for a particular document
+   * @return the suffix value for its corresponding collection name.
+   */
+  private String safeKeyValue(String dataValue) {
+    return dataValue.trim().replaceAll("\\W", "_");
+  }
+
+  String buildCollectionNameFromValue(String value) {
+    return aliasName + COLLECTION_INFIX + safeKeyValue(value);
+  }
+
+  /**
+   * Method to possibly create a collection. It's possible that the collection will already have been created
+   * either by a prior invocation in this thread or another thread. This method is idempotent, multiple invocations
+   * are harmless.
+   *
+   * @param cmd The command that might cause collection creation
+   * @return the collection to which the update should be directed, possibly a newly created collection.
+   */
+  @Override
+  public String createCollectionsIfRequired(AddUpdateCommand cmd) {
+    SolrQueryRequest req = cmd.getReq();
+    SolrCore core = req.getCore();
+    CoreContainer coreContainer = core.getCoreContainer();
+    CollectionsHandler collectionsHandler = coreContainer.getCollectionsHandler();
+    String dataValue = String.valueOf(cmd.getSolrInputDocument().getFieldValue(getRouteField()));
+    String candidateCollectionName = buildCollectionNameFromValue(dataValue);
+
+    try {
+      // Note: CRAs have no way to predict the values that determine the collection, so preemptive async creation
+      // is not possible. We have no choice but to block and wait (to do otherwise would imperil the overseer).
+      do {
+        if (getCollectionList(this.parsedAliases).contains(candidateCollectionName)) {
+          return candidateCollectionName;
+        } else {
+          // this could time out in which case we simply let it throw an error
+          MaintainCategoryRoutedAliasCmd.remoteInvoke(collectionsHandler, getAliasName(), candidateCollectionName);
+          // It's possible no collection was created because of a race and that's okay... we'll retry.
+
+          // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
+          //  not yet know about the new alias (thus won't see the newly added collection to it), and we might think
+          //  we failed.
+          collectionsHandler.getCoreContainer().getZkController().getZkStateReader().aliasesManager.update();
+
+          // we should see some sort of update to our aliases
+          if (!updateParsedCollectionAliases(coreContainer.getZkController())) { // thus we didn't make progress...
+            // this is not expected, even in known failure cases, but we check just in case
+            throw new SolrException(ErrorCode.SERVER_ERROR,
+                "We need to create a new category routed collection but for unknown reasons were unable to do so.");
+          }
+        }
+      } while (true);
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private Integer parseMaxCardinality(String maxCardinality) {
+    try {
+      return Integer.valueOf(maxCardinality);
+    } catch (NumberFormatException e) {
+      throw new SolrException(BAD_REQUEST, ROUTER_MAX_CARDINALITY + " must be a valid Integer"
+          + ", instead got: " + maxCardinality);
+    }
+  }
+
+  private Pattern compileMustMatch(String mustMatch) {
+    try {
+      return Pattern.compile(mustMatch);
+    } catch (PatternSyntaxException e) {
+      throw new SolrException(BAD_REQUEST, ROUTER_MUST_MATCH + " must be a valid regular"
+          + " expression, instead got: " + mustMatch);
+    }
+  }
+
+  private List<String> getCollectionList(Aliases p) {
+    return p.getCollectionAliasListMap().get(this.aliasName);
+  }
+
+  @Override
+  public String computeInitialCollectionName() {
+    return buildCollectionNameFromValue(UNINITIALIZED);
+  }
+
+  @Override
+  public Map<String, String> getAliasMetadata() {
+    return aliasMetadata;
+  }
+
+  @Override
+  public Set<String> getRequiredParams() {
+    return REQUIRED_ROUTER_PARAMS;
+  }
+
+  @Override
+  public Set<String> getOptionalParams() {
+    return OPTIONAL_ROUTER_PARAMS;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
index 7117019..641c4ad 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
@@ -17,19 +17,16 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -37,18 +34,19 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.util.DateMathParser;
 
 import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 
-public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+public class CreateAliasCmd extends AliasCmd {
 
   private final OverseerCollectionMessageHandler ocmh;
 
   private static boolean anyRoutingParams(ZkNodeProps message) {
-    return message.keySet().stream().anyMatch(k -> k.startsWith(TimeRoutedAlias.ROUTER_PREFIX));
+    return message.keySet().stream().anyMatch(k -> k.startsWith(RoutedAlias.ROUTER_PREFIX));
   }
 
+  @SuppressWarnings("WeakerAccess")
   public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
   }
@@ -83,7 +81,7 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
   private void callCreatePlainAlias(ZkNodeProps message, String aliasName, ZkStateReader zkStateReader) {
     final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
     final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
-    validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
+    validateAllCollectionsExistAndNoDuplicates(canonicalCollectionList, zkStateReader);
     zkStateReader.aliasesManager
         .applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
   }
@@ -101,50 +99,46 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
         .collect(Collectors.toList());
   }
 
+  @SuppressWarnings("unchecked")
   private void callCreateRoutedAlias(ZkNodeProps message, String aliasName, ZkStateReader zkStateReader, ClusterState state) throws Exception {
-    // Validate we got everything we need
-    if (!message.getProperties().keySet().containsAll(TimeRoutedAlias.REQUIRED_ROUTER_PARAMS)) {
-      throw new SolrException(BAD_REQUEST, "A routed alias requires these params: " + TimeRoutedAlias.REQUIRED_ROUTER_PARAMS
+    // Validate we got a basic minimum
+    if (!message.getProperties().keySet().containsAll(RoutedAlias.MINIMAL_REQUIRED_PARAMS)) {
+      throw new SolrException(BAD_REQUEST, "A routed alias requires these params: " + RoutedAlias.MINIMAL_REQUIRED_PARAMS
       + " plus some create-collection prefixed ones.");
     }
 
-    Map<String, String> aliasProperties = new LinkedHashMap<>();
-    message.getProperties().entrySet().stream()
-        .filter(entry -> TimeRoutedAlias.PARAM_IS_PROP.test(entry.getKey()))
-        .forEach(entry -> aliasProperties.put(entry.getKey(), (String) entry.getValue())); // way easier than .collect
-
-    TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasProperties); // validates as well
-
-    String start = message.getStr(TimeRoutedAlias.ROUTER_START);
-    Instant startTime = parseStart(start, timeRoutedAlias.getTimeZone());
+    // convert values to strings
+    Map<String, String> props = new LinkedHashMap<>();
+    message.getProperties().forEach((key, value) -> props.put(key, String.valueOf(value)));
 
-    String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, startTime);
-
-    // Create the collection
-    MaintainRoutedAliasCmd.createCollectionAndWait(state, aliasName, aliasProperties, initialCollectionName, ocmh);
-    validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);
+    // Further validation happens here
+    RoutedAlias routedAlias = RoutedAlias.fromProps(aliasName, props);
+    if (routedAlias == null) {
+      // should never happen here, but keep static analysis in IDEs happy...
+      throw new SolrException(SERVER_ERROR,"Tried to create a routed alias with no type!");
+    }
 
-    // Create/update the alias
-    zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> aliases
-        .cloneWithCollectionAlias(aliasName, initialCollectionName)
-        .cloneWithCollectionAliasProperties(aliasName, aliasProperties));
-  }
+    if (!props.keySet().containsAll(routedAlias.getRequiredParams())) {
+      throw new SolrException(BAD_REQUEST, "Not all required params were supplied. Missing params: " +
+          StrUtils.join(Sets.difference(routedAlias.getRequiredParams(), props.keySet()), ','));
+    }
 
-  private Instant parseStart(String str, TimeZone zone) {
-    Instant start = DateMathParser.parseMath(new Date(), str, zone).toInstant();
-    checkMilis(start);
-    return start;
+    // Create the first collection.
+    String initialColl = routedAlias.computeInitialCollectionName();
+      ensureAliasCollection(aliasName, zkStateReader, state, routedAlias.getAliasMetadata(), initialColl);
+      // Create/update the alias
+      zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> aliases
+          .cloneWithCollectionAlias(aliasName, initialColl)
+          .cloneWithCollectionAliasProperties(aliasName, routedAlias.getAliasMetadata()));
   }
 
-  private void checkMilis(Instant date) {
-    if (!date.truncatedTo(ChronoUnit.SECONDS).equals(date)) {
-      throw new SolrException(BAD_REQUEST,
-          "Date or date math for start time includes milliseconds, which is not supported. " +
-              "(Hint: 'NOW' used without rounding always has this problem)");
-    }
+  private void ensureAliasCollection(String aliasName, ZkStateReader zkStateReader, ClusterState state, Map<String, String> aliasProperties, String initialCollectionName) throws Exception {
+    // Create the collection
+    createCollectionAndWait(state, aliasName, aliasProperties, initialCollectionName, ocmh);
+    validateAllCollectionsExistAndNoDuplicates(Collections.singletonList(initialCollectionName), zkStateReader);
   }
 
-  private void validateAllCollectionsExistAndNoDups(List<String> collectionList, ZkStateReader zkStateReader) {
+  private void validateAllCollectionsExistAndNoDuplicates(List<String> collectionList, ZkStateReader zkStateReader) {
     final String collectionStr = StrUtils.join(collectionList, ',');
 
     if (new HashSet<>(collectionList).size() != collectionList.size()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
new file mode 100644
index 0000000..99c7ad0
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.CategoryRoutedAlias.UNINITIALIZED;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @SuppressWarnings("WeakerAccess")
+  public static final String IF_CATEGORY_COLLECTION_NOT_FOUND = "ifCategoryCollectionNotFound";
+
+  private static NamedSimpleSemaphore DELETE_LOCK = new NamedSimpleSemaphore();
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  MaintainCategoryRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Invokes this command from the client.  If there's a problem it will throw an exception.
+   * Please note that it is important never to add async to this invocation. This method must
+   * block (up to the standard OCP timeout) to prevent large batches of adds from sending a message
+   * to the overseer for every document added in RoutedAliasUpdateProcessor.
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static void remoteInvoke(CollectionsHandler collHandler, String aliasName, String categoryCollection)
+      throws Exception {
+    final String operation = CollectionParams.CollectionAction.MAINTAINCATEGORYROUTEDALIAS.toLower();
+    Map<String, Object> msg = new HashMap<>();
+    msg.put(Overseer.QUEUE_OPERATION, operation);
+    msg.put(CollectionParams.NAME, aliasName);
+    msg.put(IF_CATEGORY_COLLECTION_NOT_FOUND, categoryCollection);
+    final SolrResponse rsp = collHandler.sendToOCPQueue(new ZkNodeProps(msg));
+    if (rsp.getException() != null) {
+      throw rsp.getException();
+    }
+  }
+
+  /**
+   * Ensures the collection requested by the client exists and is listed in the category routed alias, creating it if
+   * necessary. Once the alias lists more than one collection the placeholder collection is retired in the background.
+   *
+   * @param message carries the alias name (NAME) and the wanted collection ({@link #IF_CATEGORY_COLLECTION_NOT_FOUND})
+   * @param results receives the "create" outcome and, from the background task, the placeholder deletion response
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    //---- PARSE PRIMARY MESSAGE PARAMS
+    // important that we use NAME for the alias as that is what the Overseer will get a lock on before calling us
+    final String aliasName = message.getStr(NAME);
+    // the client believes this collection name should exist.  Our goal is to ensure it does.
+    final String categoryRequired = message.getStr(IF_CATEGORY_COLLECTION_NOT_FOUND); // optional
+
+    //---- PARSE ALIAS INFO FROM ZK
+    final ZkStateReader.AliasesManager aliasesManager = ocmh.zkStateReader.aliasesManager;
+    final Aliases aliases = aliasesManager.getAliases();
+    final Map<String, String> aliasMetadata = aliases.getCollectionAliasProperties(aliasName);
+    if (aliasMetadata == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Alias " + aliasName + " does not exist."); // if it did exist, we'd have a non-null map
+    }
+    final CategoryRoutedAlias categoryRoutedAlias = (CategoryRoutedAlias) RoutedAlias.fromProps(aliasName, aliasMetadata);
+
+    if (categoryRoutedAlias == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, getClass() + " got alias metadata with an " +
+          "invalid routing type and produced null");
+    }
+
+    //---- SEARCH FOR REQUESTED COLL
+    Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
+    // if we found it the collection already exists and we're done (concurrent creation on another request)
+    // so this if does not need an else.
+    if (!collectionAliasListMap.get(aliasName).contains(categoryRequired)) {
+      //---- DETECT and REMOVE the initial place holder collection if it still exists:
+      String initialCollection = categoryRoutedAlias.buildCollectionNameFromValue(UNINITIALIZED);
+      // important not to delete the place holder collection until after a second collection exists,
+      // otherwise we have a situation where the alias has no collections briefly and concurrent
+      // requests to the alias will fail with internal errors (incl. queries etc).
+      List<String> colList = new ArrayList<>(collectionAliasListMap.get(aliasName));
+      if (colList.contains(initialCollection) && colList.size() > 1 ) {
+        // need to run the delete async, otherwise we may deadlock with incoming updates that are attempting
+        // to create collections (they will have called getCore() but may be waiting on the overseer alias lock
+        // we hold and we will be waiting for the Core reference count to reach zero). By deleting asynchronously
+        // we allow this request to complete and the alias lock to be released, which allows the update to complete
+        // so that we can do the delete. Additionally we don't want to cause multiple delete operations during
+        // the time the delete is in progress, since that just wastes overseer cycles.
+        // TODO: check TRA's are protected against this
+        if (DELETE_LOCK.tryAcquire(aliasName)) {
+          // note that the overseer might not have any cores (and the unit test occasionally catches this)
+          ocmh.overseer.getCoreContainer().runAsync(() -> {
+            SolrQueryResponse rsp = new SolrQueryResponse();
+            try {
+              aliasesManager.applyModificationAndExportToZk(curAliases -> {
+                // work from the live alias: the list captured above may be stale by the time this task runs
+                List<String> keep = new ArrayList<>(curAliases.getCollectionAliasListMap().get(aliasName));
+                keep.remove(initialCollection);
+                return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(keep, ','));
+              });
+              final CollectionsHandler collHandler = ocmh.overseer.getCoreContainer().getCollectionsHandler();
+              final SolrParams reqParams = CollectionAdminRequest.deleteCollection(initialCollection).getParams();
+              collHandler.handleRequestBody(new LocalSolrQueryRequest(null, reqParams), rsp);
+            } catch (Exception e) {
+              log.error("Could not delete initial collection from CRA", e);
+            } finally {
+              DELETE_LOCK.release(aliasName);
+            }
+            //noinspection unchecked
+            results.add(UNINITIALIZED, rsp.getValues());
+          });
+        }
+      }
+
+      //---- CREATE THE COLLECTION
+      NamedList createResults = createCollectionAndWait(state, aliasName, aliasMetadata,
+          categoryRequired, ocmh);
+      if (createResults != null) {
+        //noinspection unchecked
+        results.add("create", createResults);
+      }
+      //---- UPDATE THE ALIAS WITH NEW COLLECTION
+      updateAlias(aliasName, aliasesManager, categoryRequired);
+    }
+  }
+
+  /**
+   * One single-permit {@link Semaphore} per name, created on first use. Methods are synchronized because the
+   * backing {@link HashMap} is reached both from callers acquiring and from async tasks releasing.
+   */
+  private static class NamedSimpleSemaphore {
+    private final HashMap<String, Semaphore> semaphores = new HashMap<>();
+
+    synchronized boolean tryAcquire(String name) {
+      return semaphores.computeIfAbsent(name, s -> new Semaphore(1)).tryAcquire();
+    }
+
+    public synchronized void release(String name) {
+      semaphores.get(name).release();
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
similarity index 70%
rename from solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
rename to solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
index e5c5de6..cb95d76 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
@@ -20,7 +20,6 @@ package org.apache.solr.cloud.api.collections;
 import java.lang.invoke.MethodHandles;
 import java.text.ParseException;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -31,14 +30,12 @@ import java.util.Map;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
@@ -49,9 +46,6 @@ import org.apache.solr.util.DateMathParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CREATE_COLLECTION_PREFIX;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 /**
@@ -64,14 +58,14 @@ import static org.apache.solr.common.params.CommonParams.NAME;
  * @since 7.3
  * @lucene.internal
  */
-public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+public class MaintainTimeRoutedAliasCmd extends AliasCmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName"; //TODO rename to createAfter
 
   private final OverseerCollectionMessageHandler ocmh;
 
-  public MaintainRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
+  public MaintainTimeRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
   }
 
@@ -79,15 +73,15 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
    * Invokes this command from the client.  If there's a problem it will throw an exception.
    * Please note that is important to never add async to this invocation. This method must
    * block (up to the standard OCP timeout) to prevent large batches of add's from sending a message
-   * to the overseer for every document added in TimeRoutedAliasUpdateProcessor.
+   * to the overseer for every document added in RoutedAliasUpdateProcessor.
    */
   public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName)
       throws Exception {
-    final String operation = CollectionParams.CollectionAction.MAINTAINROUTEDALIAS.toLower();
+    final String operation = CollectionParams.CollectionAction.MAINTAINTIMEROUTEDALIAS.toLower();
     Map<String, Object> msg = new HashMap<>();
     msg.put(Overseer.QUEUE_OPERATION, operation);
     msg.put(CollectionParams.NAME, aliasName);
-    msg.put(MaintainRoutedAliasCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
+    msg.put(MaintainTimeRoutedAliasCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
     final SolrResponse rsp = collHandler.sendToOCPQueue(new ZkNodeProps(msg));
     if (rsp.getException() != null) {
       throw rsp.getException();
@@ -110,12 +104,13 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
     final Aliases aliases = aliasesManager.getAliases();
     final Map<String, String> aliasMetadata = aliases.getCollectionAliasProperties(aliasName);
     if (aliasMetadata == null) {
-      throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Alias " + aliasName + " does not exist."); // if it did exist, we'd have a non-null map
     }
     final TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata);
 
     final List<Map.Entry<Instant, String>> parsedCollections =
-        timeRoutedAlias.parseCollections(aliases, () -> newAliasMustExistException(aliasName));
+        timeRoutedAlias.parseCollections(aliases);
 
     //---- GET MOST RECENT COLL
     final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0);
@@ -157,18 +152,7 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
     }
 
     //---- UPDATE THE ALIAS WITH NEW COLLECTION
-    aliasesManager.applyModificationAndExportToZk(curAliases -> {
-      final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
-      if (curTargetCollections.contains(createCollName)) {
-        return curAliases;
-      } else {
-        List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
-        // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
-        newTargetCollections.add(createCollName);
-        newTargetCollections.addAll(curTargetCollections);
-        return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
-      }
-    });
+    updateAlias(aliasName, aliasesManager, createCollName);
 
   }
 
@@ -200,7 +184,7 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
       // note: we could re-parse the TimeRoutedAlias object from curAliases but I don't think there's a point to it.
 
       final List<Map.Entry<Instant, String>> parsedCollections =
-          timeRoutedAlias.parseCollections(curAliases, () -> newAliasMustExistException(aliasName));
+          timeRoutedAlias.parseCollections(curAliases);
 
       //iterating from newest to oldest, find the first collection that has a time <= "before".  We keep this collection
       // (and all newer to left) but we delete older collections, which are the ones that follow.
@@ -251,55 +235,4 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
     return results;
   }
 
-  /**
-   * Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
-   * If the collection already exists then this is not an error.
-   * IMPORTANT: Only call this from an {@link OverseerCollectionMessageHandler.Cmd}.
-   */
-  static NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
-                                           String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
-    // Map alias metadata starting with a prefix to a create-collection API request
-    final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
-    for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
-      if (e.getKey().startsWith(CREATE_COLLECTION_PREFIX)) {
-        createReqParams.set(e.getKey().substring(CREATE_COLLECTION_PREFIX.length()), e.getValue());
-      }
-    }
-    if (createReqParams.get(COLL_CONF) == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "We require an explicit " + COLL_CONF );
-    }
-    createReqParams.set(NAME, createCollName);
-    createReqParams.set("property." + ROUTED_ALIAS_NAME_CORE_PROP, aliasName);
-    // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
-    //   Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
-    final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
-        new LocalSolrQueryRequest(null, createReqParams),
-        null,
-        ocmh.overseer.getCoreContainer().getCollectionsHandler());
-    createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
-
-    NamedList results = new NamedList();
-    try {
-      // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
-      // note: there's doesn't seem to be any point in locking on the collection name, so we don't. We currently should
-      //   already have a lock on the alias name which should be sufficient.
-      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
-    } catch (SolrException e) {
-      // The collection might already exist, and that's okay -- we can adopt it.
-      if (!e.getMessage().contains("collection already exists")) {
-        throw e;
-      }
-    }
-
-    CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
-        new OverseerSolrResponse(results));
-    return results;
-  }
-
-  private SolrException newAliasMustExistException(String aliasName) {
-    return new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-        "Alias " + aliasName + " does not exist.");
-  }
-
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index c3e53c6..de7b3eb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -16,58 +16,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
-import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.UTILIZENODE;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
-
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -84,6 +32,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -143,7 +92,25 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
 
 /**
  * A {@link OverseerMessageHandler} that handles Collections API related
@@ -267,7 +234,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         .put(CREATEALIAS, new CreateAliasCmd(this))
         .put(DELETEALIAS, new DeleteAliasCmd(this))
         .put(ALIASPROP, new SetAliasPropCmd(this))
-        .put(MAINTAINROUTEDALIAS, new MaintainRoutedAliasCmd(this))
+        .put(MAINTAINTIMEROUTEDALIAS, new MaintainTimeRoutedAliasCmd(this))
+        .put(MAINTAINCATEGORYROUTEDALIAS, new MaintainCategoryRoutedAliasCmd(this))
         .put(OVERSEERSTATUS, new OverseerStatusCmd(this))
         .put(DELETESHARD, new DeleteShardCmd(this))
         .put(DELETEREPLICA, new DeleteReplicaCmd(this))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
new file mode 100644
index 0000000..8bb95cc
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.update.AddUpdateCommand;
+
+import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+
+public interface RoutedAlias {
+
+  /**
+   * Types supported. Every entry here must have a case in the switch statement in {@link #fromProps(String, Map)}
+   */
+  enum SupportedRouterTypes {
+    TIME,
+    CATEGORY
+  }
+
+  String ROUTER_PREFIX = "router.";
+  String ROUTER_TYPE_NAME = ROUTER_PREFIX + "name";
+  String ROUTER_FIELD = ROUTER_PREFIX + "field";
+  String CREATE_COLLECTION_PREFIX = "create-collection.";
+  Set<String> MINIMAL_REQUIRED_PARAMS = Sets.newHashSet(ROUTER_TYPE_NAME, ROUTER_FIELD);
+  String ROUTED_ALIAS_NAME_CORE_PROP = "routedAliasName"; // core prop
+
+  static SolrException newAliasMustExistException(String aliasName) {
+    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+        "Routed alias " + aliasName + " appears to have been removed during request processing.");
+  }
+
+  /**
+   * Factory method for implementations of this interface. There should be no reason to construct instances
+   * elsewhere, and routed alias types are encouraged to have package private constructors.
+   *
+   * @param aliasName The alias name (will be returned by {@link #getAliasName()})
+   * @param props     The properties from an overseer message.
+   * @return An implementation appropriate for the supplied properties, or null if no type is specified.
+   * @throws SolrException If the properties are invalid or the router type is unknown.
+   */
+  static RoutedAlias fromProps(String aliasName, Map<String, String> props) throws SolrException {
+    // the router type selects the implementation to build
+    String typeStr = props.get(ROUTER_TYPE_NAME);
+    if (typeStr == null) {
+      return null; // non-routed aliases are being created
+    }
+    SupportedRouterTypes routerType;
+    // upper-cased before lookup, so the configured router name matches the enum constants case-insensitively
+    try {
+      routerType = SupportedRouterTypes.valueOf(typeStr.toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException e) {
+      throw new SolrException(BAD_REQUEST, "Router name: " + typeStr + " is not in supported types, "
+          + Arrays.asList(SupportedRouterTypes.values()));
+    }
+    switch (routerType) {
+      case TIME:
+        return new TimeRoutedAlias(aliasName, props);
+      case CATEGORY:
+        return new CategoryRoutedAlias(aliasName, props);
+      default:
+        // if we got a type not handled by the switch there's been a bogus implementation.
+        throw new SolrException(SERVER_ERROR, "Router " + routerType + " is not fully implemented. If you see this " +
+            "error in an official release please file a bug report. Available types were: "
+            + Arrays.asList(SupportedRouterTypes.values()));
+    }
+  }
+
+  /**
+   * Ensure our parsed version of the alias collection list is up to date. If it was modified, return true.
+   * Note that this will return true if some other alias was modified or if properties were modified. These
+   * are spurious and the caller should be written to be tolerant of no material changes.
+   */
+  boolean updateParsedCollectionAliases(ZkController zkController);
+
+  /**
+   * Create the initial collection for this RoutedAlias if applicable.
+   *
+   * Routed Aliases do not aggregate existing collections, instead they create collections on the fly. If the initial
+   * collection can be determined from initialization parameters it should be calculated here.
+   *
+   * @return optional string of initial collection name
+   */
+  String computeInitialCollectionName();
+
+
+  /**
+   * The name of the alias. This name is used in place of a collection name for both queries and updates.
+   *
+   * @return The name of the Alias.
+   */
+  String getAliasName();
+
+  String getRouteField();
+
+
+
+  /**
+   * Check that the value we will be routing on is legal for this type of routed alias.
+   *
+   * @param cmd the command containing the document
+   */
+  void validateRouteValue(AddUpdateCommand cmd) throws SolrException;
+
+  /**
+   * Create any required collections and return the name of the collection to which the current document should be sent.
+   *
+   * @param cmd The command that might cause collection creation
+   * @return The name of the proper destination collection for the document which may or may not be a
+   * newly created collection
+   */
+  String createCollectionsIfRequired(AddUpdateCommand cmd);
+
+  /**
+   * @return get alias related metadata
+   */
+  Map<String, String> getAliasMetadata();
+
+  Set<String> getRequiredParams();
+
+  Set<String> getOptionalParams();
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
index 2da81da..94a4a84 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
@@ -17,34 +17,49 @@
 
 package org.apache.solr.cloud.api.collections;
 
+import java.lang.invoke.MethodHandles;
 import java.text.ParseException;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.ChronoField;
+import java.time.temporal.ChronoUnit;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
 
 import com.google.common.base.MoreObjects;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.RequiredSolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.processor.RoutedAliasUpdateProcessor;
 import org.apache.solr.util.DateMathParser;
 import org.apache.solr.util.TimeZoneUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CreationType.ASYNC_PREEMPTIVE;
+import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CreationType.NONE;
+import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CreationType.SYNCHRONOUS;
 import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.params.CommonParams.TZ;
 
@@ -52,49 +67,50 @@ import static org.apache.solr.common.params.CommonParams.TZ;
  * Holds configuration for a routed alias, and some common code and constants.
  *
  * @see CreateAliasCmd
- * @see MaintainRoutedAliasCmd
- * @see org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor
+ * @see MaintainTimeRoutedAliasCmd
+ * @see RoutedAliasUpdateProcessor
  */
-public class TimeRoutedAlias {
+public class TimeRoutedAlias implements RoutedAlias {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // This class is created once per request and the overseer methods prevent duplicate create requests
+  // from creating extra copies. All we need to track here is that we don't spam preemptive creates to
+  // the overseer multiple times from *this* request.
+  private volatile boolean preemptiveCreateOnceAlready = false;
+
+  // These two fields may be updated within the calling thread during processing but should
+  // never be updated by any async creation thread.
+  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection.  Sorted descending
+  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
 
   // These are parameter names to routed alias creation, AND are stored as metadata with the alias.
-  public static final String ROUTER_PREFIX = "router.";
-  public static final String ROUTER_TYPE_NAME = ROUTER_PREFIX + "name";
-  public static final String ROUTER_FIELD = ROUTER_PREFIX + "field";
   public static final String ROUTER_START = ROUTER_PREFIX + "start";
   public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval";
   public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "maxFutureMs";
-  public static final String ROUTER_PREEMPTIVE_CREATE_MATH = ROUTER_PREFIX + "preemptiveCreateMath";
   public static final String ROUTER_AUTO_DELETE_AGE = ROUTER_PREFIX + "autoDeleteAge";
-  public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
+  public static final String ROUTER_PREEMPTIVE_CREATE_MATH = ROUTER_PREFIX + "preemptiveCreateMath";
   // plus TZ and NAME
 
   /**
    * Parameters required for creating a routed alias
    */
-  public static final List<String> REQUIRED_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
+  public static final Set<String> REQUIRED_ROUTER_PARAMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
       CommonParams.NAME,
       ROUTER_TYPE_NAME,
       ROUTER_FIELD,
       ROUTER_START,
-      ROUTER_INTERVAL));
+      ROUTER_INTERVAL)));
 
   /**
    * Optional parameters for creating a routed alias excluding parameters for collection creation.
    */
   //TODO lets find a way to remove this as it's harder to maintain than required list
-  public static final List<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
+  public static final Set<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
       ROUTER_MAX_FUTURE,
       ROUTER_AUTO_DELETE_AGE,
       ROUTER_PREEMPTIVE_CREATE_MATH,
-      TZ)); // kinda special
+      TZ))); // kinda special
 
-  static Predicate<String> PARAM_IS_PROP =
-      key -> key.equals(TZ) ||
-          (key.startsWith(ROUTER_PREFIX) && !key.equals(ROUTER_START)) || //TODO reconsider START special case
-          key.startsWith(CREATE_COLLECTION_PREFIX);
-
-  public static final String ROUTED_ALIAS_NAME_CORE_PROP = "routedAliasName"; // core prop
 
   // This format must be compatible with collection name limitations
   private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
@@ -104,38 +120,32 @@ public class TimeRoutedAlias {
       .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
       .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC); // deliberate -- collection names disregard TZ
 
-  public static Instant parseInstantFromCollectionName(String aliasName, String collection) {
-    final String dateTimePart = collection.substring(aliasName.length() + 1);
-    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
-  }
-
-  public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) {
-    String nextCollName = DATE_TIME_FORMATTER.format(timestamp);
-    for (int i = 0; i < 3; i++) { // chop off seconds, minutes, hours
-      if (nextCollName.endsWith("_00")) {
-        nextCollName = nextCollName.substring(0, nextCollName.length()-3);
-      }
-    }
-    assert DATE_TIME_FORMATTER.parse(nextCollName, Instant::from).equals(timestamp);
-    return aliasName + "_" + nextCollName;
-  }
-
-
   //
   // Instance data and methods
   //
 
   private final String aliasName;
+  private final Map<String, String> aliasMetadata;
   private final String routeField;
   private final String intervalMath; // ex: +1DAY
   private final long maxFutureMs;
   private final String preemptiveCreateMath;
   private final String autoDeleteAgeMath; // ex: /DAY-30DAYS  *optional*
   private final TimeZone timeZone;
+  private String start;
 
-  public TimeRoutedAlias(String aliasName, Map<String, String> aliasMetadata) {
+  TimeRoutedAlias(String aliasName, Map<String, String> aliasMetadata) throws SolrException {
+    // Validate we got everything we need
+    if (!aliasMetadata.keySet().containsAll(TimeRoutedAlias.REQUIRED_ROUTER_PARAMS)) {
+      throw new SolrException(BAD_REQUEST, "A time routed alias requires these params: " + TimeRoutedAlias.REQUIRED_ROUTER_PARAMS
+          + " plus some create-collection prefixed ones.");
+    }
+
+    this.aliasMetadata = aliasMetadata;
+
+    this.start = this.aliasMetadata.get(ROUTER_START);
     this.aliasName = aliasName;
-    final MapSolrParams params = new MapSolrParams(aliasMetadata); // for convenience
+    final MapSolrParams params = new MapSolrParams(this.aliasMetadata); // for convenience
     final RequiredSolrParams required = params.required();
     if (!"time".equals(required.get(ROUTER_TYPE_NAME))) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only 'time' routed aliases is supported right now.");
@@ -149,7 +159,7 @@ public class TimeRoutedAlias {
     String pcmTmp = params.get(ROUTER_PREEMPTIVE_CREATE_MATH);
     preemptiveCreateMath = pcmTmp != null ? (pcmTmp.startsWith("-") ? pcmTmp : "-" + pcmTmp) : null;
     autoDeleteAgeMath = params.get(ROUTER_AUTO_DELETE_AGE); // no default
-    timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
+    timeZone = TimeZoneUtils.parseTimezone(this.aliasMetadata.get(CommonParams.TZ));
 
     // More validation:
 
@@ -187,10 +197,61 @@ public class TimeRoutedAlias {
     }
   }
 
+  @Override
+  public String computeInitialCollectionName() {
+    return formatCollectionNameFromInstant(aliasName, parseStringAsInstant(this.start, timeZone));
+  }
+
+  public static Instant parseInstantFromCollectionName(String aliasName, String collection) {
+    final String dateTimePart = collection.substring(aliasName.length() + 1);
+    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
+  }
+
+  public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) {
+    String nextCollName = DATE_TIME_FORMATTER.format(timestamp);
+    for (int i = 0; i < 3; i++) { // chop off seconds, minutes, hours
+      if (nextCollName.endsWith("_00")) {
+        nextCollName = nextCollName.substring(0, nextCollName.length()-3);
+      }
+    }
+    assert DATE_TIME_FORMATTER.parse(nextCollName, Instant::from).equals(timestamp);
+    return aliasName + "_" + nextCollName;
+  }
+
+  Instant parseStringAsInstant(String str, TimeZone zone) {
+    Instant start = DateMathParser.parseMath(new Date(), str, zone).toInstant();
+    checkMilis(start);
+    return start;
+  }
+
+  private void checkMilis(Instant date) {
+    if (!date.truncatedTo(ChronoUnit.SECONDS).equals(date)) {
+      throw new SolrException(BAD_REQUEST,
+          "Date or date math for start time includes milliseconds, which is not supported. " +
+              "(Hint: 'NOW' used without rounding always has this problem)");
+    }
+  }
+
+  @Override
+  public boolean updateParsedCollectionAliases(ZkController zkController) {
+    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
+    if (this.parsedCollectionsAliases != aliases) {
+      if (this.parsedCollectionsAliases != null) {
+        log.debug("Observing possibly updated alias: {}", getAliasName());
+      }
+      this.parsedCollectionsDesc = parseCollections(aliases );
+      this.parsedCollectionsAliases = aliases;
+      return true;
+    }
+    return false;
+  }
+
+  @Override
   public String getAliasName() {
     return aliasName;
   }
 
+  @Override
   public String getRouteField() {
     return routeField;
   }
@@ -227,12 +288,13 @@ public class TimeRoutedAlias {
         .add("timeZone", timeZone)
         .toString();
   }
-
-  /** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
-  public List<Map.Entry<Instant,String>> parseCollections(Aliases aliases, Supplier<SolrException> aliasNotExist) {
+  /**
+   * Parses the elements of the collection list. The result is returned in reverse sorted order (most recent 1st).
+   */
+  List<Map.Entry<Instant,String>> parseCollections(Aliases aliases) {
     final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
     if (collections == null) {
-      throw aliasNotExist.get();
+      throw RoutedAlias.newAliasMustExistException(getAliasName());
     }
     // note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later
     List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
@@ -251,4 +313,247 @@ public class TimeRoutedAlias {
     assert nextCollTimestamp.isAfter(fromTimestamp);
     return nextCollTimestamp;
   }
+
+  @Override
+  public void validateRouteValue(AddUpdateCommand cmd) throws SolrException {
+    final Instant docTimestamp =
+        parseRouteKey(cmd.getSolrInputDocument().getFieldValue(getRouteField()));
+
+    // FUTURE: maybe in some cases the user would want to ignore/warn instead?
+    if (docTimestamp.isAfter(Instant.now().plusMillis(getMaxFutureMs()))) {
+      throw new SolrException(BAD_REQUEST,
+          "The document's time routed key of " + docTimestamp + " is too far in the future given " +
+              ROUTER_MAX_FUTURE + "=" + getMaxFutureMs());
+    }
+  }
+
+  @Override
+  public String createCollectionsIfRequired(AddUpdateCommand cmd) {
+    SolrQueryRequest req = cmd.getReq();
+    SolrCore core = req.getCore();
+    CoreContainer coreContainer = core.getCoreContainer();
+    CollectionsHandler collectionsHandler = coreContainer.getCollectionsHandler();
+    final Instant docTimestamp =
+        parseRouteKey(cmd.getSolrInputDocument().getFieldValue(getRouteField()));
+
+    // Even though it is possible that multiple requests hit this code in the 1-2 sec that
+    // it takes to create a collection, it's an established anti-pattern to feed data with a very large number
+    // of client connections. This in mind, we only guard against spamming the overseer within a batch of
+    // updates. We are intentionally tolerating a low level of redundant requests in favor of simpler code. Most
+    // super-sized installations with many update clients will likely be multi-tenant and multiple tenants
+    // probably don't write to the same alias. As such, we have deferred any solution to the "many clients causing
+    // collection creation simultaneously" problem until such time as someone actually has that problem in a
+    // real world use case that isn't just an anti-pattern.
+    Map.Entry<Instant, String> candidateCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
+    String candidateCollectionName = candidateCollectionDesc.getValue();
+
+    try {
+      switch (typeOfCreationRequired(docTimestamp, candidateCollectionDesc.getKey())) {
+        case SYNCHRONOUS:
+          // This next line blocks until all collections required by the current document have been created
+          return createAllRequiredCollections(docTimestamp, cmd, candidateCollectionDesc);
+        case ASYNC_PREEMPTIVE:
+          if (!preemptiveCreateOnceAlready) {
+            log.debug("Executing preemptive creation for {}", getAliasName());
+            // It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
+            // (above, in this method) that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
+            // and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
+            // collections that existed when candidateCollectionDesc was created. If this class updates its notion of
+            // parsedCollectionsDesc since candidateCollectionDesc was chosen, we could create collection n+2
+            // instead of collection n+1.
+            String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+
+            // This line does not block and the document can be added immediately
+            preemptiveAsync(() -> createNextCollection(mostRecentCollName, collectionsHandler), core);
+          }
+          return candidateCollectionName;
+        case NONE:
+          return candidateCollectionName; // could use fall through, but fall through is fiddly for later editors.
+        default:
+          throw unknownCreateType();
+      }
+      // not reached: every case above either returns or throws
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  @Override
+  public Map<String, String> getAliasMetadata() {
+    return aliasMetadata;
+  }
+
+  @Override
+  public Set<String> getRequiredParams() {
+    return REQUIRED_ROUTER_PARAMS;
+  }
+
+  @Override
+  public Set<String> getOptionalParams() {
+    return OPTIONAL_ROUTER_PARAMS;
+  }
+
+  /**
+   * Create as many collections as required. This method loops to allow for the possibility that the docTimestamp
+   * requires more than one collection to be created. Since multiple threads may be invoking maintain on separate
+   * requests to the same alias, we must pass in the name of the collection that this thread believes to be the most
+   * recent collection. This assumption is checked when the command is executed in the overseer. When this method
+   * finds that all collections required have been created it returns the (possibly new) most recent collection.
+   * The return value is ignored by the calling code in the async preemptive case.
+   *
+   * @param docTimestamp the timestamp from the document that determines routing
+   * @param cmd the update command being processed
+   * @param targetCollectionDesc the descriptor for the presently selected collection which should also be
+   *                             the most recent collection in all cases where this method is invoked.
+   * @return The latest collection, including collections created during maintenance
+   */
+  private String createAllRequiredCollections( Instant docTimestamp, AddUpdateCommand cmd,
+                                               Map.Entry<Instant, String> targetCollectionDesc) {
+    SolrQueryRequest req = cmd.getReq();
+    SolrCore core = req.getCore();
+    CoreContainer coreContainer = core.getCoreContainer();
+    CollectionsHandler collectionsHandler = coreContainer.getCollectionsHandler();
+    do {
+      switch(typeOfCreationRequired(docTimestamp, targetCollectionDesc.getKey())) {
+        case NONE:
+          return targetCollectionDesc.getValue(); // we don't need another collection
+        case ASYNC_PREEMPTIVE:
+          // can happen when preemptive interval is longer than one time slice
+          String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+          preemptiveAsync(() -> createNextCollection(mostRecentCollName, collectionsHandler), core);
+          return targetCollectionDesc.getValue();
+        case SYNCHRONOUS:
+          createNextCollection(targetCollectionDesc.getValue(), collectionsHandler); // *should* throw if fails for some reason but...
+          ZkController zkController = coreContainer.getZkController();
+          if (!updateParsedCollectionAliases(zkController)) { // thus we didn't make progress...
+            // this is not expected, even in known failure cases, but we check just in case
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "We need to create a new time routed collection but for unknown reasons were unable to do so.");
+          }
+          // then retry the loop ... have to do find again in case other requests also added collections
+          // that were made visible when we called updateParsedCollectionAliases()
+          targetCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
+          break;
+        default:
+          throw unknownCreateType();
+
+      }
+    } while (true);
+  }
+
+  private SolrException unknownCreateType() {
+    return new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown creation type while adding " +
+        "document to a Time Routed Alias! This is a bug caused when a creation type has been added but " +
+        "not all code has been updated to handle it.");
+  }
+
+  private void createNextCollection(String mostRecentCollName, CollectionsHandler collHandler) {
+    // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name).  It will create the collection
+    //   and update the alias contingent on the most recent collection name being the same as
+    //   what we think so here, otherwise it will return (without error).
+    try {
+      MaintainTimeRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
+      // we don't care about the response.  It's possible no collection was created because
+      //  of a race and that's okay... we'll ultimately retry anyway.
+
+      // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
+      //  not yet know about the new alias (thus won't see the newly added collection to it), and we might think
+      //  we failed.
+      collHandler.getCoreContainer().getZkController().getZkStateReader().aliasesManager.update();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void preemptiveAsync(Runnable r, SolrCore core) {
+    preemptiveCreateOnceAlready = true;
+    core.runAsync(r);
+  }
+
+  /**
+   * Determine if a new collection will be required based on the document timestamp. When no preemptive
+   * create window is configured, this tells you if the document is beyond all existing collections with a response of
+   * {@link CreationType#NONE} or {@link CreationType#SYNCHRONOUS}; when a valid date math is configured as the
+   * preemptive create window, it additionally distinguishes the case where the document is close enough to the end of
+   * the TRA to trigger preemptive creation but not beyond all existing collections with a value of
+   * {@link CreationType#ASYNC_PREEMPTIVE}.
+   *
+   * @param docTimeStamp The timestamp from the document
+   * @param targetCollectionTimestamp The timestamp for the presently selected destination collection
+   * @return a {@code CreationType} indicating if and how to create a collection
+   */
+  private CreationType typeOfCreationRequired(Instant docTimeStamp, Instant targetCollectionTimestamp) {
+    final Instant nextCollTimestamp = computeNextCollTimestamp(targetCollectionTimestamp);
+
+    if (!docTimeStamp.isBefore(nextCollTimestamp)) {
+      // current document is destined for a collection that doesn't exist, must create the destination
+      // to proceed with this add command
+      return SYNCHRONOUS;
+    }
+
+    if (isNotBlank(getPreemptiveCreateWindow())) {
+      Instant preemptNextColCreateTime =
+          calcPreemptNextColCreateTime(getPreemptiveCreateWindow(), nextCollTimestamp);
+      if (!docTimeStamp.isBefore(preemptNextColCreateTime)) {
+        return ASYNC_PREEMPTIVE;
+      }
+    }
+
+    return NONE;
+  }
+
+  private Instant calcPreemptNextColCreateTime(String preemptiveCreateMath, Instant nextCollTimestamp) {
+    DateMathParser dateMathParser = new DateMathParser();
+    dateMathParser.setNow(Date.from(nextCollTimestamp));
+    try {
+      return dateMathParser.parseMath(preemptiveCreateMath).toInstant();
+    } catch (ParseException e) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Invalid Preemptive Create Window Math:'" + preemptiveCreateMath + '\'', e);
+    }
+  }
+
+  private Instant parseRouteKey(Object routeKey) {
+    final Instant docTimestamp;
+    if (routeKey instanceof Instant) {
+      docTimestamp = (Instant) routeKey;
+    } else if (routeKey instanceof Date) {
+      docTimestamp = ((Date)routeKey).toInstant();
+    } else if (routeKey instanceof CharSequence) {
+      docTimestamp = Instant.parse((CharSequence)routeKey);
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
+    }
+    return docTimestamp;
+  }
+  /**
+   * Given the route key, finds the correct collection or returns the most recent collection if the doc
+   * is in the future. Future docs will potentially cause creation of a collection that does not yet exist
+   * or an error if they exceed the maxFutureMs setting.
+   *
+   * @throws SolrException if the doc is too old to be stored in the TRA
+   */
+  private Map.Entry<Instant, String> findCandidateGivenTimestamp(Instant docTimestamp, String printableId) {
+    // Lookup targetCollection given route key.  Iterates in reverse chronological order.
+    //    We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
+    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
+      Instant colStartTime = entry.getKey();
+      if (!docTimestamp.isBefore(colStartTime)) {  // i.e. docTimeStamp is >= the colStartTime
+        return entry; //found it
+      }
+    }
+    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+        "Doc " + printableId + " couldn't be routed with " + getRouteField() + "=" + docTimestamp);
+  }
+
+  enum CreationType {
+    NONE,
+    ASYNC_PREEMPTIVE,
+    SYNCHRONOUS
+  }
+
 }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8c5e227..410e26e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,21 +16,6 @@
  */
 package org.apache.solr.core;
 
-import static java.util.Objects.requireNonNull;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_HISTORY_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
-import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -53,6 +38,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.http.auth.AuthSchemeProvider;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.config.Lookup;
@@ -131,8 +118,20 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
+import static java.util.Objects.requireNonNull;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_HISTORY_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
+import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
 
 /**
  *
@@ -227,6 +226,8 @@ public class CoreContainer {
 
   protected volatile AutoScalingHandler autoScalingHandler;
 
+  private ExecutorService coreContainerAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Container Async Task");
+
   private enum CoreInitFailedAction { fromleader, none }
 
   /**
@@ -420,7 +421,7 @@ public class CoreContainer {
       SolrHttpClientContextBuilder httpClientBuilder = new SolrHttpClientContextBuilder();
       if (builder.getCredentialsProviderProvider() != null) {
         httpClientBuilder.setDefaultCredentialsProvider(new CredentialsProviderProvider() {
-          
+
           @Override
           public CredentialsProvider getCredentialsProvider() {
             return builder.getCredentialsProviderProvider().getCredentialsProvider();
@@ -835,9 +836,9 @@ public class CoreContainer {
   }
 
   public void shutdown() {
-    log.info("Shutting down CoreContainer instance="
-        + System.identityHashCode(this));
+    log.info("Shutting down CoreContainer instance=" + System.identityHashCode(this));
 
+    ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor);
     ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
 
     isShutDown = true;
@@ -867,7 +868,7 @@ public class CoreContainer {
       }
 
       ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
-      
+
       // First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
       synchronized (solrCores.getModifyLock()) {
         solrCores.getModifyLock().notifyAll(); // wake up anyone waiting
@@ -899,7 +900,7 @@ public class CoreContainer {
       synchronized (solrCores.getModifyLock()) {
         solrCores.getModifyLock().notifyAll(); // wake up the thread
       }
-      
+
       customThreadPool.submit(() -> {
         replayUpdatesExecutor.shutdownAndAwaitTermination();
       });
@@ -1107,8 +1108,8 @@ public class CoreContainer {
 
       return core;
     } catch (Exception ex) {
-      // First clean up any core descriptor, there should never be an existing core.properties file for any core that 
-      // failed to be created on-the-fly. 
+      // First clean up any core descriptor, there should never be an existing core.properties file for any core that
+      // failed to be created on-the-fly.
       coresLocator.delete(this, cd);
       if (isZooKeeperAware() && !preExisitingZkEntry) {
         try {
@@ -1240,7 +1241,7 @@ public class CoreContainer {
   private ConfigSet getConfigSet(CoreDescriptor cd) {
     return coreConfigService.getConfig(cd);
   }
-  
+
   /**
    * Take action when we failed to create a SolrCore. If error is due to corrupt index, try to recover. Various recovery
    * strategies can be specified via system properties "-DCoreInitFailedAction={fromleader, none}"
@@ -1266,13 +1267,13 @@ public class CoreContainer {
         break;
       }
     }
-    
+
     // If no CorruptIndexException, nothing we can try here
     if (cause == null) throw original;
-    
+
     CoreInitFailedAction action = CoreInitFailedAction.valueOf(System.getProperty(CoreInitFailedAction.class.getSimpleName(), "none"));
     log.debug("CorruptIndexException while creating core, will attempt to repair via {}", action);
-    
+
     switch (action) {
       case fromleader: // Recovery from leader on a CorruptedIndexException
         if (isZooKeeperAware()) {
@@ -1372,13 +1373,13 @@ public class CoreContainer {
   }
 
   /**
-   * Returns an immutable Map of Exceptions that occurred when initializing 
-   * SolrCores (either at startup, or do to runtime requests to create cores) 
-   * keyed off of the name (String) of the SolrCore that had the Exception 
+   * Returns an immutable Map of Exceptions that occurred when initializing
+   * SolrCores (either at startup, or due to runtime requests to create cores)
+   * keyed off of the name (String) of the SolrCore that had the Exception
    * during initialization.
    * <p>
-   * While the Map returned by this method is immutable and will not change 
-   * once returned to the client, the source data used to generate this Map 
+   * While the Map returned by this method is immutable and will not change
+   * once returned to the client, the source data used to generate this Map
    * can be changed as various SolrCore operations are performed:
    * </p>
    * <ul>
@@ -1429,7 +1430,7 @@ public class CoreContainer {
    * Recreates a SolrCore.
    * While the new core is loading, requests will continue to be dispatched to
    * and processed by the old core
-   * 
+   *
    * @param name the name of the SolrCore to reload
    */
   public void reload(String name) {
@@ -1555,7 +1556,7 @@ public class CoreContainer {
   public void unload(String name, boolean deleteIndexDir, boolean deleteDataDir, boolean deleteInstanceDir) {
 
     CoreDescriptor cd = solrCores.getCoreDescriptor(name);
-    
+
     if (name != null) {
       // check for core-init errors first
       CoreLoadFailure loadFailure = coreInitFailures.remove(name);
@@ -1572,7 +1573,7 @@ public class CoreContainer {
         return;
       }
     }
-      
+
     if (cd == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot unload non-existent core [" + name + "]");
     }
@@ -1600,7 +1601,7 @@ public class CoreContainer {
         zkSys.getZkController().stopReplicationFromLeader(name);
       }
     }
-    
+
     core.unloadOnClose(cd, deleteIndexDir, deleteDataDir, deleteInstanceDir);
     if (close)
       core.closeAndWait();
@@ -1715,17 +1716,17 @@ public class CoreContainer {
   public BlobRepository getBlobRepository(){
     return blobRepository;
   }
-  
+
   /**
    * If using asyncSolrCoreLoad=true, calling this after {@link #load()} will
    * not return until all cores have finished loading.
-   * 
+   *
    * @param timeoutMs timeout, upon which method simply returns
    */
   public void waitForLoadingCoresToFinish(long timeoutMs) {
     solrCores.waitForLoadingCoresToFinish(timeoutMs);
   }
-  
+
   public void waitForLoadingCore(String name, long timeoutMs) {
     solrCores.waitForLoadingCoreToFinish(name, timeoutMs);
   }
@@ -1810,11 +1811,11 @@ public class CoreContainer {
   public boolean isZooKeeperAware() {
     return zkSys.getZkController() != null;
   }
-  
+
   public ZkController getZkController() {
     return zkSys.getZkController();
   }
-  
+
   public NodeConfig getConfig() {
     return cfg;
   }
@@ -1823,7 +1824,7 @@ public class CoreContainer {
   public ShardHandlerFactory getShardHandlerFactory() {
     return shardHandlerFactory;
   }
-  
+
   public UpdateShardHandler getUpdateShardHandler() {
     return updateShardHandler;
   }
@@ -1831,7 +1832,7 @@ public class CoreContainer {
   public SolrResourceLoader getResourceLoader() {
     return loader;
   }
-  
+
   public boolean isCoreLoading(String name) {
     return solrCores.isCoreLoading(name);
   }
@@ -1851,8 +1852,8 @@ public class CoreContainer {
   public long getStatus() {
     return status;
   }
-  
-  // Occasaionally we need to access the transient cache handler in places other than coreContainer.
+
+  // Occasionally we need to access the transient cache handler in places other than coreContainer.
   public TransientSolrCoreCache getTransientCache() {
     return solrCores.getTransientCacheHandler();
   }
@@ -1912,7 +1913,7 @@ public class CoreContainer {
     if (tragicException != null && isZooKeeperAware()) {
       getZkController().giveupLeadership(solrCore.getCoreDescriptor(), tragicException);
     }
-    
+
     return tragicException != null;
   }
 
@@ -1920,6 +1921,32 @@ public class CoreContainer {
     ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
   }
 
+  /**
+   * Run an arbitrary task in its own thread. This is an expert option and is
+   * a method you should use with great care. It would be bad to run something that never stopped
+   * or run something that took a very long time. Typically this is intended for actions that take
+   * a few seconds, and therefore would be bad to wait for within a request, or actions that need to happen
+   * when a core has zero references, but would not pose a significant hindrance to server shut down times.
+   * It is not intended for long running tasks and if you are using a Runnable with a loop in it, you are
+   * almost certainly doing it wrong.
+   * <p><br>
+   * WARNING: Solr will not be able to shut down gracefully until this task completes!
+   * <p><br>
+   * A significant upside of using this method vs creating your own ExecutorService is that your code
+   * does not have to properly shutdown executors which typically is risky from a unit testing
+   * perspective since the test framework will complain if you don't carefully ensure the executor
+   * shuts down before the end of the test. Also the threads running this task are sure to have
+   * a proper MDC for logging.
+   * <p><br>
+   * Normally, one uses {@link SolrCore#runAsync(Runnable)} if possible, but in some cases
+   * you might need to execute a task asynchronously when you could be running on a node with no
+   * cores, and then use of this method is indicated.
+   *
+   * @param r the task to run
+   */
+  public void runAsync(Runnable r) {
+    coreContainerAsyncTaskExecutor.submit(r);
+  }
 }
 
 class CloserThread extends Thread {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index df173b6..70bcd1a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -52,6 +53,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController.NotInClusterStateException;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.api.collections.RoutedAlias;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.Rule;
@@ -115,9 +117,7 @@ import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHan
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.REQUESTID;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARDS_PROP;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CREATE_COLLECTION_PREFIX;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.OPTIONAL_ROUTER_PARAMS;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.REQUIRED_ROUTER_PARAMS;
+import static org.apache.solr.cloud.api.collections.RoutedAlias.CREATE_COLLECTION_PREFIX;
 import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
@@ -567,20 +567,39 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       String alias = req.getParams().get(NAME);
       SolrIdentifierValidator.validateAliasName(alias);
       String collections = req.getParams().get("collections");
-      Map<String, Object> result = copy(req.getParams(), null, REQUIRED_ROUTER_PARAMS);
-      copy(req.getParams(), result, OPTIONAL_ROUTER_PARAMS);
+      RoutedAlias routedAlias = null;
+      Exception ex = null;
+      try {
+        // note that RA specific validation occurs here.
+        routedAlias = RoutedAlias.fromProps(alias, req.getParams().toMap(new HashMap<>()));
+      } catch (SolrException e) {
+        // we'll throw this later if we are in fact creating a routed alias.
+        ex = e;
+      }
       if (collections != null) {
-        if (result.size() > 1) { // (NAME should be there, and if it's not we will fail below)
-          throw new SolrException(BAD_REQUEST, "Collections cannot be specified when creating a time routed alias.");
+        if (routedAlias != null) {
+          throw new SolrException(BAD_REQUEST, "Collections cannot be specified when creating a routed alias.");
+        } else {
+          //////////////////////////////////////
+          // Regular alias creation indicated //
+          //////////////////////////////////////
+          return copy(req.getParams().required(), null, NAME, "collections");
         }
-        // regular alias creation...
-        return copy(req.getParams().required(), null, NAME, "collections");
       }
 
-      // Ok so we are creating a time routed alias from here
+      /////////////////////////////////////////////////
+      // We are creating a routed alias from here on //
+      /////////////////////////////////////////////////
+
+      // If our prior creation attempt had issues expose them now.
+      if (ex != null) {
+        throw ex;
+      }
+
+      // Now filter out just the parameters we care about from the request
+      Map<String, Object> result = copy(req.getParams(), null, routedAlias.getRequiredParams());
+      copy(req.getParams(), result, routedAlias.getOptionalParams());
 
-      // for validation....
-      copy(req.getParams().required(), null, REQUIRED_ROUTER_PARAMS);
       ModifiableSolrParams createCollParams = new ModifiableSolrParams(); // without prefix
 
       // add to result params that start with "create-collection.".
@@ -1273,14 +1292,14 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       log.info("Not waiting for active collection due to exception: " + createCollResponse.getResponse().get("exception"));
       return;
     }
-    
+
     int replicaFailCount;
     if (createCollResponse.getResponse().get("failure") != null) {
       replicaFailCount = ((NamedList) createCollResponse.getResponse().get("failure")).size();
     } else {
       replicaFailCount = 0;
     }
-    
+
     CloudConfig ccfg = cc.getConfig().getCloudConfig();
     Integer seconds = ccfg.getCreateCollectionWaitTimeTillActive();
     Boolean checkLeaderOnly = ccfg.isCreateCollectionCheckLeaderActive();
@@ -1294,7 +1313,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           // the collection was not created, don't wait
           return true;
         }
-        
+
         if (c.getSlices() != null) {
           Collection<Slice> shards = c.getSlices();
           int replicaNotAliveCnt = 0;
@@ -1322,13 +1341,13 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         return false;
       });
     } catch (TimeoutException | InterruptedException e) {
-   
+
       String  error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
       throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
     }
-    
+
   }
-  
+
   public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
     List l = (List) m.get(RULE);
     if (l != null) {
@@ -1364,7 +1383,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
     }
     return props;
   }
-  
+
   private static void verifyShardsParam(String shardsParam) {
     for (String shard : shardsParam.split(",")) {
       SolrIdentifierValidator.validateShardName(shard);
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 9f2a0ba..4addae0 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -50,7 +50,7 @@ public class DistributedUpdateProcessorFactory
   public UpdateRequestProcessor getInstance(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
     // note: will sometimes return DURP (no overhead) instead of wrapping
-    return TimeRoutedAliasUpdateProcessor.wrap(req,
+    return RoutedAliasUpdateProcessor.wrap(req,
         new DistributedUpdateProcessor(req, rsp, next));
   }
   
diff --git a/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
new file mode 100644
index 0000000..ab83e22
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.api.collections.RoutedAlias;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * Distributes update requests to a series of collections partitioned by a "routing" field.  Issues
+ * requests to create new collections on-demand.
+ *
+ * Depends on this core having a special core property that points to the alias name that this collection is a part of.
+ * And further requires certain properties on the Alias. Collections pointed to by the alias must be named for the alias
+ * plus an underscore ('_') and a routing specifier specific to the type of routed alias. These collections should not be
+ * created by the user, but are created automatically by the routed alias.
+ *
+ * @since 7.2.0 (formerly known as TimeRoutedAliasUpdateProcessor)
+ */
+public class RoutedAliasUpdateProcessor extends UpdateRequestProcessor {
+
+  private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // refs to std infrastructure
+  private final SolrQueryRequest req;
+  private final SolrCmdDistributor cmdDistrib;
+  private final ZkController zkController;
+
+  // Stuff specific to this class
+  private final String thisCollection;
+  private final RoutedAlias routedAlias;
+  private final SolrParams outParamsToLeader;
+
+
+  public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
+    //TODO get from "Collection property"
+    final String aliasName = req.getCore().getCoreDescriptor()
+        .getCoreProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
+    final DistribPhase shardDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+    final DistribPhase aliasDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
+    if (aliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
+      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
+      //    TODO this may eventually not be true but at the moment it is
+      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
+      return next;
+    } else {
+      try {
+        RoutedAlias alias = RoutedAlias.fromProps(aliasName, getAliasProps(req, aliasName));
+        return new RoutedAliasUpdateProcessor(req, next, aliasDistribPhase, alias);
+      } catch (Exception e) { // ensure we throw SERVER_ERROR not BAD_REQUEST at this stage
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Routed alias has invalid properties: " + e, e);
+      }
+
+    }
+  }
+
+  private static Map<String, String> getAliasProps(SolrQueryRequest req, String aliasName) {
+    ZkController zkController = req.getCore().getCoreContainer().getZkController();
+    final Map<String, String> aliasProperties = zkController.getZkStateReader().getAliases().getCollectionAliasProperties(aliasName);
+    if (aliasProperties == null) {
+      throw RoutedAlias.newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
+    }
+    return aliasProperties;
+  }
+
+  private RoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
+                                     DistribPhase aliasDistribPhase, RoutedAlias routedAlias) {
+    super(next);
+    this.routedAlias = routedAlias;
+    assert aliasDistribPhase == DistribPhase.NONE;
+    final SolrCore core = req.getCore();
+    final CoreContainer cc = core.getCoreContainer();
+    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    this.req = req;
+    this.zkController = cc.getZkController();
+    this.cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+
+
+
+    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
+    // Don't distribute these params; they will be distributed from the local processCommit separately.
+    //   (See RequestHandlerUtils.handleCommit, from which this list was retrieved)
+    outParams.remove(UpdateParams.OPTIMIZE);
+    outParams.remove(UpdateParams.COMMIT);
+    outParams.remove(UpdateParams.SOFT_COMMIT);
+    outParams.remove(UpdateParams.PREPARE_COMMIT);
+    outParams.remove(UpdateParams.ROLLBACK);
+    // Add these...
+    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
+    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
+    //  Signal this is a distributed search from this URP (see #wrap())
+    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
+    outParamsToLeader = outParams;
+  }
+
+  private String getAliasName() {
+    return routedAlias.getAliasName();
+  }
+
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    routedAlias.validateRouteValue(cmd);
+
+    // to avoid potential for race conditions, this next method should not get called again unless
+    // we have created a collection synchronously
+    routedAlias.updateParsedCollectionAliases(this.zkController);
+
+    String targetCollection = routedAlias.createCollectionsIfRequired(cmd);
+
+    if (thisCollection.equals(targetCollection)) {
+      // pass on through; we've reached the right collection
+      super.processAdd(cmd);
+    } else {
+      // send to the right collection
+      SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
+      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
+    }
+  }
+
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+  }
+
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
+  }
+
+// Not supported by SolrCmdDistributor and is sketchy anyway
+//  @Override
+//  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+//  }
+
+  @Override
+  public void finish() throws IOException {
+    try {
+      cmdDistrib.finish();
+      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+      if (!errors.isEmpty()) {
+        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
+      }
+    } finally {
+      super.finish();
+    }
+  }
+
+  @Override
+  protected void doClose() {
+    try {
+      cmdDistrib.close();
+    } finally {
+      super.doClose();
+    }
+  }
+
+  private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {
+    SchemaField uniqueKeyField = req.getSchema().getUniqueKeyField();
+    // schema might not have key field...
+    String idFieldName = uniqueKeyField == null ? null : uniqueKeyField.getName();
+    String idValue = uniqueKeyField == null ? null : doc.getFieldValue(idFieldName).toString();
+    DocCollection coll = zkController.getClusterState().getCollection(collection);
+    Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, req.getParams(), coll);
+    return getLeaderNode(collection, slice);
+  }
+
+  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
+    final Aliases aliases = zkController.getZkStateReader().getAliases();
+    List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
+    if (collections == null) {
+      throw RoutedAlias.newAliasMustExistException(getAliasName());
+    }
+    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
+  }
+
+  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
+    final Slice[] activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlicesArr();
+    if (activeSlices.length == 0) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
+    }
+    final Slice slice = activeSlices[0];
+    return getLeaderNode(collection, slice);
+  }
+
+  private SolrCmdDistributor.Node getLeaderNode(String collection, Slice slice) {
+    //TODO when should we do StdNode vs RetryNode?
+    final Replica leader = slice.getLeader();
+    if (leader == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
+    }
+    return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
+        collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
+  }
+
+
+
+
+
+}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
deleted file mode 100644
index c28ac44..0000000
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update.processor;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.text.ParseException;
-import java.time.Instant;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.api.collections.MaintainRoutedAliasCmd;
-import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.schema.SchemaField;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.SolrCmdDistributor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-import org.apache.solr.util.DateMathParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.NONE;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.SYNCHRONOUS;
-
-/**
- * Distributes update requests to a rolling series of collections partitioned by a timestamp field.  Issues
- * requests to create new collections on-demand.
- *
- * Depends on this core having a special core property that points to the alias name that this collection is a part of.
- * And further requires certain properties on the Alias. Collections pointed to by the alias must be named for the alias
- * plus underscored ('_') and a time stamp of ISO_DATE plus optionally _HH_mm_ss. These collections should not be
- * created by the user, but are created automatically by the time partitioning system.
- *
- * @since 7.2.0
- */
-public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
-  //TODO do we make this more generic to others who want to partition collections using something else besides time?
-
-  private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  // refs to std infrastructure
-  private final SolrQueryRequest req;
-  private final SolrCmdDistributor cmdDistrib;
-  private final CollectionsHandler collHandler;
-  private final ZkController zkController;
-
-  // Stuff specific to this class
-  private final String thisCollection;
-  private final TimeRoutedAlias timeRoutedAlias;
-  private final SolrParams outParamsToLeader;
-
-  // These two fields may be updated within the calling thread during processing but should
-  // never be updated by any async creation thread.
-  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection.  Sorted descending
-  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
-
-  // This class is created once per request and the overseer methods prevent duplicate create requests
-  // from creating extra copies. All we need to track here is that we don't spam preemptive creates to
-  // the overseer multiple times from *this* request.
-  private volatile boolean preemptiveCreateOnceAlready = false;
-
-  public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
-    //TODO get from "Collection property"
-    final String aliasName = req.getCore().getCoreDescriptor()
-        .getCoreProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
-    final DistribPhase shardDistribPhase =
-        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
-    final DistribPhase aliasDistribPhase =
-        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
-    if (aliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
-      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
-      //    TODO this may eventually not be true but at the moment it is
-      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
-      return next;
-    } else {
-      return new TimeRoutedAliasUpdateProcessor(req, next, aliasName, aliasDistribPhase);
-    }
-  }
-
-  private TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
-                                         String aliasName,
-                                         DistribPhase aliasDistribPhase) {
-    super(next);
-    assert aliasDistribPhase == DistribPhase.NONE;
-    final SolrCore core = req.getCore();
-    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    this.req = req;
-    CoreContainer cc = core.getCoreContainer();
-    zkController = cc.getZkController();
-    cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
-    collHandler = cc.getCollectionsHandler();
-
-    final Map<String, String> aliasProperties = zkController.getZkStateReader().getAliases().getCollectionAliasProperties(aliasName);
-    if (aliasProperties == null) {
-      throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
-    }
-    try {
-      this.timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasProperties);
-    } catch (Exception e) { // ensure we throw SERVER_ERROR not BAD_REQUEST at this stage
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Routed alias has invalid properties: " + e, e);
-    }
-
-    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
-    // Don't distribute these params; they will be distributed from the local processCommit separately.
-    //   (See RequestHandlerUtils.handleCommit from which this list was retrieved from)
-    outParams.remove(UpdateParams.OPTIMIZE);
-    outParams.remove(UpdateParams.COMMIT);
-    outParams.remove(UpdateParams.SOFT_COMMIT);
-    outParams.remove(UpdateParams.PREPARE_COMMIT);
-    outParams.remove(UpdateParams.ROLLBACK);
-    // Add these...
-    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
-    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
-    //  Signal this is a distributed search from this URP (see #wrap())
-    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
-    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
-    outParamsToLeader = outParams;
-  }
-
-  private String getAliasName() {
-    return timeRoutedAlias.getAliasName();
-  }
-
-  @Override
-  public void processAdd(AddUpdateCommand cmd) throws IOException {
-    final Instant docTimestamp =
-        parseRouteKey(cmd.getSolrInputDocument().getFieldValue(timeRoutedAlias.getRouteField()));
-
-    // TODO: maybe in some cases the user would want to ignore/warn instead?
-    if (docTimestamp.isAfter(Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs()))) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "The document's time routed key of " + docTimestamp + " is too far in the future given " +
-              TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
-    }
-
-    // to avoid potential for race conditions, this next method should not get called again unless
-    // we have created a collection synchronously
-    updateParsedCollectionAliases();
-
-    String targetCollection = createCollectionsIfRequired(docTimestamp, cmd);
-
-    if (thisCollection.equals(targetCollection)) {
-      // pass on through; we've reached the right collection
-      super.processAdd(cmd);
-    } else {
-      // send to the right collection
-      SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
-      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
-    }
-  }
-
-  /**
-   * Create any required collections and return the name of the collection to which the current document should be sent.
-   *
-   * @param docTimestamp the date for the document taken from the field specified in the TRA config
-   * @param cmd The initial calculated destination collection.
-   * @return The name of the proper destination collection for the document which may or may not be a
-   *         newly created collection
-   */
-  private String createCollectionsIfRequired(Instant docTimestamp, AddUpdateCommand cmd) {
-    // Even though it is possible that multiple requests hit this code in the 1-2 sec that
-    // it takes to create a collection, it's an established anti-pattern to feed data with a very large number
-    // of client connections. This in mind, we only guard against spamming the overseer within a batch of
-    // updates. We are intentionally tolerating a low level of redundant requests in favor of simpler code. Most
-    // super-sized installations with many update clients will likely be multi-tenant and multiple tenants
-    // probably don't write to the same alias. As such, we have deferred any solution to the "many clients causing
-    // collection creation simultaneously" problem until such time as someone actually has that problem in a
-    // real world use case that isn't just an anti-pattern.
-    Map.Entry<Instant, String> candidateCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
-    String candidateCollectionName = candidateCollectionDesc.getValue();
-    try {
-      switch (typeOfCreationRequired(docTimestamp, candidateCollectionDesc.getKey())) {
-        case SYNCHRONOUS:
-          // This next line blocks until all collections required by the current document have been created
-          return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
-        case ASYNC_PREEMPTIVE:
-          if (!preemptiveCreateOnceAlready) {
-            log.info("EXECUTING preemptive creation for {}", timeRoutedAlias.getAliasName());
-            // It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
-            // in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
-            // and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
-            // collections that existed when candidateCollectionDesc was created. If this class updates it's notion of
-            // parsedCollectionsDesc since candidateCollectionDesc was chosen, we could create collection n+2
-            // instead of collection n+1.
-            String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
-
-            // This line does not block and the document can be added immediately
-            preemptiveAsync(() -> createNextCollection(mostRecentCollName));
-          }
-          return candidateCollectionName;
-        case NONE:
-          return candidateCollectionName; // could use fall through, but fall through is fiddly for later editors.
-        default:
-          throw unknownCreateType();
-      }
-      // do nothing if creationType == NONE
-    } catch (SolrException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-    }
-  }
-
-  private void preemptiveAsync(Runnable r) {
-    preemptiveCreateOnceAlready = true;
-    req.getCore().runAsync(r);
-  }
-
-  /**
-   * Determine if the a new collection will be required based on the document timestamp. Passing null for
-   * preemptiveCreateInterval tells you if the document is beyond all existing collections with a response of
-   * {@link CreationType#NONE} or {@link CreationType#SYNCHRONOUS}, and passing a valid date math for
-   * preemptiveCreateMath additionally distinguishes the case where the document is close enough to the end of
-   * the TRA to trigger preemptive creation but not beyond all existing collections with a value of
-   * {@link CreationType#ASYNC_PREEMPTIVE}.
-   *
-   * @param docTimeStamp The timestamp from the document
-   * @param targetCollectionTimestamp The timestamp for the presently selected destination collection
-   * @return a {@code CreationType} indicating if and how to create a collection
-   */
-  private CreationType typeOfCreationRequired(Instant docTimeStamp, Instant targetCollectionTimestamp) {
-    final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(targetCollectionTimestamp);
-
-    if (!docTimeStamp.isBefore(nextCollTimestamp)) {
-      // current document is destined for a collection that doesn't exist, must create the destination
-      // to proceed with this add command
-      return SYNCHRONOUS;
-    }
-
-    if (isNotBlank(timeRoutedAlias.getPreemptiveCreateWindow())) {
-      Instant preemptNextColCreateTime =
-          calcPreemptNextColCreateTime(timeRoutedAlias.getPreemptiveCreateWindow(), nextCollTimestamp);
-      if (!docTimeStamp.isBefore(preemptNextColCreateTime)) {
-        return ASYNC_PREEMPTIVE;
-      }
-    }
-
-    return NONE;
-  }
-
-  private Instant calcPreemptNextColCreateTime(String preemptiveCreateMath, Instant nextCollTimestamp) {
-    DateMathParser dateMathParser = new DateMathParser();
-    dateMathParser.setNow(Date.from(nextCollTimestamp));
-    try {
-      return dateMathParser.parseMath(preemptiveCreateMath).toInstant();
-    } catch (ParseException e) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "Invalid Preemptive Create Window Math:'" + preemptiveCreateMath + '\'', e);
-    }
-  }
-
-  private Instant parseRouteKey(Object routeKey) {
-    final Instant docTimestamp;
-    if (routeKey instanceof Instant) {
-      docTimestamp = (Instant) routeKey;
-    } else if (routeKey instanceof Date) {
-      docTimestamp = ((Date)routeKey).toInstant();
-    } else if (routeKey instanceof CharSequence) {
-      docTimestamp = Instant.parse((CharSequence)routeKey);
-    } else {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
-    }
-    return docTimestamp;
-  }
-
-  /**
-   * Ensure {@link #parsedCollectionsAliases} is up to date. If it was modified, return true.
-   * Note that this will return true if some other alias was modified or if properties were modified. These
-   * are spurious and the caller should be written to be tolerant of no material changes.
-   */
-  private boolean updateParsedCollectionAliases() {
-    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
-    if (this.parsedCollectionsAliases != aliases) {
-      if (this.parsedCollectionsAliases != null) {
-        log.debug("Observing possibly updated alias: {}", getAliasName());
-      }
-      this.parsedCollectionsDesc = timeRoutedAlias.parseCollections(aliases, this::newAliasMustExistException);
-      this.parsedCollectionsAliases = aliases;
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Given the route key, finds the correct collection or returns the most recent collection if the doc
-   * is in the future. Future docs will potentially cause creation of a collection that does not yet exist
-   * or an error if they exceed the maxFutureMs setting.
-   *
-   * @throws SolrException if the doc is too old to be stored in the TRA
-   */
-  private Map.Entry<Instant, String> findCandidateGivenTimestamp(Instant docTimestamp, String printableId) {
-    // Lookup targetCollection given route key.  Iterates in reverse chronological order.
-    //    We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
-    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
-      Instant colStartTime = entry.getKey();
-      if (!docTimestamp.isBefore(colStartTime)) {  // i.e. docTimeStamp is >= the colStartTime
-        return entry; //found it
-      }
-    }
-    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-        "Doc " + printableId + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + docTimestamp);
-  }
-
-  private void createNextCollection(String mostRecentCollName) {
-    // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name).  It will create the collection
-    //   and update the alias contingent on the most recent collection name being the same as
-    //   what we think so here, otherwise it will return (without error).
-    try {
-      MaintainRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
-      // we don't care about the response.  It's possible no collection was created because
-      //  of a race and that's okay... we'll ultimately retry any way.
-
-      // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
-      //  not yet know about the new alias (thus won't see the newly added collection to it), and we might think
-      //  we failed.
-      zkController.getZkStateReader().aliasesManager.update();
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-    }
-  }
-
-  private SolrException newAliasMustExistException() {
-    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-        "Collection " + thisCollection + " created for use with alias " + getAliasName() + " which doesn't exist anymore." +
-            " You cannot write to this unless the alias exists.");
-  }
-
-  @Override
-  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
-    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
-    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
-  }
-
-  @Override
-  public void processCommit(CommitUpdateCommand cmd) throws IOException {
-    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
-    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
-    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
-  }
-
-// Not supported by SolrCmdDistributor and is sketchy any way
-//  @Override
-//  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
-//  }
-
-  @Override
-  public void finish() throws IOException {
-    try {
-      cmdDistrib.finish();
-      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
-      if (!errors.isEmpty()) {
-        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
-      }
-    } finally {
-      super.finish();
-    }
-  }
-
-  @Override
-  protected void doClose() {
-    try {
-      cmdDistrib.close();
-    } finally {
-      super.doClose();
-    }
-  }
-
-  private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {
-    SchemaField uniqueKeyField = req.getSchema().getUniqueKeyField();
-    // schema might not have key field...
-    String idFieldName = uniqueKeyField == null ? null : uniqueKeyField.getName();
-    String idValue = uniqueKeyField == null ? null : doc.getFieldValue(idFieldName).toString();
-    DocCollection coll = zkController.getClusterState().getCollection(collection);
-    Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, req.getParams(), coll);
-    return getLeaderNode(collection, slice);
-  }
-
-  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
-    final Aliases aliases = zkController.getZkStateReader().getAliases();
-    List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
-    if (collections == null) {
-      throw newAliasMustExistException();
-    }
-    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
-  }
-
-  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
-    final Slice[] activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlicesArr();
-    if (activeSlices.length == 0) {
-      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
-    }
-    final Slice slice = activeSlices[0];
-    return getLeaderNode(collection, slice);
-  }
-
-  private SolrCmdDistributor.Node getLeaderNode(String collection, Slice slice) {
-    //TODO when should we do StdNode vs RetryNode?
-    final Replica leader = slice.getLeader();
-    if (leader == null) {
-      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
-    }
-    return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
-        collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
-  }
-
-
-  /**
-   * Create as many collections as required. This method loops to allow for the possibility that the docTimestamp
-   * requires more than one collection to be created. Since multiple threads may be invoking maintain on separate
-   * requests to the same alias, we must pass in the name of the collection that this thread believes to be the most
-   * recent collection. This assumption is checked when the command is executed in the overseer. When this method
-   * finds that all collections required have been created it returns the (possibly new) most recent collection.
-   * The return value is ignored by the calling code in the async preemptive case.
-   *
-   * @param docTimestamp the timestamp from the document that determines routing
-   * @param printableId an identifier for the add command used in error messages
-   * @param targetCollectionDesc the descriptor for the presently selected collection which should also be
-   *                             the most recent collection in all cases where this method is invoked.
-   * @return The latest collection, including collections created during maintenance
-   */
-  private String createAllRequiredCollections( Instant docTimestamp, String printableId,
-                                               Map.Entry<Instant, String> targetCollectionDesc) {
-    do {
-      switch(typeOfCreationRequired(docTimestamp, targetCollectionDesc.getKey())) {
-        case NONE:
-          return targetCollectionDesc.getValue(); // we don't need another collection
-        case ASYNC_PREEMPTIVE:
-          // can happen when preemptive interval is longer than one time slice
-          if (!preemptiveCreateOnceAlready) {
-            String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
-            preemptiveAsync(() -> createNextCollection(mostRecentCollName));
-            return targetCollectionDesc.getValue();
-          }
-        case SYNCHRONOUS:
-          createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but...
-          if (!updateParsedCollectionAliases()) { // thus we didn't make progress...
-            // this is not expected, even in known failure cases, but we check just in case
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                "We need to create a new time routed collection but for unknown reasons were unable to do so.");
-          }
-          // then retry the loop ... have to do find again in case other requests also added collections
-          // that were made visible when we called updateParsedCollectionAliases()
-          targetCollectionDesc = findCandidateGivenTimestamp(docTimestamp, printableId);
-          break;
-        default:
-          throw unknownCreateType();
-      }
-    } while (true);
-  }
-
-  private SolrException unknownCreateType() {
-    return new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown creation type while adding " +
-        "document to a Time Routed Alias! This is a bug caused when a creation type has been added but " +
-        "not all code has been updated to handle it.");
-  }
-
-  enum CreationType {
-    NONE,
-    ASYNC_PREEMPTIVE,
-    SYNCHRONOUS
-  }
-
-
-}
diff --git a/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java b/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
index 2e18d18..85a755a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
@@ -282,7 +282,7 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
         "&router.interval=%2B30MINUTE" +
         "&create-collection.collection.configName=_default" +
         "&create-collection.numShards=1");
-    assertFailure(get, "Only 'time' routed aliases is supported right now");
+    assertFailure(get, " is not in supported types, ");
   }
 
   @Test
diff --git a/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
new file mode 100644
index 0000000..77ecb22
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/CategoryRoutedAliasUpdateProcessorTest.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.api.collections.CategoryRoutedAlias;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.util.LogLevel;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CategoryRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcessorTest {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // use this for example categories
+  private static final String[] SHIPS = {
+      "Constructor",
+      "Heart of Gold",
+      "Stunt Ship",
+      "B-ark",
+      "Bi$tromath"
+  };
+
+  private static final String categoryField = "ship_name_en";
+  private static final String intField = "integer_i";
+
+  private int lastDocId = 0;
+  private static CloudSolrClient solrClient;
+  private int numDocsDeletedOrFailed = 0;
+
+  @Before
+  public void doBefore() throws Exception {
+    configureCluster(4).configure(); // a new cluster is configured before every test method
+    solrClient = getCloudSolrClient(cluster);
+    // Log the client and its cluster state provider to help debug potential causes of problems.
+    log.info("SolrClient: {}", solrClient);
+    log.info("ClusterStateProvider {}", solrClient.getClusterStateProvider());
+  }
+
+  @After
+  public void doAfter() throws Exception {
+    solrClient.close(); // the static client is re-created by doBefore() for the next test
+    shutdownCluster();
+  }
+
+  @AfterClass
+  public static void finish() throws Exception {
+    IOUtils.close(solrClient); // NOTE(review): doAfter() already closed this client; looks redundant - confirm
+  }
+
+  public void testNonEnglish() throws Exception { // NOTE(review): no @Test, unlike the sibling tests - presumably found by its test* name; confirm
+    // Test to document the expected behavior with non-English text for categories.
+    // The present expectation is that non-Latin text and many accented Latin characters
+    // will get replaced with '_'. This is necessary to maintain collection naming
+    // conventions. The net effect is that documents get sorted by the number of characters
+    // in the category rather than by the actual categories.
+
+    // This should be changed in an enhancement (wherein the category is RFC-4648 url-safe encoded).
+    // For now, document it as an expected limitation.
+
+    String somethingInChinese = "中文的东西";      // 5 chars
+    String somethingInHebrew = "משהו בסינית";      // 11 chars
+    String somethingInThai = "บางอย่างในภาษาจีน";   // 17 chars
+    String somethingInArabic = "شيء في الصينية"; // 14 chars
+    String somethingInGreek = "κάτι κινεζικό";   // 13 chars
+    String somethingInGujarati = "િનીમાં કંઈક";       // 11 chars (same as Hebrew)
+
+    String ONE_   = "_";
+    String TWO_   = "__";
+    String THREE_ = "___";
+    String FOUR_  = "____";
+    String FIVE_  = "_____";
+
+    String collectionChinese  = getAlias() + "__CRA__" + FIVE_;
+    String collectionHebrew   = getAlias() + "__CRA__" + FIVE_ + FIVE_ + ONE_;
+    String collectionThai     = getAlias() + "__CRA__" + FIVE_ + FIVE_ + FIVE_ + TWO_;
+    String collectionArabic   = getAlias() + "__CRA__" + FIVE_ + FIVE_ + FOUR_;
+    String collectionGreek    = getAlias() + "__CRA__" + FIVE_ + FIVE_ + THREE_;
+    // Note: no collection for Gujarati, because its 11 characters map to the same name as Hebrew.
+
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+
+    List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+    List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+    // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+    assertTrue("We expect at least 2 configSets",
+        retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+    assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField, 20,
+        CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+            .setMaxShardsPerNode(2))
+        .process(solrClient);
+    addDocsAndCommit(true,
+        newDoc(somethingInChinese),
+        newDoc(somethingInHebrew),
+        newDoc(somethingInThai),
+        newDoc(somethingInArabic),
+        newDoc(somethingInGreek),
+        newDoc(somethingInGujarati));
+
+    // Note: Gujarati is not listed, because it shares the Hebrew collection (hence 2 docs there below).
+    assertInvariants(collectionChinese, collectionHebrew, collectionThai, collectionArabic, collectionGreek);
+
+    assertColHasDocCount(collectionChinese, 1);
+    assertColHasDocCount(collectionHebrew, 2);
+    assertColHasDocCount(collectionThai, 1);
+    assertColHasDocCount(collectionArabic, 1);
+    assertColHasDocCount(collectionGreek, 1);
+
+  }
+
+  /** Asserts that a match-all query against {@code collection} finds exactly {@code expected} documents. */
+  private void assertColHasDocCount(String collection, int expected) throws SolrServerException, IOException {
+    final QueryResponse colResponse =
+        solrClient.query(collection, params("q", "*:*", "rows", "0")); // rows=0: only the count is needed
+    final long numFound = colResponse.getResults().getNumFound();
+    assertEquals("Unexpected number of docs in " + collection, expected, numFound);
+  }
+
+  @Slow
+  @Test
+  public void test() throws Exception {
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+
+    // Expected name of the collection for the first category (its name needs no character substitution).
+    //  NOTE(review): this comment used to describe a manually pre-created collection; none is created here.
+    final String colVogon = getAlias() + "__CRA__" + SHIPS[0];
+
+    // we expect changes ensuring a legal collection name.
+    final String colHoG = getAlias() + "__CRA__" + noSpaces(SHIPS[1]);
+    final String colStunt = getAlias() + "__CRA__" + noSpaces(SHIPS[2]);
+    final String colArk = getAlias() + "__CRA__" + noDashes(SHIPS[3]);
+    final String colBistro = getAlias() + "__CRA__" + noDollar(SHIPS[4]);
+
+    List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+    List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+    // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+    assertTrue("We expect at least 2 configSets",
+        retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+    assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField, 20,
+        CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+            .setMaxShardsPerNode(2))
+        .process(solrClient);
+
+    // now we index a document
+    addDocsAndCommit(true, newDoc(SHIPS[0]));
+    // the alias should now list the first category's collection next to the UNINITIALIZED collection
+
+    String uninitialized = getAlias() + "__CRA__" + CategoryRoutedAlias.UNINITIALIZED;
+    assertInvariants(colVogon, uninitialized);
+
+    addDocsAndCommit(true,
+        newDoc(SHIPS[1]),
+        newDoc(SHIPS[2]),
+        newDoc(SHIPS[3]),
+        newDoc(SHIPS[4]));
+
+    assertInvariants(colVogon, colHoG, colStunt, colArk, colBistro); // UNINITIALIZED is no longer expected here
+
+    // make sure we fail if we have no value to route on, or if the value would yield the __CRA__ separator.
+    testFailedDocument(newDoc(null), "Route value is null");
+    testFailedDocument(newDoc("foo__CRA__bar"), "7 character sequence __CRA__");
+    testFailedDocument(newDoc("fóóCRAóóbar"), "7 character sequence __CRA__");
+
+  }
+
+  private String noSpaces(String ship) { // every whitespace character becomes '_'
+    return ship.replaceAll("\\s", "_");
+  }
+  private String noDashes(String ship) { // every '-' becomes '_'
+    return ship.replaceAll("-", "_");
+  }
+  private String noDollar(String ship) { // every '$' becomes '_' (escaped because '$' is a regex metacharacter)
+    return ship.replaceAll("\\$", "_");
+  }
+
+  @Slow
+  @Test
+  public void testMustMatch() throws Exception {
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+    final String mustMatchRegex = "HHS\\s.+_solr";
+
+    final int maxCardinality = Integer.MAX_VALUE; // max cardinality for current test
+
+    // Expected name of the collection for the first category value that satisfies the mustMatch regex.
+    //  NOTE(review): this comment used to describe a manually pre-created collection; none is created here.
+    final String colVogon = getAlias() + "__CRA__" + noSpaces("HHS "+ SHIPS[0]) + "_solr";
+
+    // we expect changes ensuring a legal collection name.
+    final String colHoG = getAlias() + "__CRA__" + noSpaces("HHS "+ SHIPS[1]) + "_solr";
+
+    List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+    List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+    // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+    assertTrue("We expect at least 2 configSets",
+        retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+    assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField, maxCardinality,
+        CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+            .setMaxShardsPerNode(2))
+        .setMustMatch(mustMatchRegex)
+        .process(solrClient);
+
+    // now we index a document
+    addDocsAndCommit(true, newDoc("HHS " + SHIPS[0] + "_solr"));
+    // the alias should now list the first category's collection next to the UNINITIALIZED collection
+
+    String uninitialized = getAlias() + "__CRA__" + CategoryRoutedAlias.UNINITIALIZED;
+    assertInvariants(colVogon, uninitialized);
+
+    addDocsAndCommit(true, newDoc("HHS "+ SHIPS[1] + "_solr"));
+
+    assertInvariants(colVogon, colHoG);
+
+    // should fail since the category value does not match the mustMatch regex
+    testFailedDocument(newDoc(SHIPS[2]), "does not match " + CategoryRoutedAlias.ROUTER_MUST_MATCH);
+    assertInvariants(colVogon, colHoG);
+  }
+
+  @Slow
+  @Test
+  public void testInvalidMustMatch() throws Exception {
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+    // Not a valid regex: the leading '+' quantifier has nothing to repeat
+    final String mustMatchRegex = "+_solr";
+
+    final int maxCardinality = Integer.MAX_VALUE; // max cardinality for current test
+
+    List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+    List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+    // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+    assertTrue("We expect at least 2 configSets",
+        retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+    assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+    SolrException e = expectThrows(SolrException.class, () -> CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField, maxCardinality,
+        CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+            .setMaxShardsPerNode(2))
+        .setMustMatch(mustMatchRegex)
+        .process(solrClient)
+    );
+
+    assertTrue("Create Alias should fail since router.mustMatch must be a valid regular expression",
+        e.getMessage().contains("router.mustMatch must be a valid regular expression"));
+  }
+
+  @Slow
+  @Test
+  public void testMaxCardinality() throws Exception {
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+
+    final int maxCardinality = 2; // max cardinality for current test
+
+    // Expected name of the collection for the first category (its name needs no character substitution).
+    //  NOTE(review): this comment used to describe a manually pre-created collection; none is created here.
+    final String colVogon = getAlias() + "__CRA__" + SHIPS[0];
+
+    // we expect changes ensuring a legal collection name.
+    final String colHoG = getAlias() + "__CRA__" + SHIPS[1].replaceAll("\\s", "_");
+
+    List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+    List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
+
+    // config sets leak between tests so we can't be any more specific than this on the next 2 asserts
+    assertTrue("We expect at least 2 configSets",
+        retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
+    assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
+
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField, maxCardinality,
+        CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
+            .setMaxShardsPerNode(2))
+        .process(solrClient);
+
+    // now we index a document
+    addDocsAndCommit(true, newDoc(SHIPS[0]));
+    // the alias should now list the first category's collection next to the UNINITIALIZED collection
+
+    String uninitialized = getAlias() + "__CRA__" + CategoryRoutedAlias.UNINITIALIZED;
+    assertInvariants(colVogon, uninitialized);
+
+    addDocsAndCommit(true, newDoc(SHIPS[1]));
+
+    assertInvariants(colVogon, colHoG);
+
+    // a third distinct category should fail since max cardinality (2) is reached
+    testFailedDocument(newDoc(SHIPS[2]), "Max cardinality");
+    assertInvariants(colVogon, colHoG);
+  }
+
+
+  /**
+   * Test that the Update Processor Factory routes documents to leader shards and thus
+   * avoids the possibility of introducing an extra hop to find the leader.
+   *
+   * @throws Exception when it blows up unexpectedly :)
+   */
+  @Slow
+  @Test
+  @LogLevel("org.apache.solr.update.processor.TrackingUpdateProcessorFactory=DEBUG")
+  public void testSliceRouting() throws Exception {
+    String configName = getSaferTestName();
+    createConfigSet(configName);
+
+    // each collection gets between 1 and 4 shards with between 1 and 3 replicas per shard; only the
+    // shard leaders are acceptable destinations, an update seen on any other replica should fail this test.
+    final int numShards = 1 + random().nextInt(4);
+    final int numReplicas = 1 + random().nextInt(3);
+    CollectionAdminRequest.createCategoryRoutedAlias(getAlias(), categoryField, 20,
+        CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
+            .setMaxShardsPerNode(numReplicas))
+        .process(solrClient);
+
+    // cause some collections to be created
+    assertUpdateResponse(solrClient.add(getAlias(), new SolrInputDocument("id","1",categoryField, SHIPS[0])));
+    assertUpdateResponse(solrClient.add(getAlias(), new SolrInputDocument("id","2",categoryField, SHIPS[1])));
+    assertUpdateResponse(solrClient.add(getAlias(), new SolrInputDocument("id","3",categoryField, SHIPS[2])));
+    assertUpdateResponse(solrClient.commit(getAlias()));
+
+    // wait for all the collections to exist...
+
+    waitColAndAlias(getAlias(), "__CRA__", SHIPS[0], numShards);
+    waitColAndAlias(getAlias(), "__CRA__", noSpaces(SHIPS[1]), numShards);
+    waitColAndAlias(getAlias(), "__CRA__", noSpaces(SHIPS[2]), numShards);
+
+    // at this point we now have 3 collections with numShards shards each and numReplicas replicas per shard.
+    // In the largest case (3 replicas per shard) 1/3 of the replicas are leaders, so each of the 3 docs added below has
+    // a 33% chance of hitting a leader randomly and not causing a failure if the code is broken; the test as a whole then
+    // has about a 3.6% false positive rate (0.33^3). With fewer replicas per shard that rate is higher.
+
+    final String trackGroupName = getTrackUpdatesGroupName();
+    final List<UpdateCommand> updateCommands;
+    try {
+      TrackingUpdateProcessorFactory.startRecording(trackGroupName);
+
+      ModifiableSolrParams params = params("post-processor", "tracking-" + trackGroupName);
+      List<SolrInputDocument> list = Arrays.asList(
+          sdoc("id", "4", categoryField, SHIPS[0]),
+          sdoc("id", "5", categoryField, SHIPS[1]),
+          sdoc("id", "6", categoryField, SHIPS[2]));
+      Collections.shuffle(list, random()); // order should not matter here
+      assertUpdateResponse(add(getAlias(), list,
+          params));
+    } finally {
+      updateCommands = TrackingUpdateProcessorFactory.stopRecording(trackGroupName);
+    }
+    assertRouting(numShards, updateCommands);
+  }
+
+
+  private void assertInvariants(String... expectedColls) throws IOException, SolrServerException {
+    final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
+
+    List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(getAlias());
+    cols = new ArrayList<>(cols);
+    cols.sort(String::compareTo); // don't really care about the order here.
+    assert !cols.isEmpty();
+
+    int totalNumFound = 0;
+    for (String col : cols) {
+      final QueryResponse colResponse = solrClient.query(col, params(
+          "q", "*:*",
+          "rows", "0"));
+      long numFound = colResponse.getResults().getNumFound();
+      if (numFound > 0) {
+        totalNumFound += numFound;
+      }
+    }
+    final QueryResponse colResponse = solrClient.query(getAlias(), params(
+        "q", "*:*",
+        "rows", "0"));
+    long aliasNumFound = colResponse.getResults().getNumFound();
+    List<String> actual = Arrays.asList(expectedColls);
+    actual.sort(String::compareTo);
+    assertArrayEquals("Expected " + expectedColls.length + " collections, found " + cols.size() + ":\n" +
+            cols + " vs \n" + actual, expectedColls, cols.toArray());
+    assertEquals("Expected collections and alias to have same number of documents",
+        aliasNumFound, totalNumFound);
+    assertEquals("Expected to find " + expectNumFound + " docs but found " + aliasNumFound,
+        expectNumFound, aliasNumFound);
+  }
+
+  private SolrInputDocument newDoc(String routedValue) {
+    if (routedValue != null) {
+      return sdoc("id", Integer.toString(++lastDocId),
+          categoryField, routedValue,
+          intField, "0"); // always 0
+    } else {
+      return sdoc("id", Integer.toString(++lastDocId),
+          intField, "0"); // always 0
+    }
+  }
+
+  @Override
+  public String getAlias() { // the single alias name shared by every test in this class
+    return "myAlias";
+  }
+
+  @Override
+  public CloudSolrClient getSolrClient() { // the static client assigned in doBefore()
+    return solrClient;
+  }
+
+
+  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
+    // NOTE(review): duplicates RoutedAliasUpdateProcessorTest.IncrementURPFactory - confirm this copy is needed
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return FieldValueMutatingUpdateProcessor.valueMutator(getSelector(), next,
+          (src) -> Integer.valueOf(src.toString()) + 1); // parses the value's string form and adds 1
+    }
+  }
+
+  /** Adds a doc that must be rejected with {@code errorMsg}, either in the "errors" header or as an exception. */
+  private void testFailedDocument(SolrInputDocument sdoc, String errorMsg) throws SolrServerException, IOException {
+    try {
+      final UpdateResponse resp = solrClient.add(getAlias(), sdoc);
+      final Object errors = resp.getResponseHeader().get("errors"); // no exception: Tolerant URP reports it here
+      assertNotNull("Expected the update to fail with: " + errorMsg, errors);
+      assertTrue("Expected to find " + errorMsg + " in errors: " + errors.toString(),errors.toString().contains(errorMsg));
+    } catch (SolrException e) {
+      assertTrue("Expected to find " + errorMsg + " in exception: " + e.getMessage(), e.getMessage().contains(errorMsg));
+    }
+    ++numDocsDeletedOrFailed;
+  }
+
+}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
new file mode 100644
index 0000000..a15f8c0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.junit.Ignore;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+@Ignore  // don't try to run abstract base class
+public abstract class RoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+
+  private static final String intField = "integer_i";
+
+  void waitColAndAlias(String alias, String separator, final String suffix, int slices) throws InterruptedException {
+    // collection to exist
+    String collection = alias + separator + suffix;
+    waitCol(slices, collection);
+    // and alias to be aware of collection
+    long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
+    while (!haveCollection(alias, collection)) {
+      if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
+        fail("took over 10 seconds after collection creation to update aliases");
+      } else {
+        Thread.sleep(500);
+      }
+    }
+  }
+
+  private boolean haveCollection(String alias, String collection) {
+    // Kept as separate statements so that an NPE seen about once in 3000 runs can be pinned to a line if it
+    // recurs. A missing alias entry (null list) is treated as "not there yet" so that the caller keeps polling.
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    ZkStateReader zkStateReader = solrClient.getZkStateReader();
+    Aliases aliases = zkStateReader.getAliases();
+    Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
+    List<String> strings = collectionAliasListMap.get(alias);
+    return strings != null && strings.contains(collection);
+  }
+
+  /** Group name under which updates are recorded, see {@link TrackingUpdateProcessorFactory}. */
+  String getTrackUpdatesGroupName() {
+    return getSaferTestName();
+  }
+
+  void createConfigSet(String configName) throws SolrServerException, IOException {
+    // First create a configSet based on "_default".
+    // Then we create a temporary collection with the same name as the config.
+    // We configure it, and ultimately delete the collection, leaving a modified config-set behind.
+    // Later we create the "real" collections referencing this modified config-set.
+    assertEquals(0, new ConfigSetAdminRequest.Create()
+        .setConfigSetName(configName)
+        .setBaseConfigSetName("_default")
+        .process(getSolrClient()).getStatus());
+
+    CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(getSolrClient());
+
+    // TODO: fix SOLR-13059, where this wait isn't working ~0.3% of the time.
+    waitCol(1,configName);
+    // manipulate the config...
+    checkNoError(getSolrClient().request(new V2Request.Builder("/collections/" + configName + "/config")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
+            "  'add-updateprocessor' : {" +
+            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
+            "  }," +
+            // See TrackingUpdateProcessorFactory javadocs for details...
+            "  'add-updateprocessor' : {" +
+            "    'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
+            "  }," +
+            "  'add-updateprocessor' : {" + // for testing
+            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
+            "    'fieldName':'" + getIntField() + "'" +
+            "  }," +
+            "}").build()));
+    // only sometimes test with "tolerant" URP:
+    final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
+    checkNoError(getSolrClient().request(new V2Request.Builder("/collections/" + configName + "/config/params")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set' : {" +
+            "    '_UPDATE' : {'processor':'" + urpNames + "'}" +
+            "  }" +
+            "}").build()));
+
+    CollectionAdminRequest.deleteCollection(configName).process(getSolrClient());
+    assertTrue(
+        new ConfigSetAdminRequest.List().process(getSolrClient()).getConfigSets()
+            .contains(configName)
+    );
+  }
+
+  String getIntField() { // name of the field that createConfigSet() hands to the 'inc' update processor
+    return intField;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  void checkNoError(NamedList<Object> response) {
+    Object errors = response.get("errorMessages");
+    assertNull("" + errors, errors);
+  }
+
+  /**
+   * Collects the core names of the leader replicas hosted on each node of the test cluster, per {@code clusterState}.
+   */
+  @SuppressWarnings("WeakerAccess")
+  Set<String> getLeaderCoreNames(ClusterState clusterState) {
+    Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
+    for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
+      String nodeName = jettySolrRunner.getNodeName();
+      for (CoreDescriptor core : jettySolrRunner.getCoreContainer().getCoreDescriptors()) {
+        DocCollection collectionOrNull = clusterState.getCollectionOrNull(core.getCollectionName());
+        List<Replica> leaderReplicas = collectionOrNull == null ? null : collectionOrNull.getLeaderReplicas(nodeName);
+        if (leaderReplicas != null) { // also skips cores whose collection is absent from clusterState
+          for (Replica leaderReplica : leaderReplicas) {
+            leaders.add(leaderReplica.getCoreName());
+          }
+        }
+      }
+    }
+    return leaders;
+  }
+
+  void assertRouting(int numShards, List<UpdateCommand> updateCommands) throws IOException {
+    try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
+      ClusterStateProvider clusterStateProvider = cloudSolrClient.getClusterStateProvider();
+      clusterStateProvider.connect();
+      Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
+      assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
+
+      assertEquals(3, updateCommands.size());
+      for (UpdateCommand updateCommand : updateCommands) {
+        String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
+        assertTrue("Update was not routed to a leader (" + node + " not in list of leaders" + leaders, leaders.contains(node));
+      }
+    }
+  }
+
+  public abstract String getAlias() ;
+
+  public abstract CloudSolrClient getSolrClient() ;
+
+
+  @SuppressWarnings("WeakerAccess")
+  void waitCol(int slices, String collection) {
+    waitForState("waiting for collections to be created", collection,
+        (liveNodes, collectionState) -> {
+          if (collectionState == null) {
+            // per predicate javadoc, this is what we get if the collection doesn't exist at all.
+            return false;
+          }
+          Collection<Slice> activeSlices = collectionState.getActiveSlices();
+          int size = activeSlices.size();
+          return size == slices;
+        });
+  }
+
+  /** Adds these documents and commits, returning when they are committed. When {@code aliasOnly} is false the docs
+   * may also be sent directly to member collections of the alias. We randomly go about this in different ways. */
+  void addDocsAndCommit(boolean aliasOnly, SolrInputDocument... solrInputDocuments) throws Exception {
+    // we assume all docs will be added (none too old/new to cause exception)
+    Collections.shuffle(Arrays.asList(solrInputDocuments), random());
+
+    // this is a list of the collections & the alias name.  Use to pick randomly where to send.
+    //   (it doesn't matter where we send docs since the alias is honored at the URP level)
+    List<String> collections = new ArrayList<>();
+    collections.add(getAlias());
+    if (!aliasOnly) {
+      collections.addAll(new CollectionAdminRequest.ListAliases().process(getSolrClient()).getAliasesAsLists().get(getAlias()));
+    }
+
+    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
+
+    if (random().nextBoolean()) {
+      // Send in separate threads. Choose random collection & solrClient
+      try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
+        ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
+            new DefaultSolrThreadFactory(getSaferTestName()));
+        try {
+          List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
+          for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+            String col = collections.get(random().nextInt(collections.size()));
+            futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
+          }
+          for (Future<UpdateResponse> future : futures) {
+            assertUpdateResponse(future.get());
+          }
+          // at this point there shouldn't be any tasks running
+          assertEquals(0, exec.shutdownNow().size());
+        } finally {
+          exec.shutdownNow(); // no-op after the shutdown above; after a failure it stops the pool so its threads don't leak
+        }
+      }
+    } else {
+      // send in a batch.
+      String col = collections.get(random().nextInt(collections.size()));
+      try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
+        assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
+      }
+    }
+    String col = collections.get(random().nextInt(collections.size()));
+    if (commitWithin == -1) {
+      getSolrClient().commit(col);
+    } else {
+      // check that it all got committed eventually
+      String docsQ = "{!terms f=id}" + Arrays.stream(solrInputDocuments)
+          .map(d -> d.getFieldValue("id").toString()).collect(Collectors.joining(","));
+      int numDocs = queryNumDocs(docsQ);
+      if (numDocs == solrInputDocuments.length) {
+        System.err.println("Docs committed sooner than expected.  Bug or slow test env?");
+        return;
+      }
+      // wait until it's committed
+      Thread.sleep(commitWithin);
+      for (int idx = 0; idx < 100; ++idx) { // Loop for up to 10 seconds waiting for commit to catch up
+        numDocs = queryNumDocs(docsQ);
+        if (numDocs == solrInputDocuments.length) break;
+        Thread.sleep(100);
+      }
+      assertEquals("not committed.  Bug or a slow test?", solrInputDocuments.length, numDocs);
+    }
+  }
+
+  void assertUpdateResponse(UpdateResponse rsp) {
+    // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
+    List<?> errors = (List<?>) rsp.getResponseHeader().get("errors"); // wildcard instead of a raw List
+    assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
+  }
+
+  private int queryNumDocs(String q) throws SolrServerException, IOException { // hit count of q against the alias
+    return (int) getSolrClient().query(getAlias(), params("q", q, "rows", "0")).getResults().getNumFound();
+  }
+
+  /** Adds the docs to Solr via {@link #getSolrClient()} with the params */
+  @SuppressWarnings("SameParameterValue")
+  protected UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
+    UpdateRequest req = new UpdateRequest();
+    if (params != null) {
+      req.setParams(new ModifiableSolrParams(params));// copy because will be modified
+    }
+    req.add(docs);
+    return req.process(getSolrClient(), collection);
+  }
+
+  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
+    // registered as the 'inc' update processor by createConfigSet()
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
+          (src) -> Integer.valueOf(src.toString()) + 1); // parses the value's string form and adds 1
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index 485064c..c1d25a6 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -17,79 +17,53 @@
 
 package org.apache.solr.update.processor;
 
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
 
-import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.FieldStatsInfo;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.api.collections.RoutedAlias;
 import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.UpdateCommand;
-import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.LogLevel;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
 @LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13059")
-public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+public class TimeRoutedAliasUpdateProcessorTest extends RoutedAliasUpdateProcessorTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String alias = "myalias";
   private static final String alias2 = "myalias2";
   private static final String timeField = "timestamp_dt";
-  private static final String intField = "integer_i";
 
-  private static CloudSolrClient solrClient;
+  private  CloudSolrClient solrClient;
 
   private int lastDocId = 0;
   private int numDocsDeletedOrFailed = 0;
@@ -109,10 +83,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     shutdownCluster();
   }
 
-  @AfterClass
-  public static void finish() throws Exception {
-    IOUtils.close(solrClient);
-  }
+
 
   @Slow
   @Test
@@ -127,7 +98,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     final String col23rd = alias + "_2017-10-23";
     CollectionAdminRequest.createCollection(col23rd, configName, 2, 2)
         .setMaxShardsPerNode(2)
-        .withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
+        .withProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
         .process(solrClient);
 
     cluster.waitForActiveCollection(col23rd, 2, 4);
@@ -140,7 +111,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
         retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
     assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
 
-    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
         CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
             .setMaxShardsPerNode(2))
         .process(solrClient);
@@ -161,7 +132,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     //   destined for this collection, Solr will see it already exists and add it to the alias.
     final String col24th = alias + "_2017-10-24";
     CollectionAdminRequest.createCollection(col24th, configName,  1, 1) // more shards and replicas now
-        .withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
+        .withProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
         .process(solrClient);
 
     // index 3 documents in a random fashion
@@ -173,7 +144,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertInvariants(col24th, col23rd);
 
     // assert that the IncrementURP has updated all '0' to '1'
-    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
+    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + getIntField() + ":1")).getResults();
     assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
 
     //delete a random document id; ensure we don't find it
@@ -211,7 +182,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
         .process(solrClient);
 
     // add more docs, creating one new collection, but trigger ones prior to
-    int numDocsToBeAutoDeleted = queryNumDocs(timeField+":[* TO \"2017-10-26T00:00:00Z\"}");
+    int numDocsToBeAutoDeleted = queryNumDocs(getTimeField() +":[* TO \"2017-10-26T00:00:00Z\"}");
     addDocsAndCommit(true, // send these to alias only
         newDoc(Instant.parse("2017-10-26T07:00:00Z")), // existing
         newDoc(Instant.parse("2017-10-27T08:00:00Z")) // new
@@ -220,56 +191,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertInvariants(alias + "_2017-10-27", alias + "_2017-10-26");
   }
 
-  private void createConfigSet(String configName) throws SolrServerException, IOException {
-    // First create a configSet
-    // Then we create a collection with the name of the eventual config.
-    // We configure it, and ultimately delete the collection, leaving a modified config-set behind.
-    // Later we create the "real" collections referencing this modified config-set.
-    assertEquals(0, new ConfigSetAdminRequest.Create()
-        .setConfigSetName(configName)
-        .setBaseConfigSetName("_default")
-        .process(solrClient).getStatus());
-
-    CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
-
-    // TODO: fix SOLR-13059, a where this wait isn't working ~0.3% of the time.
-    waitCol(1,configName);
-    // manipulate the config...
-    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
-        .withMethod(SolrRequest.METHOD.POST)
-        .withPayload("{" +
-            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
-            "  'add-updateprocessor' : {" +
-            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
-            "  }," +
-            // See TrackingUpdateProcessorFactory javadocs for details...
-            "  'add-updateprocessor' : {" +
-            "    'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
-            "  }," +
-            "  'add-updateprocessor' : {" + // for testing
-            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
-            "    'fieldName':'" + intField + "'" +
-            "  }," +
-            "}").build()));
-    // only sometimes test with "tolerant" URP:
-    final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
-    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
-        .withMethod(SolrRequest.METHOD.POST)
-        .withPayload("{" +
-            "  'set' : {" +
-            "    '_UPDATE' : {'processor':'" + urpNames + "'}" +
-            "  }" +
-            "}").build()));
-
-    CollectionAdminRequest.deleteCollection(configName).process(solrClient);
-    assertTrue(
-        new ConfigSetAdminRequest.List().process(solrClient).getConfigSets()
-            .contains(configName)
-    );
-  }
-
   /**
-   * Test that the Tracking Update Processor Factory routes documents to leader shards and thus
+   * Test that the Update Processor Factory routes documents to leader shards and thus
    * avoids the possibility of introducing an extra hop to find the leader.
    *
    * @throws Exception when it blows up unexpectedly :)
@@ -285,7 +208,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     // 4 of which are leaders, and 8 of which should fail this test.
     final int numShards = 1 + random().nextInt(4);
     final int numReplicas = 1 + random().nextInt(3);
-    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
         CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
             .setMaxShardsPerNode(numReplicas))
         .process(solrClient);
@@ -295,9 +218,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertUpdateResponse(solrClient.commit(alias));
 
     // wait for all the collections to exist...
-    waitColAndAlias("2017-10-23", numShards, alias);
-    waitColAndAlias("2017-10-24", numShards, alias);
-    waitColAndAlias("2017-10-25", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-10-23", numShards);
+    waitColAndAlias(alias, "_", "2017-10-24", numShards);
+    waitColAndAlias(alias, "_", "2017-10-25", numShards);
 
     // at this point we now have 3 collections with 4 shards each, and 3 replicas per shard for a total of
     // 36 total replicas, 1/3 of which are leaders. We will add 3 docs and each has a 33% chance of hitting a
@@ -321,23 +244,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
       updateCommands = TrackingUpdateProcessorFactory.stopRecording(trackGroupName);
     }
 
-    try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
-      ClusterStateProvider clusterStateProvider = cloudSolrClient.getClusterStateProvider();
-      clusterStateProvider.connect();
-      Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
-      assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
-
-      assertEquals(3, updateCommands.size());
-      for (UpdateCommand updateCommand : updateCommands) {
-        String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
-        assertTrue("Update was not routed to a leader (" + node + " not in list of leaders" + leaders, leaders.contains(node));
-      }
-    }
-  }
-
-  /** @see TrackingUpdateProcessorFactory */
-  private String getTrackUpdatesGroupName() {
-    return getSaferTestName();
+    assertRouting(numShards, updateCommands);
   }
 
   @Test
@@ -348,13 +255,13 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
 
     final int numShards = 1 ;
     final int numReplicas = 1 ;
-    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
         CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
             .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
         .process(solrClient);
 
     // needed to verify that preemptive creation in one alias doesn't inhibit preemptive creation in another
-    CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+    CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
         CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
             .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
         .process(solrClient);
@@ -416,8 +323,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertTrue(threadFinished[1]);
 
     // if one of these times out then the test has failed due to interference between aliases
-    waitColAndAlias("2017-10-26", numShards, alias);
-    waitColAndAlias("2017-10-26", numShards, alias2);
+    waitColAndAlias(alias, "_", "2017-10-26", numShards);
+    waitColAndAlias(alias2, "_", "2017-10-26", numShards);
 
     // after this we can ignore alias2
     checkPreemptiveCase1(alias);
@@ -432,12 +339,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     //
     // Start and stop some cores that have TRA's... 2x2 used to ensure every jetty gets at least one
 
-    CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", timeField,
+    CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", getTimeField(),
         CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
             .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
         .process(solrClient);
 
-    waitColAndAlias("2017-10-23",2, "foo");
+    waitColAndAlias("foo", "_", "2017-10-23",2);
     waitCoreCount("foo_2017-10-23", 1); // prove this works, for confidence in deletion checking below.
     assertUpdateResponse(solrClient.add("foo",
         sdoc("id","1","timestamp_dt", "2017-10-23T00:00:00Z") // no extra collections should be created
@@ -464,7 +371,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
         sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-27 now
         params));
     assertUpdateResponse(solrClient.commit(alias));
-    waitColAndAlias("2017-10-27", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-10-27", numShards);
 
     cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
     assertEquals(5,cols.size()); // only one created in async case
@@ -478,8 +385,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
         sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-28 now
         params));
     assertUpdateResponse(solrClient.commit(alias));
-    waitColAndAlias("2017-10-27", numShards, alias);
-    waitColAndAlias("2017-10-28", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-10-27", numShards);
+    waitColAndAlias(alias, "_", "2017-10-28", numShards);
 
     cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
     assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
@@ -511,7 +418,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
         sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation
         params));
     assertUpdateResponse(solrClient.commit(alias));
-    waitColAndAlias("2017-10-29", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-10-29", numShards);
 
     cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
     assertEquals(7,cols.size());
@@ -533,8 +440,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
         sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky?
         params));
     assertUpdateResponse(solrClient.commit(alias));
-    waitColAndAlias("2017-10-30", numShards, alias);
-    waitColAndAlias("2017-10-31", numShards, alias); // spooky! async case arising in middle of sync creation!!
+    waitColAndAlias(alias, "_", "2017-10-30", numShards);
+    waitColAndAlias(alias, "_", "2017-10-31", numShards); // spooky! async case arising in middle of sync creation!!
 
     cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
     assertEquals(9,cols.size());
@@ -556,17 +463,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertUpdateResponse(add(alias, Collections.singletonList(
         sdoc("id", "14", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-01
         params));
-    waitColAndAlias("2017-11-01", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-11-01", numShards);
 
     assertUpdateResponse(add(alias, Collections.singletonList(
         sdoc("id", "15", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-02
         params));
-    waitColAndAlias("2017-11-02", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-11-02", numShards);
 
     assertUpdateResponse(add(alias, Collections.singletonList(
         sdoc("id", "16", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-03
         params));
-    waitColAndAlias("2017-11-03", numShards, alias);
+    waitColAndAlias(alias, "_", "2017-11-03", numShards);
 
     assertUpdateResponse(add(alias, Collections.singletonList(
         sdoc("id", "17", "timestamp_dt", "2017-10-31T23:01:00Z")), // should NOT cause preemptive creation 11-04
@@ -578,7 +485,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertUpdateResponse(add(alias, Collections.singletonList(
         sdoc("id", "18", "timestamp_dt", "2017-11-01T23:01:00Z")), // should cause preemptive creation 11-04
         params));
-    waitColAndAlias("2017-11-04",numShards, alias);
+    waitColAndAlias(alias, "_", "2017-11-04",numShards);
 
   }
 
@@ -636,7 +543,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
 
     // Here we quickly add another doc in a separate request, before the collection creation has completed.
     // This has the potential to incorrectly cause preemptive collection creation to run twice and create a
-    // second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
+    // second collection. RoutedAliasUpdateProcessor is meant to guard against this race condition.
     assertUpdateResponse(add(alias, Collections.singletonList(
         sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
         params));
@@ -656,6 +563,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertNumDocs("2017-10-26", 0, alias);
   }
 
+  @SuppressWarnings("SameParameterValue")
   private void addOneDocSynchCreation(int numShards, String alias) throws SolrServerException, IOException, InterruptedException {
     // cause some collections to be created
     assertUpdateResponse(solrClient.add(alias,
@@ -664,9 +572,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertUpdateResponse(solrClient.commit(alias));
 
     // wait for all the collections to exist...
-    waitColAndAlias("2017-10-23", numShards, alias); // This one should have already existed from the alias creation
-    waitColAndAlias("2017-10-24", numShards, alias); // Create 1
-    waitColAndAlias("2017-10-25", numShards, alias); // Create 2nd synchronously (ensure this is not broken)
+    waitColAndAlias(alias, "_", "2017-10-23", numShards); // This one should have already existed from the alias creation
+    waitColAndAlias(alias, "_", "2017-10-24", numShards); // Create 1
+    waitColAndAlias(alias, "_", "2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken)
 
     // normal update, nothing special, no collection creation required.
     List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
@@ -684,64 +592,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     assertEquals(expected, resp.getResults().getNumFound());
   }
 
-  private Set<String> getLeaderCoreNames(ClusterState clusterState) {
-    Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
-    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
-    for (JettySolrRunner jettySolrRunner : jettySolrRunners) {
-      List<CoreDescriptor> coreDescriptors = jettySolrRunner.getCoreContainer().getCoreDescriptors();
-      for (CoreDescriptor core : coreDescriptors) {
-        String nodeName = jettySolrRunner.getNodeName();
-        String collectionName = core.getCollectionName();
-        DocCollection collectionOrNull = clusterState.getCollectionOrNull(collectionName);
-        List<Replica> leaderReplicas = collectionOrNull.getLeaderReplicas(nodeName);
-        if (leaderReplicas != null) {
-          for (Replica leaderReplica : leaderReplicas) {
-            leaders.add(leaderReplica.getCoreName());
-          }
-        }
-      }
-    }
-    return leaders;
-  }
-
-  private void waitColAndAlias(final String datePart, int slices, String alias) throws InterruptedException {
-    // collection to exist
-    String collection = alias + "_" + datePart;
-    waitCol(slices, collection);
-    // and alias to be aware of collection
-    long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
-    while (!haveCollection(alias, collection)) {
-      if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
-        fail("took over 10 seconds after collection creation to update aliases");
-      } else {
-        Thread.sleep(500);
-      }
-    }
-  }
-
-  private boolean haveCollection(String alias, String collection) {
-    // separated into separate lines to make it easier to track down an NPE that occurred once
-    // 3000 runs if it shows up again...
-    CloudSolrClient solrClient = cluster.getSolrClient();
-    ZkStateReader zkStateReader = solrClient.getZkStateReader();
-    Aliases aliases = zkStateReader.getAliases();
-    Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
-    List<String> strings = collectionAliasListMap.get(alias);
-    return strings.contains(collection);
-  }
-
-  private void waitCol(int slices, String collection) {
-    waitForState("waiting for collections to be created", collection,
-        (liveNodes, collectionState) -> {
-          if (collectionState == null) {
-            // per predicate javadoc, this is what we get if the collection doesn't exist at all.
-            return false;
-          }
-          Collection<Slice> activeSlices = collectionState.getActiveSlices();
-          int size = activeSlices.size();
-          return size == slices;
-        });
-  }
 
   private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
     try {
@@ -755,81 +605,15 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     numDocsDeletedOrFailed++;
   }
 
-  private void checkNoError(NamedList<Object> response) { //TODO rename
-    Object errors = response.get("errorMessages");
-    assertNull("" + errors, errors);
-  }
-
-  /** Adds these documents and commits, returning when they are committed.
-   * We randomly go about this in different ways. */
-  private void addDocsAndCommit(boolean aliasOnly, SolrInputDocument... solrInputDocuments) throws Exception {
-    // we assume all docs will be added (none too old/new to cause exception)
-    Collections.shuffle(Arrays.asList(solrInputDocuments), random());
-
-    // this is a list of the collections & the alias name.  Use to pick randomly where to send.
-    //   (it doesn't matter where we send docs since the alias is honored at the URP level)
-    List<String> collections = new ArrayList<>();
-    collections.add(alias);
-    if (!aliasOnly) {
-      collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
-    }
-
-    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
-
-    if (random().nextBoolean()) {
-      // Send in separate threads. Choose random collection & solrClient
-      try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
-        ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
-            new DefaultSolrThreadFactory(getSaferTestName()));
-        List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
-        for (SolrInputDocument solrInputDocument : solrInputDocuments) {
-          String col = collections.get(random().nextInt(collections.size()));
-          futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
-        }
-        for (Future<UpdateResponse> future : futures) {
-          assertUpdateResponse(future.get());
-        }
-        // at this point there shouldn't be any tasks running
-        assertEquals(0, exec.shutdownNow().size());
-      }
-    } else {
-      // send in a batch.
-      String col = collections.get(random().nextInt(collections.size()));
-      try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
-        assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
-      }
-    }
-    String col = collections.get(random().nextInt(collections.size()));
-    if (commitWithin == -1) {
-      solrClient.commit(col);
-    } else {
-      // check that it all got committed eventually
-      String docsQ =
-          "{!terms f=id}"
-          + Arrays.stream(solrInputDocuments).map(d -> d.getFieldValue("id").toString())
-              .collect(Collectors.joining(","));
-      int numDocs = queryNumDocs(docsQ);
-      if (numDocs == solrInputDocuments.length) {
-        System.err.println("Docs committed sooner than expected.  Bug or slow test env?");
-        return;
-      }
-      // wait until it's committed
-      Thread.sleep(commitWithin);
-      for (int idx = 0; idx < 100; ++idx) { // Loop for up to 10 seconds waiting for commit to catch up
-        numDocs = queryNumDocs(docsQ);
-        if (numDocs == solrInputDocuments.length) break;
-        Thread.sleep(100);
-      }
 
-      assertEquals("not committed.  Bug or a slow test?",
-          solrInputDocuments.length, numDocs);
-    }
+  @Override
+  public String getAlias() {
+    return alias;
   }
 
-  private void assertUpdateResponse(UpdateResponse rsp) {
-    // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
-    List errors = (List) rsp.getResponseHeader().get("errors");
-    assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
+  @Override
+  public CloudSolrClient getSolrClient() {
+    return solrClient;
   }
 
   private int queryNumDocs(String q) throws SolrServerException, IOException {
@@ -854,11 +638,11 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
           "q", "*:*",
           "rows", "0",
           "stats", "true",
-          "stats.field", timeField));
+          "stats.field", getTimeField()));
       long numFound = colStatsResp.getResults().getNumFound();
       if (numFound > 0) {
         totalNumFound += numFound;
-        final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
+        final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(getTimeField());
         assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime());
         if (colEndInstant != null) {
           assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime());
@@ -873,19 +657,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
 
   private SolrInputDocument newDoc(Instant timestamp) {
     return sdoc("id", Integer.toString(++lastDocId),
-        timeField, timestamp.toString(),
-        intField, "0"); // always 0
+        getTimeField(), timestamp.toString(),
+        getIntField(), "0"); // always 0
   }
 
-  /** Adds the docs to Solr via {@link #solrClient} with the params */
-  @SuppressWarnings("SameParameterValue")
-  private static UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
-    UpdateRequest req = new UpdateRequest();
-    if (params != null) {
-      req.setParams(new ModifiableSolrParams(params));// copy because will be modified
-    }
-    req.add(docs);
-    return req.process(solrClient, collection);
+  private String getTimeField() {
+    return timeField;
   }
 
   @Test
@@ -900,13 +677,4 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
       TimeRoutedAlias.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
   }
 
-  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
-
-    @Override
-    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
-      return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
-          (src) -> Integer.valueOf(src.toString()) + 1);
-    }
-  }
-
 }
diff --git a/solr/solr-ref-guide/src/aliases.adoc b/solr/solr-ref-guide/src/aliases.adoc
new file mode 100644
index 0000000..52ce75d
--- /dev/null
+++ b/solr/solr-ref-guide/src/aliases.adoc
@@ -0,0 +1,267 @@
+= Aliases
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+== Standard Aliases
+
+Since version 6, SolrCloud has had the ability to query one or more collections via an alternative name. These
+alternative names for collections are known as aliases, and are useful when you want to:
+
+1. Atomically switch to using a newly (re)indexed collection with zero down time (by re-defining the alias)
+1. Insulate client code from changes in collection names
+1. Issue a single query against several collections with identical schemas
+
+It's also possible to send update commands to aliases, but this is rarely useful if the
+  alias refers to more than one collection (as in case 3 above).
+Since there is no logic by which to distribute documents among the collections, all updates will simply be
+  directed to the first collection in the list.
+
+Standard aliases are created and updated using the <<collections-api.adoc#createalias,CREATEALIAS>> command.
+The current list of collections that are members of an alias can be verified via the
+  <<collections-api.adoc#clusterstatus,CLUSTERSTATUS>> command.
+The full definition of all aliases including metadata about that alias (in the case of routed aliases, see below)
+  can be verified via the <<collections-api.adoc#listaliases,LISTALIASES>> command.
+Alternatively this information is available by checking `/aliases.json` in zookeeper via a zookeeper
+  client or in the <<cloud-screens.adoc#tree-view,tree page>> of the cloud menu in the admin UI.
+Aliases may be deleted via the <<collections-api.adoc#deletealias,DELETEALIAS>> command.
+The underlying collections are *unaffected* by this command.
+
+TIP: Any alias (standard or routed) that references multiple collections may complicate relevancy.
+By default, SolrCloud scores documents on a per shard basis.
+With multiple collections in an alias this is always a problem, so if you have a use case for which BM25 or
+  TF/IDF relevancy is important you will want to turn on one of the
+  <<distributed-requests.adoc#distributedidf,ExactStatsCache>> implementations.
+However, for analytical use cases where results are sorted on numeric, date or alphanumeric field values rather
+  than relevancy calculations this is not a problem.
+
+== Routed Aliases
+
+To address the update limitations associated with standard aliases and provide additional useful features, the concept of
+  RoutedAliases has been developed.
+There are presently two types of routed alias: time routed and category routed. These are described in detail below,
+  but share some common behavior.
+
+When processing an update for a routed alias, Solr initializes its
+  <<update-request-processors.adoc#update-request-processors,UpdateRequestProcessor>> chain as usual, but
+  when `DistributedUpdateProcessor` (DUP) initializes, it detects that the update targets a routed alias and injects
+  `RoutedAliasUpdateProcessor` (RAUP) in front of itself.
+RAUP, in coordination with the Overseer, is the main part of a routed alias, and must immediately precede DUP. It is not
+  possible to configure custom chains with other types of UpdateRequestProcessors between RAUP and DUP.
+
+Ideally, as a user of a routed alias, you needn't concern yourself with the particulars of the collection naming pattern
+  since both queries and updates may be done via the alias.
+When adding data, you should usually direct documents to the alias (e.g., reference the alias name instead of any collection).
+The Solr server and CloudSolrClient will direct an update request to the first collection that an alias points to.
+Once the server receives the data it will perform the necessary routing.
+
+WARNING: It is possible to update the collections
+  directly, but there is no safeguard against putting data in the incorrect collection if the alias is circumvented
+  in this manner.
+
+CAUTION: It's probably a bad idea to use "data driven" mode with routed aliases, as duplicate schema mutations might happen
+concurrently leading to errors.
+
+
+== Time Routed Aliases
+
+Starting in Solr 7.4, Time Routed Aliases (TRAs) are a SolrCloud feature that manages an alias and a time sequential
+ series of collections.
+
+It automatically creates new collections and (optionally) deletes old ones as it routes documents to the correct
+  collection based on its timestamp.
+This approach allows for indefinite indexing of data without degradation of performance otherwise experienced due to the
+  continuous growth of a single index.
+
+If you need to store a lot of timestamped data in Solr, such as logs or IoT sensor data, then this feature probably
+  makes more sense than creating one sharded hash-routed collection.
+
+=== How It Works
+
+First you create a time routed alias using the <<collections-api.adoc#createalias,CREATEALIAS>> command with some
+  router settings.
+Most of the settings are editable at a later time using the <<collections-api.adoc#aliasprop,ALIASPROP>> command.
+
+The first collection will be created automatically, along with an alias pointing to it.
+Each underlying Solr "core" in a collection that is a member of a TRA has a special core property referencing the alias.
+The name of each collection is comprised of the TRA name and the start timestamp (UTC), with trailing zeros and symbols
+  truncated.
+
+The collections list for a TRA is always reverse sorted, and thus the connection path of the request will route to the
+  lead collection. Using CloudSolrClient is preferable as it can reduce the number of underlying physical HTTP requests by one.
+If you know that a particular set of documents to be delivered is going to a particular older collection then you could
+  direct it there from the client side as an optimization but it's not necessary. CloudSolrClient does not (yet) do this.
+
+
+RAUP first reads TRA configuration from the alias properties when it is initialized.  As it sees each document, it checks for
+  changes to TRA properties, updates its cached configuration if needed and then determines which collection the
+  document belongs to:
+
+* If RAUP needs to send it to a time segment represented by a collection other than the one that
+  the client chose to communicate with, then it will do so using mechanisms shared with DUP.
+  Once the document is forwarded to the correct collection (i.e., the correct TRA time segment), it skips directly to
+  DUP on the target collection and continues normally, potentially being routed again to the correct shard & replica
+  within the target collection.
+
+* If it belongs in the current collection (which is usually the case if processing events as they occur), the document
+  passes through to DUP. DUP does its normal collection-level processing that may involve routing the document
+  to another shard & replica.
+
+* If the time stamp on the document is more recent than the most recent TRA segment, then a new collection needs to be
+  added at the front of the TRA.
+  RAUP will create this collection, add it to the alias and then forward the document to the collection it just created.
+  This can happen recursively if more than one collection needs to be created.
++
+Each time a new collection is added, the oldest collections in the TRA are examined for possible deletion, if that has
+    been configured.
+All this happens synchronously, potentially adding seconds to the update request and indexing latency.
+If `router.preemptiveCreateMath` is configured and if the document arrives within this window then it will occur
+asynchronously.
+
+Any other type of update like a commit or delete is routed by RAUP to all collections.
+Generally speaking, this is not a performance concern. When Solr receives a delete or commit wherein nothing is deleted
+or nothing needs to be committed, then it's pretty cheap.
+
+
+=== Limitations & Assumptions
+
+* TRAs route only on a *time* (date) field.  If you instead have some other sequential number, you could fake it
+  as a time (e.g., convert to a timestamp assuming some epoch and increment).
++
+The smallest possible interval is one second.
+No other sequential routing scheme is supported, although this feature was developed with considerations that it could be
+  extended/improved to other schemes.
+
+* The underlying collections form a contiguous sequence without gaps.  This will not be suitable when there are
+  large gaps in the underlying data, as Solr will insist that there be a collection for each increment.  This
+  is due in part to Solr calculating the end time of each interval collection based on the timestamp of
+  the next collection, since it is otherwise not stored in any way.
+
+* Avoid sending updates to the oldest collection if you have also configured that old collections should be
+  automatically deleted.  It could lead to exceptions bubbling back to the indexing client.
+
+== Category Routed Aliases
+
+Starting in Solr 8.1, Category Routed Aliases (CRAs) are a feature to manage aliases and a set of dependent collections
+based on the value of a single field.
+
+CRAs automatically create new collections but because the partitioning is on categorical information rather than continuous
+numerically based values there's no logic for automatic deletion. This approach allows for simplified indexing of data
+that must be segregated into collections for cluster management or security reasons.
+
+=== How It Works
+
+First you create a category routed alias using the <<collections-api.adoc#createalias,CREATEALIAS>> command with some
+  router settings.
+ Most of the settings are editable at a later time using the <<collections-api.adoc#aliasprop,ALIASPROP>> command.
+
+The alias will be created with a special place-holder collection which will always be named
+ `+myAlias__CRA__NEW_CATEGORY_ROUTED_ALIAS_WAITING_FOR_DATA__TEMP+`. The first document indexed into the CRA
+ will create a second collection named `+myAlias__CRA__foo+` (for a routed field value of `foo`). The second document
+ indexed will cause the temporary place-holder collection to be deleted. Thereafter collections will be created whenever
+ a new value for the field is encountered.
+
+CAUTION: To guard against runaway collection creation, options for limiting the total number of categories and for
+rejecting values that don't match a regular expression are provided (see <<collections-api.adoc#createalias,CREATEALIAS>> for
+details). Note that by providing very large or very permissive values for these options you are accepting the risk that
+garbled data could potentially create thousands of collections and bring your cluster to a grinding halt.
+
+Please note that the values (and thus the collection names) are case sensitive. As elsewhere in Solr, manipulation and
+cleaning of the data is expected to be done by external processes before data is sent to Solr, with one exception.
+Throughout Solr there are limitations on the allowable characters in collection names. Any characters other than ASCII
+alphanumeric characters (`A-Za-z0-9`), hyphen (`-`) or underscore (`_`) are replaced with an underscore when calculating
+the collection name for a category. For a CRA named `myAlias` the following table shows how collection names would be
+calculated:
+
+|===
+|Value |CRA Collection Name
+
+|foo
+|+myAlias__CRA__foo+
+
+|Foo
+|+myAlias__CRA__Foo+
+
+|foo bar
+|+myAlias__CRA__foo_bar+
+
+|+FOÓB&R+
+|+myAlias__CRA__FO_B_R+
+
+|+中文的东西+
+|+myAlias__CRA_______+
+
+|+foo__CRA__bar+
+|*Causes 400 Bad Request*
+
+|+<null>+
+|*Causes 400 Bad Request*
+
+|===
+
+Since collection creation can take upwards of 1-3 seconds, systems inserting data in a CRA should be
+ constructed to handle such pauses whenever a new collection is created.
+Unlike time routed aliases, there is no way to predict the next value so such pauses are unavoidable.
+
+There is no automated means of removing a category. If a category needs to be removed from a CRA
+the following procedure is recommended:
+
+1. Ensure that no documents with the value corresponding to the category to be removed will be sent
+   either by stopping indexing or by fixing the incoming data stream
+1. Modify the alias definition in zookeeper, removing the collection corresponding to the category.
+1. Delete the collection corresponding to the category. Note that if the collection is not removed
+   from the alias first, this step will fail.
+
+=== Limitations & Assumptions
+
+* CRAs are presently unsuitable for non-English data values due to the limits on collection names.
+  This can be worked around by duplicating the route value to a *_url safe_* base 64 encoded field
+  and routing on that value instead.
+
+* The check for the `+__CRA__+` infix is independent of the regular expression validation and occurs after
+  the name of the collection to be created has been calculated. It may not be avoided and is necessary
+  to support future features.
+
+== Improvement Possibilities
+
+Routed aliases are a relatively new feature of SolrCloud that can be expected to be improved.
+Some _potential_ areas for improvement that _are not implemented yet_ are:
+
+* *TRAs*: Searches with time filters should only go to applicable collections.
+
+* *TRAs*: Ways to automatically optimize (or reduce the resources of) older collections that aren't expected to receive more
+  updates, and might have less search demand.
+
+* *CRAs*: Intrinsic support for non-English text via base64 encoding
+
+* *CRAs*: Supply an initial list of values for cases where these are known beforehand to reduce pauses during indexing
+
+* CloudSolrClient could route documents to the correct collection based on the route value instead of always picking the
+  latest/first.
+
+* Presently only updates are routed and queries are distributed to all collections in the alias, but future
+  features might enable routing of the query to the single appropriate collection based on a special parameter or perhaps
+  a filter on the routed field.
+
+* Collections might be constrained by their size instead of or in addition to time or category value.
+  This might be implemented as another type of routed alias, or possibly as an option on the existing routed aliases
+
+* Compatibility with CDCR.
+
+* Option for deletion of aliases that also deletes the underlying collections in one step. Routed Aliases may quickly
+  create more collections than expected during initial testing. Removing them after such events is overly tedious.
+
+As always, patches and pull requests are welcome!
\ No newline at end of file
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index a66ddfa..c5f68cc 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -562,7 +562,7 @@ Aliases come in 2 flavors: standard and routed.
 *Standard aliases* are simple:  CREATEALIAS registers the alias name with the names of one or more collections provided
   by the command.
 If an existing alias exists, it is replaced/updated.
-A standard alias can serve to have the appearance of renaming a collection, and can be used to atomically swap
+A standard alias can serve as a means to rename a collection, and can be used to atomically swap
 which backing/underlying collection is "live" for various purposes.
 When Solr searches an alias pointing to multiple collections, Solr will search all shards of all the collections as an
   aggregated whole.
@@ -571,23 +571,18 @@ While it is possible to send updates to an alias spanning multiple collections,
 
 `/admin/collections?action=CREATEALIAS&name=_name_&collections=_collectionlist_`
 
-*Routed aliases* are aliases with additional capabilities to act as a kind of super-collection -- routing
-  updates to the correct collection.
-Since the only routing strategy at present is time oriented, these are also called *Time Routed Aliases* (TRAs).
-A TRA manages an alias and a time sequential series of collections that it will both create and optionally delete on-demand.
-See <<time-routed-aliases.adoc#time-routed-aliases,Time Routed Aliases>> for some important high-level information
+*Routed aliases* are aliases with additional capabilities to act as a kind of super-collection that routes
+  updates to the correct collection. Routing is data driven and may be based on a temporal field or on categories
+  specified in a field (normally string based).
+See <<aliases.adoc#routed-aliases,Routed Aliases>> for some important high-level information
   before getting started.
 
-NOTE: Presently this is only supported for temporal fields stored as a
-<<field-types-included-with-solr.adoc#field-types-included-with-solr,DatePointField or TrieDateField>> type. Other
-well ordered field types may be added in future versions.
-
 [source,text]
 ----
 localhost:8983/solr/admin/collections?action=CREATEALIAS&name=timedata&router.start=NOW/DAY&router.field=evt_dt&router.name=time&router.interval=%2B1DAY&router.maxFutureMs=3600000&create-collection.collection.configName=myConfig&create-collection.numShards=2
 ----
 
-If run on Jan 15, 2018, the above will create an alias named timedata, that contains collections with names prefixed
+If run on Jan 15, 2018, the above will create a time routed alias named timedata, that contains collections with names prefixed
 with `timedata` and an initial collection named `timedata_2018_01_15` will be created immediately. Updates sent to this
 alias with a (required) value in `evt_dt` that is before or after 2018-01-15 will be rejected, until the last 60
 minutes of 2018-01-15. After 2018-01-15T23:00:00 documents for either 2018-01-15 or 2018-01-16 will be accepted.
@@ -629,6 +624,21 @@ prohibited. If routing parameters are present this parameter is prohibited.
 
 Most routed alias parameters become _alias properties_ that can subsequently be inspected and <<aliasprop,modified>>.
 
+`router.name`::
+The type of routing to use. Presently only `time` and `category` are valid.  This parameter is required.
+
+`router.field`::
+The field to inspect to determine which underlying collection an incoming document should be routed to.
+This field is required on all incoming documents.
+
+`create-collection.*`::
+The * wildcard can be replaced with any parameter from the <<create,CREATE>> command except `name`. All other fields
+are identical in requirements and naming except that we insist that the configset be explicitly specified.
+The configset must be created beforehand, either uploaded or copied and modified.
+It's probably a bad idea to use "data driven" mode as schema mutations might happen concurrently leading to errors.
+
+==== Time Routed Alias Parameters
+
 `router.start`::
 The start date/time of data for this time routed alias in Solr's standard date/time format (i.e., ISO-8601 or "NOW"
 optionally with <<working-with-dates.adoc#date-math,date math>>).
@@ -649,13 +659,6 @@ myAlias_2018-01-15_01 collection (assuming an interval of +1HOUR).
 +
 The default timezone is UTC.
 
-`router.field`::
-The date field to inspect to determine which underlying collection an incoming document should be routed to.
-This field is required on all incoming documents.
-
-`router.name`::
-The type of routing to use. Presently only `time` is valid.  This parameter is required.
-
 `router.interval`::
 A date math expression that will be appended to a timestamp to determine the next collection in the series.
 Any date math expression that can be evaluated if appended to a timestamp of the form 2018-01-15T16:17:18 will
@@ -701,11 +704,18 @@ Example: `/DAY-90DAYS`.
 +
 The default is not to delete.
 
-`create-collection.*`::
-The * wildcard can be replaced with any parameter from the <<create,CREATE>> command except `name`. All other fields
-are identical in requirements and naming except that we insist that the configset be explicitly specified.
-The configset must be created beforehand, either uploaded or copied and modified.
-It's probably a bad idea to use "data driven" mode as schema mutations might happen concurrently leading to errors.
+==== Category Routed Alias Parameters
+
+`router.maxCardinality`::
+The maximum number of categories allowed for this alias.
+This setting safeguards against the inadvertent creation of an infinite number of collections in the event of bad data.
+
+`router.mustMatch`::
+A regular expression that the value of the field specified by `router.field` must match before a corresponding
+collection will be created. Note that changing this setting after data has been added will not alter the data already
+indexed. Any valid Java regular expression pattern may be specified. This expression is pre-compiled at the start of
+each request so batching of updates is strongly recommended. Overly complex patterns will produce CPU
+or garbage collection overhead during indexing as determined by the JVM's implementation of regular expressions.
 
 === CREATEALIAS Response
 
@@ -864,6 +874,10 @@ The `ALIASPROP` action modifies the properties (metadata) on an alias. If a key
 
 `/admin/collections?action=ALIASPROP&name=_name_&property.someKey=somevalue`
 
+WARNING: This command allows you to revise any property. No alias specific validation is performed.
+         Routed aliases may cease to function, function incorrectly or cause errors if property values
+         are set carelessly.
+
 === ALIASPROP Parameters
 
 `name`::
diff --git a/solr/solr-ref-guide/src/distributed-requests.adoc b/solr/solr-ref-guide/src/distributed-requests.adoc
index be129ed..3422540 100644
--- a/solr/solr-ref-guide/src/distributed-requests.adoc
+++ b/solr/solr-ref-guide/src/distributed-requests.adoc
@@ -127,6 +127,7 @@ If you need to disable this feature for backwards compatibility, you can set the
 +
 NOTE: In SolrCloud mode, if at least one node is included in the whitelist, then the `live_nodes` will no longer be used as source for the list. This means that if you need to do a cross-cluster request using the `shards` parameter in SolrCloud mode (in addition to regular within-cluster requests), you'll need to add all nodes (local cluster + remote nodes) to the whitelist.
 
+[[distributedidf]]
 == Configuring statsCache (Distributed IDF)
 
 Document and term statistics are needed in order to calculate relevancy. Solr provides four implementations out of the box when it comes to document stats calculation:
diff --git a/solr/solr-ref-guide/src/how-solrcloud-works.adoc b/solr/solr-ref-guide/src/how-solrcloud-works.adoc
index b0e42e4..7f4db3b 100644
--- a/solr/solr-ref-guide/src/how-solrcloud-works.adoc
+++ b/solr/solr-ref-guide/src/how-solrcloud-works.adoc
@@ -1,5 +1,5 @@
 = How SolrCloud Works
-:page-children: shards-and-indexing-data-in-solrcloud, distributed-requests, time-routed-aliases
+:page-children: shards-and-indexing-data-in-solrcloud, distributed-requests, aliases
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -21,7 +21,7 @@ The following sections cover provide general information about how various SolrC
 
 * <<shards-and-indexing-data-in-solrcloud.adoc#shards-and-indexing-data-in-solrcloud,Shards and Indexing Data in SolrCloud>>
 * <<distributed-requests.adoc#distributed-requests,Distributed Requests>>
-* <<time-routed-aliases.adoc#time-routed-aliases,Time Routed Aliases>>
+* <<aliases.adoc#aliases,Standard and Routed Aliases>>
 
 If you are already familiar with SolrCloud concepts and basic functionality, you can skip to the section covering <<solrcloud-configuration-and-parameters.adoc#solrcloud-configuration-and-parameters,SolrCloud Configuration and Parameters>>.
 
diff --git a/solr/solr-ref-guide/src/time-routed-aliases.adoc b/solr/solr-ref-guide/src/time-routed-aliases.adoc
deleted file mode 100644
index 3ef0e19..0000000
--- a/solr/solr-ref-guide/src/time-routed-aliases.adoc
+++ /dev/null
@@ -1,119 +0,0 @@
-= Time Routed Aliases
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-Time Routed Aliases (TRAs) is a SolrCloud feature that manages an alias and a time sequential series of collections.
-
-It automatically creates new collections and (optionally) deletes old ones as it routes documents to the correct
-  collection based on its timestamp.
-This approach allows for indefinite indexing of data without degradation of performance otherwise experienced due to the
-  continuous growth of a single index.
-
-If you need to store a lot of timestamped data in Solr, such as logs or IoT sensor data, then this feature probably
-  makes more sense than creating one sharded hash-routed collection.
-
-== How It Works
-
-First you create a time routed aliases using the <<collections-api.adoc#createalias,CREATEALIAS>> command with some
-  router settings.
-Most of the settings are editable at a later time using the <<collections-api.adoc#aliasprop,ALIASPROP>> command.
-
-The first collection will be created automatically, along with an alias pointing to it.
-Each underlying Solr "core" in a collection that is a member of a TRA has a special core property referencing the alias.
-The name of each collection is comprised of the TRA name and the start timestamp (UTC), with trailing zeros and symbols
-  truncated.
-Ideally, as a user of this feature, you needn't concern yourself with the particulars of the collection naming pattern
-  since both queries and updates may be done via the alias.
-
-When adding data, you should usually direct documents to the alias (e.g., reference the alias name instead of any collection).
-The Solr server and CloudSolrClient will direct an update request to the first collection that an alias points to.
-
-The collections list for a TRA is always reverse sorted, and thus the connection path of the request will route to the
-  lead collection. Using CloudSolrClient is preferable as it can reduce the number of underlying physical HTTP requests by one.
-If you know that a particular set of documents to be delivered is going to a particular older collection then you could
-  direct it there from the client side as an optimization but it's not necessary. CloudSolrClient does not (yet) do this.
-
-When processing an update for a TRA, Solr initializes its
-  <<update-request-processors.adoc#update-request-processors,UpdateRequestProcessor>> chain as usual, but
-  when `DistributedUpdateProcessor` (DUP) initializes, it detects that the update targets a TRA and injects
-  `TimeRoutedUpdateProcessor` (TRUP) in front of itself.
-TRUP, in coordination with the Overseer, is the main part of a TRA, and must immediately precede DUP. It is not
-  possible to configure custom chains with other types of UpdateRequestProcessors between TRUP and DUP.
-
-TRUP first reads TRA configuration from the alias properties when it is initialized.  As it sees each document, it checks for
-  changes to TRA properties, updates its cached configuration if needed and then determines which collection the
-  document belongs to:
-
-* If TRUP needs to send it to a time segment represented by a collection other than the one that
-  the client chose to communicate with, then it will do so using mechanisms shared with DUP.
-  Once the document is forwarded to the correct collection (i.e., the correct TRA time segment), it skips directly to
-  DUP on the target collection and continues normally, potentially being routed again to the correct shard & replica
-  within the target collection.
-
-* If it belongs in the current collection (which is usually the case if processing events as they occur), the document
-  passes through to DUP. DUP does it's normal collection-level processing that may involve routing the document
-  to another shard & replica.
-
-* If the time stamp on the document is more recent than the most recent TRA segment, then a new collection needs to be
-  added at the front of the TRA.
-  TRUP will create this collection, add it to the alias and then forward the document to the collection it just created.
-  This can happen recursively if more than one collection needs to be created.
-+
-Each time a new collection is added, the oldest collections in the TRA are examined for possible deletion, if that has
-    been configured.
-All this happens synchronously, potentially adding seconds to the update request and indexing latency.
-If `router.preemptiveCreateMath` is configured and if the document arrives within this window then it will occur
-asynchronously.
-
-Any other type of update like a commit or delete is routed by TRUP to all collections.
-Generally speaking, this is not a performance concern. When Solr receives a delete or commit wherein nothing is deleted
-or nothing needs to be committed, then it's pretty cheap.
-
-== Improvement Possibilities
-
-This is a new feature of SolrCloud that can be expected to be improved.
-Some _potential_ areas for improvement that _are not implemented yet_ are:
-
-* Searches with time filters should only go to applicable collections.
-
-* Collections ought to be constrained by their size instead of or in addition to time.
-  Based on the underlying design, this would only apply to the lead collection.
-
-* Ways to automatically optimize (or reduce the resources of) older collections that aren't expected to receive more
-  updates, and might have less search demand.
-
-* CloudSolrClient could route documents to the correct collection based on a timestamp instead always picking the
-  latest.
-
-* Compatibility with CDCR.
-
-== Limitations & Assumptions
-
-* Only *time* routed aliases are supported.  If you instead have some other sequential number, you could fake it
-  as a time (e.g., convert to a timestamp assuming some epoch and increment).
-+
-The smallest possible interval is one second.
-No other routing scheme is supported, although this feature was developed with considerations that it could be
-  extended/improved to other schemes.
-
-* The underlying collections form a contiguous sequence without gaps.  This will not be suitable when there are
-  large gaps in the underlying data, as Solr will insist that there be a collection for each increment.  This
-  is due in part on Solr calculating the end time of each interval collection based on the timestamp of
-  the next collection, since it is otherwise not stored in any way.
-
-* Avoid sending updates to the oldest collection if you have also configured that old collections should be
-  automatically deleted.  It could lead to exceptions bubbling back to the indexing client.
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index cbff8d4..1654689 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1565,8 +1565,8 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   }
 
   /**
-   * Returns a SolrRequest to create a routed alias. Only time based routing is supported presently,
-   * For time based routing, the start a standard Solr timestamp string (possibly with "date math").
+   * Returns a SolrRequest to create a time routed alias. For time based routing, the start
+   * should be a standard Solr timestamp string (possibly with "date math").
    *
    * @param aliasName the name of the alias to create.
    * @param start the start of the routing.  A standard Solr date: ISO-8601 or NOW with date math.
@@ -1664,6 +1664,76 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     }
 
   }
+  /**
+   * Returns a SolrRequest to create a category routed alias.
+   *
+   * @param aliasName the name of the alias to create.
+   * @param routerField the document field containing the category value to route on
+   * @param maxCardinality the maximum number of collections under this category routed alias
+   * @param createCollTemplate Holds options to create a collection.  The "name" is ignored.
+   */
+  public static CreateCategoryRoutedAlias createCategoryRoutedAlias(String aliasName,
+                                                            String routerField,
+                                                            int maxCardinality,
+                                                            Create createCollTemplate) {
+
+    return new CreateCategoryRoutedAlias(aliasName, routerField, maxCardinality, createCollTemplate);
+  }
+
+  public static class CreateCategoryRoutedAlias extends AsyncCollectionAdminRequest { // CREATEALIAS request with router.name=category
+
+    public static final String ROUTER_TYPE_NAME = "router.name";
+    public static final String ROUTER_FIELD = "router.field";
+    public static final String ROUTER_MAX_CARDINALITY = "router.maxCardinality";
+    public static final String ROUTER_MUST_MATCH = "router.mustMatch";
+
+    private final String aliasName;
+    private final String routerField;
+    private Integer maxCardinality;
+    private String mustMatch; // optional regex; only sent when set via setMustMatch
+
+    private final Create createCollTemplate;
+
+    public CreateCategoryRoutedAlias(String aliasName, String routerField, int maxCardinality, Create createCollTemplate) {
+      super(CollectionAction.CREATEALIAS);
+      this.aliasName = aliasName;
+      this.routerField = routerField;
+      this.maxCardinality = maxCardinality;
+      this.createCollTemplate = createCollTemplate;
+    }
+
+    public CreateCategoryRoutedAlias setMustMatch(String regex) {
+      this.mustMatch = regex;
+      return this; // returns this request so calls can be chained
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+      params.add(CommonParams.NAME, aliasName);
+      params.add(ROUTER_TYPE_NAME, "category");
+      params.add(ROUTER_FIELD, routerField);
+      params.add(ROUTER_MAX_CARDINALITY, maxCardinality.toString());
+
+      if (mustMatch != null) {
+        params.add(ROUTER_MUST_MATCH, mustMatch);
+      }
+
+      // Merge in the template's collection params under the "create-collection." prefix; the alias params above take precedence.
+      ModifiableSolrParams createCollParams = new ModifiableSolrParams(); // output target
+      final SolrParams collParams = createCollTemplate.getParams();
+      final Iterator<String> pIter = collParams.getParameterNamesIterator();
+      while (pIter.hasNext()) {
+        String key = pIter.next();
+        if (key.equals(CollectionParams.ACTION) || key.equals("name")) { // the template's action and name are not carried over
+          continue;
+        }
+        createCollParams.set("create-collection." + key, collParams.getParams(key));
+      }
+      return SolrParams.wrapDefaults(params, createCollParams); // prefixed collection params serve only as defaults
+    }
+
+  }
 
   /**
    * Returns a SolrRequest to delete an alias
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index a9e00e9..de6b247 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -79,7 +79,8 @@ public interface CollectionParams {
     DELETEALIAS(true, LockLevel.COLLECTION),
     ALIASPROP(true, LockLevel.COLLECTION),
     LISTALIASES(false, LockLevel.NONE),
-    MAINTAINROUTEDALIAS(true, LockLevel.COLLECTION),
+    MAINTAINTIMEROUTEDALIAS(true, LockLevel.COLLECTION), // internal use only
+    MAINTAINCATEGORYROUTEDALIAS(true, LockLevel.COLLECTION), // internal use only
     DELETEROUTEDALIASCOLLECTIONS(true, LockLevel.COLLECTION),
     SPLITSHARD(true, LockLevel.SHARD),
     DELETESHARD(true, LockLevel.SHARD),
diff --git a/solr/solrj/src/resources/apispec/collections.Commands.json b/solr/solrj/src/resources/apispec/collections.Commands.json
index dc8e251..f24fe72 100644
--- a/solr/solrj/src/resources/apispec/collections.Commands.json
+++ b/solr/solrj/src/resources/apispec/collections.Commands.json
@@ -175,6 +175,14 @@
             "autoDeleteAge": {
               "type": "string",
               "description": "A date math expressions yielding a time in the past. Collections covering a period of time entirely before this age will be automatically deleted."
+            },
+            "maxCardinality": {
+              "type": "integer",
+              "description": "The maximum number of categories allowed for this alias."
+            },
+            "mustMatch": {
+              "type": "string",
+              "description": "A regular expression that the value of the field specified by `router.field` must match before a corresponding collection will be created."
             }
           }
         },