You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/09/17 09:44:22 UTC

[36/47] lucene-solr:jira/solr-12709: SOLR-12357: TRA preemptiveCreateMath option. Simplified test utility TrackingUpdateProcessorFactory. Reverted some attempts the TRA used to make in avoiding overseer communication (too complicated). Closes #433

SOLR-12357: TRA preemptiveCreateMath option.
Simplified test utility TrackingUpdateProcessorFactory.
Reverted some attempts the TRA used to make in avoiding overseer communication (too complicated).
Closes #433


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/21d130c3
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/21d130c3
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/21d130c3

Branch: refs/heads/jira/solr-12709
Commit: 21d130c3edf8bfb21a3428fc95e5b67d6be757e7
Parents: 9e04375
Author: David Smiley <ds...@apache.org>
Authored: Thu Sep 6 23:38:44 2018 -0400
Committer: David Smiley <ds...@apache.org>
Committed: Thu Sep 6 23:38:44 2018 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../api/collections/MaintainRoutedAliasCmd.java |   7 +-
 .../cloud/api/collections/TimeRoutedAlias.java  |  19 +
 .../DistributedUpdateProcessorFactory.java      |   2 +-
 .../TimeRoutedAliasUpdateProcessor.java         | 355 ++++++++++++-------
 .../TimeRoutedAliasUpdateProcessorTest.java     | 237 +++++++++++--
 .../TrackingUpdateProcessorFactory.java         | 136 ++-----
 solr/solr-ref-guide/src/collections-api.adoc    |  21 ++
 .../solrj/request/CollectionAdminRequest.java   |  10 +
 .../resources/apispec/collections.Commands.json |   4 +
 10 files changed, 546 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 78bfb33..e5596e1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -216,6 +216,9 @@ New Features
 * SOLR-12612: Cluster properties restriction of known keys only is relaxed, and now unknown properties starting with "ext."
   will be allowed. This allows custom to plugins set their own cluster properties. (Jeffery Yuan via Tomás Fernández Löbbe)
 
+* SOLR-12357: Time Routed Aliases now have a preemptiveCreateMath option to preemptively and asynchronously create the
+  next collection in advance as new data gets within this time window of the end. (Gus Heck, David Smiley)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
index 083571c..e5c5de6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
@@ -75,7 +75,12 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
     this.ocmh = ocmh;
   }
 
-  /** Invokes this command from the client.  If there's a problem it will throw an exception. */
+  /**
+   * Invokes this command from the client.  If there's a problem it will throw an exception.
+   * Please note that it is important to never add async to this invocation. This method must
+   * block (up to the standard OCP timeout) to prevent large batches of adds from sending a message
+   * to the overseer for every document added in TimeRoutedAliasUpdateProcessor.
+   */
   public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName)
       throws Exception {
     final String operation = CollectionParams.CollectionAction.MAINTAINROUTEDALIAS.toLower();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
index 312e736..1fb3d9e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.cloud.api.collections;
 
+import java.text.ParseException;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
@@ -63,6 +64,7 @@ public class TimeRoutedAlias {
   public static final String ROUTER_START = ROUTER_PREFIX + "start";
   public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval";
   public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "maxFutureMs";
+  public static final String ROUTER_PREEMPTIVE_CREATE_MATH = ROUTER_PREFIX + "preemptiveCreateMath";
   public static final String ROUTER_AUTO_DELETE_AGE = ROUTER_PREFIX + "autoDeleteAge";
   public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
   // plus TZ and NAME
@@ -84,6 +86,7 @@ public class TimeRoutedAlias {
   public static final List<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
       ROUTER_MAX_FUTURE,
       ROUTER_AUTO_DELETE_AGE,
+      ROUTER_PREEMPTIVE_CREATE_MATH,
       TZ)); // kinda special
 
   static Predicate<String> PARAM_IS_PROP =
@@ -126,6 +129,7 @@ public class TimeRoutedAlias {
   private final String routeField;
   private final String intervalMath; // ex: +1DAY
   private final long maxFutureMs;
+  private final String preemptiveCreateMath;
   private final String autoDeleteAgeMath; // ex: /DAY-30DAYS  *optional*
   private final TimeZone timeZone;
 
@@ -141,6 +145,9 @@ public class TimeRoutedAlias {
 
     //optional:
     maxFutureMs = params.getLong(ROUTER_MAX_FUTURE, TimeUnit.MINUTES.toMillis(10));
+    // the date math configured is an interval to be subtracted from the most recent collection's time stamp
+    String pcmTmp = params.get(ROUTER_PREEMPTIVE_CREATE_MATH);
+    preemptiveCreateMath = pcmTmp != null ? (pcmTmp.startsWith("-") ? pcmTmp : "-" + pcmTmp) : null;
     autoDeleteAgeMath = params.get(ROUTER_AUTO_DELETE_AGE); // no default
     timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
 
@@ -167,6 +174,13 @@ public class TimeRoutedAlias {
         throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_AUTO_DELETE_AGE + ", " + e, e);
       }
     }
+    if (preemptiveCreateMath != null) {
+      try {
+        new DateMathParser().parseMath(preemptiveCreateMath);
+      } catch (ParseException e) {
+        throw new SolrException(BAD_REQUEST, "Invalid date math for preemptiveCreateMath:" + preemptiveCreateMath);
+      }
+    }
 
     if (maxFutureMs < 0) {
       throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be >= 0");
@@ -189,6 +203,10 @@ public class TimeRoutedAlias {
     return maxFutureMs;
   }
 
+  public String getPreemptiveCreateWindow() {
+    return preemptiveCreateMath;
+  }
+
   public String getAutoDeleteAgeMath() {
     return autoDeleteAgeMath;
   }
@@ -204,6 +222,7 @@ public class TimeRoutedAlias {
         .add("routeField", routeField)
         .add("intervalMath", intervalMath)
         .add("maxFutureMs", maxFutureMs)
+        .add("preemptiveCreateMath", preemptiveCreateMath)
         .add("autoDeleteAgeMath", autoDeleteAgeMath)
         .add("timeZone", timeZone)
         .toString();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 1930c08..9f2a0ba 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -50,7 +50,7 @@ public class DistributedUpdateProcessorFactory
   public UpdateRequestProcessor getInstance(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
     // note: will sometimes return DURP (no overhead) instead of wrapping
-    return TimeRoutedAliasUpdateProcessor.wrap(req, rsp,
+    return TimeRoutedAliasUpdateProcessor.wrap(req,
         new DistributedUpdateProcessor(req, rsp, next));
   }
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index d9d1da1..cc1ddb8 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -19,17 +19,15 @@ package org.apache.solr.update.processor;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
-import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.api.collections.MaintainRoutedAliasCmd;
 import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
@@ -47,19 +45,24 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.solr.common.util.ExecutorUtil.newMDCAwareSingleThreadExecutor;
 import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE;
+import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.NONE;
+import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.SYNCHRONOUS;
 
 /**
  * Distributes update requests to a rolling series of collections partitioned by a timestamp field.  Issues
@@ -73,33 +76,31 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
  * @since 7.2.0
  */
 public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
-  //TODO do we make this more generic to others who want to partition collections using something else?
-
-  public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
+  //TODO do we make this more generic to others who want to partition collections using something else besides time?
 
+  private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  // To avoid needless/redundant concurrent communication with the Overseer from this JVM, we
-  //   maintain a Semaphore from an alias name keyed ConcurrentHashMap.
-  //   Alternatively a Lock or CountDownLatch could have been used but they didn't seem
-  //   to make it any easier.
-  private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
+  // refs to std infrastructure
+  private final SolrQueryRequest req;
+  private final SolrCmdDistributor cmdDistrib;
+  private final CollectionsHandler collHandler;
+  private final ZkController zkController;
 
+  // Stuff specific to this class
   private final String thisCollection;
-
   private final TimeRoutedAlias timeRoutedAlias;
-
-  private final ZkController zkController;
-  private final SolrCmdDistributor cmdDistrib;
-  private final CollectionsHandler collHandler;
   private final SolrParams outParamsToLeader;
-  private final CloudDescriptor cloudDesc;
 
+  // These two fields may be updated within the calling thread during processing but should
+  // never be updated by any async creation thread.
   private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection.  Sorted descending
   private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
-  private SolrQueryRequest req;
 
-  public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+  // This will be updated in async creation threads; see preemptiveAsync(Runnable r) for details
+  private volatile ExecutorService preemptiveCreationExecutor;
+
+  public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
     //TODO get from "Collection property"
     final String aliasName = req.getCore().getCoreDescriptor()
         .getCoreProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
@@ -113,18 +114,17 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
       // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
       return next;
     } else {
-      return new TimeRoutedAliasUpdateProcessor(req, rsp, next, aliasName, aliasDistribPhase);
+      return new TimeRoutedAliasUpdateProcessor(req, next, aliasName, aliasDistribPhase);
     }
   }
 
-  protected TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
-                                           String aliasName,
-                                           DistribPhase aliasDistribPhase) {
+  private TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
+                                         String aliasName,
+                                         DistribPhase aliasDistribPhase) {
     super(next);
     assert aliasDistribPhase == DistribPhase.NONE;
     final SolrCore core = req.getCore();
-    cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-    this.thisCollection = cloudDesc.getCollectionName();
+    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
     this.req = req;
     CoreContainer cc = core.getCoreContainer();
     zkController = cc.getZkController();
@@ -164,71 +164,141 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
 
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
-    SolrInputDocument solrInputDocument = cmd.getSolrInputDocument();
-    final Object routeValue = solrInputDocument.getFieldValue(timeRoutedAlias.getRouteField());
-    final Instant routeTimestamp = parseRouteKey(routeValue);
+    final Instant docTimestamp =
+        parseRouteKey(cmd.getSolrInputDocument().getFieldValue(timeRoutedAlias.getRouteField()));
+
+    // TODO: maybe in some cases the user would want to ignore/warn instead?
+    if (docTimestamp.isAfter(Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs()))) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "The document's time routed key of " + docTimestamp + " is too far in the future given " +
+              TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
+    }
 
+    // to avoid potential for race conditions, this next method should not get called again unless
+    // we have created a collection synchronously
     updateParsedCollectionAliases();
-    String targetCollection;
-    do { // typically we don't loop; it's only when we need to create a collection
-      targetCollection = findTargetCollectionGivenTimestamp(routeTimestamp);
-
-      if (targetCollection == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Doc " + cmd.getPrintableId() + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + routeTimestamp);
-      }
-
-      // Note: the following rule is tempting but not necessary and is not compatible with
-      // only using this URP when the alias distrib phase is NONE; otherwise a doc may be routed to from a non-recent
-      // collection to the most recent only to then go there directly instead of realizing a new collection is needed.
-      //      // If it's going to some other collection (not "this") then break to just send it there
-      //      if (!thisCollection.equals(targetCollection)) {
-      //        break;
-      //      }
-      // Also tempting but not compatible:  check that we're the leader, if not then break
-
-      // If the doc goes to the most recent collection then do some checks below, otherwise break the loop.
-      final Instant mostRecentCollTimestamp = parsedCollectionsDesc.get(0).getKey();
-      final String mostRecentCollName = parsedCollectionsDesc.get(0).getValue();
-      if (!mostRecentCollName.equals(targetCollection)) {
-        break;
-      }
-
-      // Check the doc isn't too far in the future
-      final Instant maxFutureTime = Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs());
-      if (routeTimestamp.isAfter(maxFutureTime)) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "The document's time routed key of " + routeValue + " is too far in the future given " +
-                TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
-      }
-
-      // Create a new collection?
-      final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(mostRecentCollTimestamp);
-      if (routeTimestamp.isBefore(nextCollTimestamp)) {
-        break; // thus we don't need another collection
-      }
 
-      createCollectionAfter(mostRecentCollName); // *should* throw if fails for some reason but...
-      final boolean updated = updateParsedCollectionAliases();
-      if (!updated) { // thus we didn't make progress...
-        // this is not expected, even in known failure cases, but we check just in case
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "We need to create a new time routed collection but for unknown reasons were unable to do so.");
-      }
-      // then retry the loop ...
-    } while(true);
-    assert targetCollection != null;
+    String targetCollection = createCollectionsIfRequired(docTimestamp, cmd);
 
     if (thisCollection.equals(targetCollection)) {
       // pass on through; we've reached the right collection
       super.processAdd(cmd);
     } else {
       // send to the right collection
-      SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, solrInputDocument);
+      SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
       cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
     }
   }
 
+  /**
+   * Create any required collections and return the name of the collection to which the current document should be sent.
+   *
+   * @param docTimestamp the date for the document taken from the field specified in the TRA config
+   * @param cmd the add command being processed; its printable id is used in error messages
+   * @return The name of the proper destination collection for the document which may or may not be a
+   *         newly created collection
+   */
+  private String createCollectionsIfRequired(Instant docTimestamp, AddUpdateCommand cmd) {
+    // Even though it is possible that multiple requests hit this code in the 1-2 sec that
+    // it takes to create a collection, it's an established anti-pattern to feed data with a very large number
+    // of client connections. This in mind, we only guard against spamming the overseer within a batch of
+    // updates. We are intentionally tolerating a low level of redundant requests in favor of simpler code. Most
+    // super-sized installations with many update clients will likely be multi-tenant and multiple tenants
+    // probably don't write to the same alias. As such, we have deferred any solution to the "many clients causing
+    // collection creation simultaneously" problem until such time as someone actually has that problem in a
+    // real world use case that isn't just an anti-pattern.
+    Map.Entry<Instant, String> candidateCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
+    String candidateCollectionName = candidateCollectionDesc.getValue();
+    try {
+      switch (typeOfCreationRequired(docTimestamp, candidateCollectionDesc.getKey())) {
+        case SYNCHRONOUS:
+          // This next line blocks until all collections required by the current document have been created
+          return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
+        case ASYNC_PREEMPTIVE:
+          if (preemptiveCreationExecutor == null) {
+            // It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
+            // in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
+            // and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
+            // collections that existed when candidateCollectionDesc was created. If this class updates its notion of
+            // parsedCollectionsDesc since candidateCollectionDesc was chosen, we could create collection n+2
+            // instead of collection n+1.
+            String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+
+            // This line does not block and the document can be added immediately
+            preemptiveAsync(() -> createNextCollection(mostRecentCollName));
+          }
+          return candidateCollectionName;
+        case NONE:
+          return candidateCollectionName; // could use fall through, but fall through is fiddly for later editors.
+        default:
+          throw unknownCreateType();
+      }
+      // do nothing if creationType == NONE
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void preemptiveAsync(Runnable r) {
+    // Note: creating an executor and throwing it away is slightly expensive, but this is only likely to happen
+    // once per hour/day/week (depending on time slice size for the TRA). If the executor were retained, it
+    // would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
+    // more complicated from a code maintenance and readability standpoint. An executor must be used instead of a
+    // thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
+    DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
+    preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
+    preemptiveCreationExecutor.execute(() -> {
+      r.run();
+      preemptiveCreationExecutor.shutdown();
+      preemptiveCreationExecutor = null;
+    });
+  }
+
+  /**
+   * Determine if a new collection will be required based on the document timestamp. Passing null for
+   * preemptiveCreateInterval tells you if the document is beyond all existing collections with a response of
+   * {@link CreationType#NONE} or {@link CreationType#SYNCHRONOUS}, and passing a valid date math for
+   * preemptiveCreateMath additionally distinguishes the case where the document is close enough to the end of
+   * the TRA to trigger preemptive creation but not beyond all existing collections with a value of
+   * {@link CreationType#ASYNC_PREEMPTIVE}.
+   *
+   * @param docTimeStamp The timestamp from the document
+   * @param targetCollectionTimestamp The timestamp for the presently selected destination collection
+   * @return a {@code CreationType} indicating if and how to create a collection
+   */
+  private CreationType typeOfCreationRequired(Instant docTimeStamp, Instant targetCollectionTimestamp) {
+    final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(targetCollectionTimestamp);
+
+    if (!docTimeStamp.isBefore(nextCollTimestamp)) {
+      // current document is destined for a collection that doesn't exist, must create the destination
+      // to proceed with this add command
+      return SYNCHRONOUS;
+    }
+
+    if (isNotBlank(timeRoutedAlias.getPreemptiveCreateWindow())) {
+      Instant preemptNextColCreateTime =
+          calcPreemptNextColCreateTime(timeRoutedAlias.getPreemptiveCreateWindow(), nextCollTimestamp);
+      if (!docTimeStamp.isBefore(preemptNextColCreateTime)) {
+        return ASYNC_PREEMPTIVE;
+      }
+    }
+
+    return NONE;
+  }
+
+  private Instant calcPreemptNextColCreateTime(String preemptiveCreateMath, Instant nextCollTimestamp) {
+    DateMathParser dateMathParser = new DateMathParser();
+    dateMathParser.setNow(Date.from(nextCollTimestamp));
+    try {
+      return dateMathParser.parseMath(preemptiveCreateMath).toInstant();
+    } catch (ParseException e) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Invalid Preemptive Create Window Math:'" + preemptiveCreateMath + '\'', e);
+    }
+  }
+
   private Instant parseRouteKey(Object routeKey) {
     final Instant docTimestamp;
     if (routeKey instanceof Instant) {
@@ -261,61 +331,43 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
     return false;
   }
 
-  /** Given the route key, finds the collection.  Returns null if too old to go in last one. */
-  private String findTargetCollectionGivenTimestamp(Instant docTimestamp) {
+  /**
+   * Given the route key, finds the correct collection or returns the most recent collection if the doc
+   * is in the future. Future docs will potentially cause creation of a collection that does not yet exist
+   * or an error if they exceed the maxFutureMs setting.
+   *
+   * @throws SolrException if the doc is too old to be stored in the TRA
+   */
+  private Map.Entry<Instant, String> findCandidateGivenTimestamp(Instant docTimestamp, String printableId) {
     // Lookup targetCollection given route key.  Iterates in reverse chronological order.
     //    We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
     for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
       Instant colStartTime = entry.getKey();
       if (!docTimestamp.isBefore(colStartTime)) {  // i.e. docTimeStamp is >= the colStartTime
-        return entry.getValue(); //found it
+        return entry; //found it
       }
     }
-    return null; //not found
+    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+        "Doc " + printableId + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + docTimestamp);
   }
 
-  private void createCollectionAfter(String mostRecentCollName) {
+  private void createNextCollection(String mostRecentCollName) {
     // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name).  It will create the collection
     //   and update the alias contingent on the most recent collection name being the same as
     //   what we think so here, otherwise it will return (without error).
-
-    // (see docs on aliasToSemaphoreMap)
-    final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(getAliasName(), n -> new Semaphore(1));
-    if (semaphore.tryAcquire()) {
-      try {
-        MaintainRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
-        // we don't care about the response.  It's possible no collection was created because
-        //  of a race and that's okay... we'll ultimately retry any way.
-
-        // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
-        //  not yet know about the new alias (thus won't see the newly added collection to it), and we might think
-        //  we failed.
-        zkController.getZkStateReader().aliasesManager.update();
-      } catch (RuntimeException e) {
-        throw e;
-      } catch (Exception e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } finally {
-        semaphore.release(); // to signal we're done to anyone waiting on it
-      }
-
-    } else {
-      // Failed to acquire permit because another URP instance on this JVM is creating a collection.
-      // So wait till it's available
-      log.debug("Collection creation is already in progress so we'll wait then try again.");
-      try {
-        if (semaphore.tryAcquire(DEFAULT_COLLECTION_OP_TIMEOUT, TimeUnit.MILLISECONDS)) {
-          semaphore.release(); // we don't actually want a permit so give it back
-          // return to continue...
-        } else {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "Waited too long for another update thread to be done with collection creation.");
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Interrupted waiting on collection creation.", e); // if we were interrupted, give up.
-      }
+    try {
+      MaintainRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
+      // we don't care about the response.  It's possible no collection was created because
+      //  of a race and that's okay... we'll ultimately retry any way.
+
+      // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
+      //  not yet know about the new alias (thus won't see the newly added collection to it), and we might think
+      //  we failed.
+      zkController.getZkStateReader().aliasesManager.update();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
   }
 
@@ -404,4 +456,61 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
         collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
   }
 
+
+  /**
+   * Create as many collections as required. This method loops to allow for the possibility that the docTimestamp
+   * requires more than one collection to be created. Since multiple threads may be invoking maintain on separate
+   * requests to the same alias, we must pass in the name of the collection that this thread believes to be the most
+   * recent collection. This assumption is checked when the command is executed in the overseer. When this method
+   * finds that all collections required have been created it returns the (possibly new) most recent collection.
+   * The return value is ignored by the calling code in the async preemptive case.
+   *
+   * @param docTimestamp the timestamp from the document that determines routing
+   * @param printableId an identifier for the add command used in error messages
+   * @param targetCollectionDesc the descriptor for the presently selected collection which should also be
+   *                             the most recent collection in all cases where this method is invoked.
+   * @return The latest collection, including collections created during maintenance
+   */
+  private String createAllRequiredCollections( Instant docTimestamp, String printableId,
+                                               Map.Entry<Instant, String> targetCollectionDesc) {
+    do {
+      switch(typeOfCreationRequired(docTimestamp, targetCollectionDesc.getKey())) {
+        case NONE:
+          return targetCollectionDesc.getValue(); // we don't need another collection
+        case ASYNC_PREEMPTIVE:
+          // can happen when preemptive interval is longer than one time slice
+          String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+          preemptiveAsync(() -> createNextCollection(mostRecentCollName));
+          return targetCollectionDesc.getValue();
+        case SYNCHRONOUS:
+          createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but...
+          if (!updateParsedCollectionAliases()) { // thus we didn't make progress...
+            // this is not expected, even in known failure cases, but we check just in case
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "We need to create a new time routed collection but for unknown reasons were unable to do so.");
+          }
+          // then retry the loop ... have to do find again in case other requests also added collections
+          // that were made visible when we called updateParsedCollectionAliases()
+          targetCollectionDesc = findCandidateGivenTimestamp(docTimestamp, printableId);
+          break;
+        default:
+          throw unknownCreateType();
+
+      }
+    } while (true);
+  }
+
+  private SolrException unknownCreateType() {
+    return new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown creation type while adding " +
+        "document to a Time Routed Alias! This is a bug caused when a creation type has been added but " +
+        "not all code has been updated to handle it.");
+  }
+
+  enum CreationType {
+    NONE,
+    ASYNC_PREEMPTIVE,
+    SYNCHRONOUS
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index 048cf5b..5c9fc94 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -18,6 +18,7 @@
 package org.apache.solr.update.processor;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
@@ -33,7 +34,6 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import org.apache.lucene.util.IOUtils;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -54,6 +54,7 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -69,15 +70,20 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  static final String configName = "timeConfig";
-  static final String alias = "myalias";
-  static final String timeField = "timestamp_dt";
-  static final String intField = "integer_i";
+  private static final String configName = "timeConfig";
+  private static final String alias = "myalias";
+  private static final String timeField = "timestamp_dt";
+  private static final String intField = "integer_i";
 
-  static SolrClient solrClient;
+  private static CloudSolrClient solrClient;
 
   private int lastDocId = 0;
   private int numDocsDeletedOrFailed = 0;
@@ -88,13 +94,11 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
   }
 
   @Before
-  public void doBefore() throws Exception {
+  public void doBefore() {
     solrClient = getCloudSolrClient(cluster);
     //log this to help debug potential causes of problems
-    System.out.println("SolrClient: " + solrClient);
-    if (solrClient instanceof CloudSolrClient) {
-      System.out.println(((CloudSolrClient) solrClient).getClusterStateProvider());
-    }
+    log.info("SolrClient: {}", solrClient);
+    log.info("ClusterStateProvider {}",solrClient.getClusterStateProvider());
   }
 
   @After
@@ -231,10 +235,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
             "  'add-updateprocessor' : {" +
             "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
             "  }," +
-            // if other tracking tests are written, add another TUPF with a unique group name, don't re-use this one!
             // See TrackingUpdateProcessorFactory javadocs for details...
             "  'add-updateprocessor' : {" +
-            "    'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'testSliceRouting'" +
+            "    'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
             "  }," +
             "  'add-updateprocessor' : {" + // for testing
             "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
@@ -265,8 +268,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
    * @throws Exception when it blows up unexpectedly :)
    */
   @Slow
-  @Nightly
   @Test
+  @LogLevel("org.apache.solr.update.processor.TrackingUpdateProcessorFactory=DEBUG")
   public void testSliceRouting() throws Exception {
     String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName();
     createConfigSet(configName);
@@ -294,8 +297,10 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     // leader randomly and not causing a failure if the code is broken, but as a whole this test will therefore only have
     // about a 3.6% false positive rate (0.33^3). If that's not good enough, add more docs or more replicas per shard :).
 
+    final String trackGroupName = getTrackUpdatesGroupName();
+    final List<UpdateCommand> updateCommands;
     try {
-      TrackingUpdateProcessorFactory.startRecording(getTestName());
+      TrackingUpdateProcessorFactory.startRecording(trackGroupName);
 
       // cause some collections to be created
 
@@ -306,7 +311,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
           sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z")),
           params));
     } finally {
-      TrackingUpdateProcessorFactory.stopRecording(getTestName());
+      updateCommands = TrackingUpdateProcessorFactory.stopRecording(trackGroupName);
     }
 
     try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
@@ -315,7 +320,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
       Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
       assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
 
-      List<UpdateCommand> updateCommands = TrackingUpdateProcessorFactory.commandsForGroup(getTestName());
       assertEquals(3, updateCommands.size());
       for (UpdateCommand updateCommand : updateCommands) {
         String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
@@ -324,6 +328,179 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     }
   }
 
+  /** @see TrackingUpdateProcessorFactory */
+  private String getTrackUpdatesGroupName() {
+    return getTestName();
+  }
+
+  @Test
+  @Slow
+  public void testPreemptiveCreation() throws Exception {
+    String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName();
+    createConfigSet(configName);
+
+    final int numShards = 1 ;
+    final int numReplicas = 1 ;
+    CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
+        CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
+            .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
+        .process(solrClient);
+
+    // cause some collections to be created
+    assertUpdateResponse(solrClient.add(alias,
+        sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z")
+    ));
+    assertUpdateResponse(solrClient.commit(alias));
+
+    // wait for all the collections to exist...
+    waitCol("2017-10-23", numShards); // This one should have already existed from the alias creation
+    waitCol("2017-10-24", numShards); // Create 1
+    waitCol("2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken)
+
+    // normal update, nothing special, no collection creation required.
+    List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(3,cols.size());
+
+    assertNumDocs("2017-10-23", 0);
+    assertNumDocs("2017-10-24", 0);
+    assertNumDocs("2017-10-25", 1);
+
+    // cause some collections to be created
+
+    ModifiableSolrParams params = params();
+    assertUpdateResponse(add(alias, Arrays.asList(
+        sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
+        sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
+        sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"),
+        sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation
+        params));
+    assertUpdateResponse(solrClient.commit(alias));
+
+    cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(3,cols.size());
+    assertNumDocs("2017-10-23", 1);
+    assertNumDocs("2017-10-24", 1);
+    assertNumDocs("2017-10-25", 3);
+
+    assertUpdateResponse(add(alias, Collections.singletonList(
+        sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
+        params));
+    assertUpdateResponse(solrClient.commit(alias));
+
+    waitCol("2017-10-26", numShards);
+    cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(4,cols.size());
+    assertNumDocs("2017-10-23", 1);
+    assertNumDocs("2017-10-24", 1);
+    assertNumDocs("2017-10-25", 4);
+    assertNumDocs("2017-10-26", 0);
+
+    // now test with pre-create window longer than time slice, and forcing multiple creations.
+    CollectionAdminRequest.setAliasProperty(alias)
+        .addProperty(TimeRoutedAlias.ROUTER_PREEMPTIVE_CREATE_MATH, "3DAY").process(solrClient);
+
+    assertUpdateResponse(add(alias, Collections.singletonList(
+        sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation now
+        params));
+    assertUpdateResponse(solrClient.commit(alias));
+    waitCol("2017-10-27", numShards);
+
+    cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(5,cols.size()); // only one created in async case
+    assertNumDocs("2017-10-23", 1);
+    assertNumDocs("2017-10-24", 1);
+    assertNumDocs("2017-10-25", 5);
+    assertNumDocs("2017-10-26", 0);
+    assertNumDocs("2017-10-27", 0);
+
+    assertUpdateResponse(add(alias, Collections.singletonList(
+        sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation now
+        params));
+    assertUpdateResponse(solrClient.commit(alias));
+    waitCol("2017-10-27", numShards);
+    waitCol("2017-10-28", numShards);
+
+    cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
+    assertNumDocs("2017-10-23", 1);
+    assertNumDocs("2017-10-24", 1);
+    assertNumDocs("2017-10-25", 6);
+    assertNumDocs("2017-10-26", 0);
+    assertNumDocs("2017-10-27", 0);
+    assertNumDocs("2017-10-28", 0);
+
+    QueryResponse resp;
+    resp = solrClient.query(alias, params(
+        "q", "*:*",
+        "rows", "10"));
+    assertEquals(8, resp.getResults().getNumFound());
+
+    assertUpdateResponse(add(alias, Arrays.asList(
+        sdoc("id", "9", "timestamp_dt", "2017-10-27T23:01:00Z"), // should cause preemptive creation
+
+        // If these are not ignored properly this test will fail during cleanup with a message about router.name being
+        // required. This happens because the test finishes while overseer threads are still trying to invoke maintain
+        // after the @After method has deleted collections and emptied out the aliases.... this leaves the maintain
+        // command cloning alias properties Aliases.EMPTY and thus not getting a value from router.name
+        // (normally router.name == 'time'). The check for non-blank router.name happens to be the first validation.
+        // There is a small chance this could slip through without a fail occasionally, but it was 100% with just one
+        // of these.
+        sdoc("id", "10", "timestamp_dt", "2017-10-28T23:01:00Z"),  // should be ignored due to in progress creation
+        sdoc("id", "11", "timestamp_dt", "2017-10-28T23:02:00Z"),  // should be ignored due to in progress creation
+        sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation
+        params));
+    assertUpdateResponse(solrClient.commit(alias));
+    waitCol("2017-10-29", numShards);
+
+    cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(7,cols.size());
+    assertNumDocs("2017-10-23", 1);
+    assertNumDocs("2017-10-24", 1);
+    assertNumDocs("2017-10-25", 6);
+    assertNumDocs("2017-10-26", 0);
+    assertNumDocs("2017-10-27", 1);
+    assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
+    assertNumDocs("2017-10-29", 0);
+
+    resp = solrClient.query(alias, params(
+        "q", "*:*",
+        "rows", "0"));
+    assertEquals(12, resp.getResults().getNumFound());
+
+    // Synchronous creation with an interval longer than the time slice for the alias.
+    assertUpdateResponse(add(alias, Collections.singletonList(
+        sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky?
+        params));
+    assertUpdateResponse(solrClient.commit(alias));
+    waitCol("2017-10-30", numShards);
+    waitCol("2017-10-31", numShards); // spooky! async case arising in middle of sync creation!!
+
+    cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    assertEquals(9,cols.size());
+    assertNumDocs("2017-10-23", 1);
+    assertNumDocs("2017-10-24", 1);
+    assertNumDocs("2017-10-25", 6);
+    assertNumDocs("2017-10-26", 0);
+    assertNumDocs("2017-10-27", 1);
+    assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
+    assertNumDocs("2017-10-29", 0);
+    assertNumDocs("2017-10-30", 1);
+    assertNumDocs("2017-10-31", 0);
+
+    resp = solrClient.query(alias, params(
+        "q", "*:*",
+        "rows", "0"));
+    assertEquals(13, resp.getResults().getNumFound());
+
+  }
+
+  private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException {
+    QueryResponse resp = solrClient.query(alias + "_" + datePart, params(
+        "q", "*:*",
+        "rows", "10"));
+    assertEquals(expected, resp.getResults().getNumFound());
+  }
+
   private Set<String> getLeaderCoreNames(ClusterState clusterState) {
     Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
     List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
@@ -344,9 +521,28 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
     return leaders;
   }
 
-  private void waitCol(final String datePart, int slices) {
-    waitForState("waiting for collections to be created",alias + "_" + datePart,
-        (liveNodes, collectionState) -> collectionState.getActiveSlices().size() == slices);
+  private void waitCol(final String datePart, int slices) throws InterruptedException {
+    // collection to exist
+    String collection = alias + "_" + datePart;
+    waitForState("waiting for collections to be created", collection,
+        (liveNodes, collectionState) -> {
+          if (collectionState == null) {
+            // per predicate javadoc, this is what we get if the collection doesn't exist at all.
+            return false;
+          }
+          Collection<Slice> activeSlices = collectionState.getActiveSlices();
+          int size = activeSlices.size();
+          return size == slices;
+        });
+    // and alias to be aware of collection
+    long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
+    while (!cluster.getSolrClient().getZkStateReader().getAliases().getCollectionAliasListMap().get(alias).contains(collection)) {
+      if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
+        fail("took over 10 seconds after collection creation to update aliases");
+      } else {
+        Thread.sleep(500);
+      }
+    }
   }
 
   private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
@@ -484,6 +680,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
   }
 
   /** Adds the docs to Solr via {@link #solrClient} with the params */
+  @SuppressWarnings("SameParameterValue")
   private static UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
     UpdateRequest req = new UpdateRequest();
     if (params != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java b/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
index 3e8f53b..06a72ea 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
@@ -16,18 +16,17 @@
  */
 package org.apache.solr.update.processor;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.mina.util.ConcurrentHashSet;
-import org.apache.solr.common.SolrException;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
@@ -42,113 +41,60 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This Factory is similar to {@link RecordingUpdateProcessorFactory}, but with the goal of
- * tracking requests across multiple collections/shards/replicas in a CloudSolrTestCase.
+ * tracking requests across multiple collections/shards/replicas in a {@link SolrCloudTestCase}.
  * It can optionally save references to the commands it receives inm a single global
  * Map&lt;String,BlockingQueue&gt; keys in the map are arbitrary, but the intention is that tests
  * generate a key that is unique to that test, and configure the factory with the key as "group name"
  * to avoid cross talk between tests. Tests can poll for requests from a group to observe that the expected
  * commands are executed.  By default, this factory does nothing except return the "next"
- * processor from the chain unless it's told to {@link #startRecording()} in which case all factories
- * with the same group will begin recording. It is critical that tests utilizing this
- * processor call {@link #close()} on at least one group member after the test finishes. The requests associated with
- * the commands are also provided with a
+ * processor from the chain unless it's told to {@link #startRecording(String)} in which case all factories
+ * with the same group will begin recording.
  *
  * This class is only for unit test purposes and should not be used in any production capacity. It presumes all nodes
- * exist within the same JVM (i. e. MiniSolrCloudCluster).
+ * exist within the same JVM (i.e. {@link MiniSolrCloudCluster}).
  */
 public final class TrackingUpdateProcessorFactory
-  extends UpdateRequestProcessorFactory implements Closeable {
+    extends UpdateRequestProcessorFactory {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String REQUEST_COUNT = "TrackingUpdateProcessorRequestCount";
   public static final String REQUEST_NODE = "TrackingUpdateProcessorRequestNode";
 
-  private final static Map<String,Set<TrackingUpdateProcessorFactory>> groupMembership = new ConcurrentHashMap<>();
-  private final static Map<String,AtomicInteger> groupSerialNums = new ConcurrentHashMap<>();
-
   /**
    * The map of group queues containing commands that were recorded
    * @see #startRecording
    */
-  private final static Map<String, List<UpdateCommand>> commandQueueMap = new ConcurrentHashMap<>();
+  private final static Map<String, List<UpdateCommand>> groupToCommands = new ConcurrentHashMap<>();
 
-  private static final Object memoryConsistency = new Object();
+  private String group = "default";
 
-  private volatile boolean recording = false;
+  public static void startRecording(String group) {
+    final List<UpdateCommand> updateCommands = groupToCommands.get(group);
+    assert updateCommands == null || updateCommands.isEmpty();
 
-  private String group = "default";
+    List<UpdateCommand> existing = groupToCommands.put(group, Collections.synchronizedList(new ArrayList<>()));
+    assert existing == null : "Test cross-talk?";
+  }
 
   /**
-   * Get a copy of the queue for the group.
    *
    * @param group the name of the group to fetch
-   * @return A cloned queue containing the same elements as the queue held in commandQueueMap
+   * @return A cloned queue containing the same elements as the queue held in groupToCommands
    */
-  public static ArrayList<UpdateCommand> commandsForGroup(String group) {
-    synchronized (memoryConsistency) {
-      return new ArrayList<>(commandQueueMap.get(group));
-    }
-  }
-
-  public static void startRecording(String group) {
-    synchronized (memoryConsistency) {
-      Set<TrackingUpdateProcessorFactory> trackingUpdateProcessorFactories = groupMembership.get(group);
-      if (trackingUpdateProcessorFactories == null || trackingUpdateProcessorFactories.isEmpty()) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There are no trackingUpdateProcessors for group " + group);
-      }
-      for (TrackingUpdateProcessorFactory trackingUpdateProcessorFactory : trackingUpdateProcessorFactories) {
-        trackingUpdateProcessorFactory.startRecording();
-      }
-    }
-  }
-  public static void stopRecording(String group) {
-    synchronized (memoryConsistency) {
-      Set<TrackingUpdateProcessorFactory> trackingUpdateProcessorFactories = groupMembership.get(group);
-      if (trackingUpdateProcessorFactories == null || trackingUpdateProcessorFactories.isEmpty()) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There are no trackingUpdateProcessors for group "
-            + group + " available groups are:" + groupMembership.keySet());
-      }
-      for (TrackingUpdateProcessorFactory trackingUpdateProcessorFactory : trackingUpdateProcessorFactories) {
-        trackingUpdateProcessorFactory.stopRecording();
-      }
-    }
+  public static List<UpdateCommand> stopRecording(String group) {
+    List<UpdateCommand> commands = groupToCommands.remove(group);
+    return Arrays.asList(commands.toArray(new UpdateCommand[0])); // safe copy. input list is synchronized
   }
 
   @Override
   public void init(NamedList args) {
     if (args != null && args.indexOf("group",0) >= 0) {
       group = (String) args.get("group");
+      log.debug("Init URP, group '{}'", group);
     } else {
       log.warn("TrackingUpdateProcessorFactory initialized without group configuration, using 'default' but this group is shared" +
           "across the entire VM and guaranteed to have unpredictable behavior if used by more than one test");
     }
-    // compute if absent to avoid replacing in the case of multiple "default"
-    commandQueueMap.computeIfAbsent(group, s -> new ArrayList<>());
-    groupMembership.computeIfAbsent(group,s-> new ConcurrentHashSet<>());
-    groupSerialNums.computeIfAbsent(group,s-> new AtomicInteger(0));
-
-    groupMembership.get(group).add(this);
-  }
-
-  /**
-   * @see #stopRecording 
-   * @see #commandQueueMap
-   */
-  public synchronized void startRecording() {
-    Set<TrackingUpdateProcessorFactory> facts = groupMembership.get(group);
-    // facts being null is a bug, all instances should have a group.
-    for (TrackingUpdateProcessorFactory fact : facts) {
-      fact.recording = true;
-    }
-  }
-
-  /** @see #startRecording */
-  public synchronized void stopRecording() {
-    Set<TrackingUpdateProcessorFactory> factories = groupMembership.get(group);
-    // facts being null is a bug, all instances should have a group.
-    for (TrackingUpdateProcessorFactory fact : factories) {
-      fact.recording = false;
-    }
   }
 
   @Override
@@ -156,35 +102,26 @@ public final class TrackingUpdateProcessorFactory
   public synchronized UpdateRequestProcessor getInstance(SolrQueryRequest req, 
                                                          SolrQueryResponse rsp, 
                                                          UpdateRequestProcessor next ) {
-    return recording ? new RecordingUpdateRequestProcessor(group, next) : next;
-  }
-
-  @Override
-  public void close() {
-    commandQueueMap.remove(group);
-    groupMembership.get(group).clear();
+    final List<UpdateCommand> commands = groupToCommands.get(group);
+    return commands == null ? next : new RecordingUpdateRequestProcessor(commands, next);
   }
 
   private static final class RecordingUpdateRequestProcessor
-    extends UpdateRequestProcessor {
+      extends UpdateRequestProcessor {
 
-    private String group;
+    private final List<UpdateCommand> groupCommands;
 
-    public RecordingUpdateRequestProcessor(String group,
-                                           UpdateRequestProcessor next) {
+    RecordingUpdateRequestProcessor(List<UpdateCommand> groupCommands, UpdateRequestProcessor next) {
       super(next);
-      this.group = group;
+      this.groupCommands = groupCommands;
     }
 
     private void record(UpdateCommand cmd) {
-      synchronized (memoryConsistency) {
-        String coreName = cmd.getReq().getCore().getName();
-        Map<Object, Object> context = cmd.getReq().getContext();
-        context.put(REQUEST_COUNT, groupSerialNums.get(group).incrementAndGet());
-        context.put(REQUEST_NODE, coreName);
-        List<UpdateCommand> commands = commandQueueMap.get(group);
-        commands.add(cmd.clone()); // important because cmd.clear() will be called
-      }
+      groupCommands.add(cmd.clone()); // important because cmd.clear() will be called
+
+      Map<Object, Object> context = cmd.getReq().getContext();
+      context.put(REQUEST_COUNT, groupCommands.size());
+      context.put(REQUEST_NODE, cmd.getReq().getCore().getName());
     }
 
     @Override
@@ -212,13 +149,6 @@ public final class TrackingUpdateProcessorFactory
       record(cmd);
       super.processRollback(cmd);
     }
-
-
-    @Override
-    protected void doClose() {
-      super.doClose();
-      groupMembership.get(group).remove(this);
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/solr-ref-guide/src/collections-api.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index 43825e8..e4230dd 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -623,6 +623,27 @@ without error.  If there was no limit, than an erroneous value could trigger man
 +
 The default is 10 minutes.
 
+`router.preemptiveCreateMath`::
+A date math expression that results in early creation of new collections.
++
+If a document arrives with a timestamp that is after the end time of the most recent collection minus this
+interval, then the next (and only the next) collection will be created asynchronously. Without this setting, collections are created
+synchronously when required by the document timestamp and thus block the flow of documents until the collection
+is created (possibly several seconds). Preemptive creation reduces these hiccups. If set to enough time (perhaps
+an hour or more) then if there are problems creating a collection, this window of time might be enough to take
+corrective action. However, after a successful preemptive creation, the collection is consuming resources without
+being used, and new documents will tend to be routed through it only to be routed elsewhere. Also, note that
+`router.autoDeleteAge` is currently evaluated relative to the date of a newly created collection, and so you may
+want to increase the delete age by the preemptive window amount so that the oldest collection isn't deleted too
+soon. Note that it has to be possible to subtract the interval specified from a date, so if prepending a
+minus sign creates invalid date math, this will cause an error. Also note that a document that is itself
+destined for a collection that does not exist will still trigger synchronous creation up to that destination collection
+but will not trigger additional async preemptive creation. Only one type of collection creation can happen
+per document.
+Example: `90MINUTES`.
++
+This property is blank by default indicating just-in-time, synchronous creation of new collections.
+
 `router.autoDeleteAge`::
 A date math expression that results in the oldest collections getting deleted automatically.
 +

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 9667f37..b400688 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1508,6 +1508,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public static final String ROUTER_START = "router.start";
     public static final String ROUTER_INTERVAL = "router.interval";
     public static final String ROUTER_MAX_FUTURE = "router.maxFutureMs";
+    public static final String ROUTER_PREEMPTIVE_CREATE_WINDOW = "router.preemptiveCreateMath";
 
     private final String aliasName;
     private final String routerField;
@@ -1516,6 +1517,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     //Optional:
     private TimeZone tz;
     private Integer maxFutureMs;
+    private String preemptiveCreateMath;
 
     private final Create createCollTemplate;
 
@@ -1540,6 +1542,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       return this;
     }
 
+    public CreateTimeRoutedAlias setPreemptiveCreateWindow(String preemptiveCreateMath) {
+      this.preemptiveCreateMath = preemptiveCreateMath;
+      return this;
+    }
+
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
@@ -1554,6 +1561,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (maxFutureMs != null) {
         params.add(ROUTER_MAX_FUTURE, ""+maxFutureMs);
       }
+      if (preemptiveCreateMath != null) {
+        params.add(ROUTER_PREEMPTIVE_CREATE_WINDOW, preemptiveCreateMath);
+      }
 
       // merge the above with collectionParams.  Above takes precedence.
       ModifiableSolrParams createCollParams = new ModifiableSolrParams(); // output target

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/21d130c3/solr/solrj/src/resources/apispec/collections.Commands.json
----------------------------------------------------------------------
diff --git a/solr/solrj/src/resources/apispec/collections.Commands.json b/solr/solrj/src/resources/apispec/collections.Commands.json
index ed55a1e..dc8e251 100644
--- a/solr/solrj/src/resources/apispec/collections.Commands.json
+++ b/solr/solrj/src/resources/apispec/collections.Commands.json
@@ -168,6 +168,10 @@
               "type": "integer",
               "description":"How many milliseconds into the future to accept document. Documents with a value in router.field that is greater than now() + maxFutureMs will be rejected to avoid provisioning too much resources."
             },
+            "preemptiveCreateMath":{
+              "type": "string",
+              "description": "If a document arrives with a timestamp that is after the end time of the most recent collection minus this interval, then the next collection will be created asynchronously. Without this setting, collections are created synchronously when required by the document time stamp and thus block the flow of documents until the collection is created (possibly several seconds). Preemptive creation reduces these hiccups. If set to enough time (perhaps an hour or more) then if there are problems creating a collection, this window of time might be enough to take corrective action. However after a successful preemptive creation,  the collection is consuming resources without being used, and new documents will tend to be routed through it only to be routed elsewhere. Also, note that router.autoDeleteAge is currently evaluated relative to the date of a newly created collection, and so you may want to increase the delete age by the preemptive window amount so that the oldest collection isn't deleted too soon."
+            },
             "autoDeleteAge": {
               "type": "string",
               "description": "A date math expressions yielding a time in the past. Collections covering a period of time entirely before this age will be automatically deleted."