You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2018/01/05 18:53:36 UTC
lucene-solr:master: SOLR-11653: TimeRoutedAlias URP now auto-creates
collections using new RoutedAliasCreateCollectionCmd
Repository: lucene-solr
Updated Branches:
refs/heads/master 3980aea18 -> 925733d1e
SOLR-11653: TimeRoutedAlias URP now auto-creates collections using new RoutedAliasCreateCollectionCmd
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/925733d1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/925733d1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/925733d1
Branch: refs/heads/master
Commit: 925733d1ef3ac6fbabc450804511c65a4c6424ac
Parents: 3980aea
Author: David Smiley <ds...@apache.org>
Authored: Fri Jan 5 13:53:26 2018 -0500
Committer: David Smiley <ds...@apache.org>
Committed: Fri Jan 5 13:53:26 2018 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +
.../java/org/apache/solr/cloud/Overseer.java | 4 +
.../cloud/OverseerCollectionMessageHandler.java | 3 +-
.../solr/cloud/OverseerTaskProcessor.java | 7 +-
.../cloud/RoutedAliasCreateCollectionCmd.java | 182 +++++++++++++++
.../solr/handler/admin/CollectionsHandler.java | 16 +-
.../apache/solr/request/SolrRequestInfo.java | 12 +-
.../TimeRoutedAliasUpdateProcessor.java | 228 +++++++++++++++++--
.../org/apache/solr/util/TimeZoneUtils.java | 18 ++
.../TimeRoutedAliasUpdateProcessorTest.java | 140 +++++++++---
.../solr/common/params/CollectionParams.java | 1 +
11 files changed, 539 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 61551e0..221d6ad 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -74,6 +74,10 @@ New Features
* SOLR-11201: Implement autoscaling trigger for arbitrary metrics that creates events when
a given metric breaches a threshold (shalin)
+* SOLR-11653: TimeRoutedAlias URP now auto-creates new collections on the fly according to alias metadata
+ rules that set the time interval for each collection. An internal Overseer command "ROUTEDALIAS_CREATECOLL"
+ was created to facilitate this. (David Smiley)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index d1bb13a..3b9dd28 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -39,6 +39,7 @@ import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -267,6 +268,9 @@ public class Overseer implements SolrCloseable {
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
final String operation = message.getStr(QUEUE_OPERATION);
+ if (operation == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message);
+ }
List<ZkWriteCommand> zkWriteCommands = null;
final Timer.Context timerContext = stats.time(operation);
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index abfecab..426c879 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -219,6 +219,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETE, new DeleteCollectionCmd(this))
.put(CREATEALIAS, new CreateAliasCmd(this))
.put(DELETEALIAS, new DeleteAliasCmd(this))
+ .put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this))
.put(OVERSEERSTATUS, new OverseerStatusCmd(this))
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
@@ -232,7 +233,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
@Override
@SuppressWarnings("unchecked")
public SolrResponse processMessage(ZkNodeProps message, String operation) {
- log.debug("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
+ log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
NamedList results = new NamedList();
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index d014fc4..86e3564 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -257,7 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
- OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue))
@@ -269,6 +268,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ if (operation == null) {
+ log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+ workQueue.remove(head);
+ continue;
+ }
+ OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
if (lock == null) {
log.debug("Exclusivity check failed for [{}]", message.toString());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
new file mode 100644
index 0000000..607588c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
+import org.apache.solr.util.TimeZoneUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA;
+import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_INTERVAL_METADATA;
+
+/**
+ * For "routed aliases", creates the next collection in the series and prepends it to the alias. Nothing is created
+ * (a "message" is added to the results instead) when {@link #IF_MOST_RECENT_COLL_NAME} is given but differs from the
+ * most recent collection, or when it is absent and the most recent collection's timestamp is in the future.
+ * If a collection is created, then collection creation info is returned.
+ *
+ * Note: this logic is within an Overseer because we want to leverage the mutual exclusion
+ * property afforded by the lock it obtains on the alias name.
+ * @since 7.3
+ */
+public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ /** Optional message param: the collection name the caller believes is the most recent one in the alias. */
+ public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName";
+ /** Alias metadata keys starting with this prefix are copied, prefix removed, into the create-collection params. */
+ public static final String COLL_METAPREFIX = "collection-create.";
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public RoutedAliasCreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ /* TODO:
+ There are a few classes related to time routed alias processing. We need to share some logic better.
+ */
+
+ /** Creates the next collection for the alias named by NAME and prepends it to that alias; or adds a "message" to results and returns early. */
+ @Override
+ public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+ //---- PARSE PRIMARY MESSAGE PARAMS
+ // important that we use NAME for the alias as that is what the Overseer will get a lock on before calling us
+ final String aliasName = message.getStr(NAME);
+ // the client believes this is the mostRecent collection name. We assert this if provided.
+ final String ifMostRecentCollName = message.getStr(IF_MOST_RECENT_COLL_NAME); // optional
+
+ // TODO collection param (or intervalDateMath override?), useful for data capped collections
+
+ //---- PARSE ALIAS INFO FROM ZK
+ final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder;
+ final Aliases aliases = aliasesHolder.getAliases();
+ final Map<String, String> aliasMetadata = aliases.getCollectionAliasMetadata(aliasName);
+ if (aliasMetadata == null) {
+ throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
+ }
+
+ String routeField = aliasMetadata.get(ROUTER_FIELD_METADATA); // only checked for presence here; the value itself is not used below
+ if (routeField == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "This command only works on time routed aliases. Expected alias metadata not found.");
+ }
+ String intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY"); // interval falls back to "+1DAY" when the metadata is absent
+ TimeZone intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
+
+ //TODO this is ugly; how can we organize the code related to this feature better?
+ final List<Map.Entry<Instant, String>> parsedCollections =
+ TimeRoutedAliasUpdateProcessor.parseCollections(aliasName, aliases, () -> newAliasMustExistException(aliasName));
+
+ //---- GET MOST RECENT COLL
+ final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0); // index 0 is treated as the most recent; NOTE(review): relies on parseCollections returning a non-empty, newest-first list — confirm
+ final Instant mostRecentCollTimestamp = mostRecentEntry.getKey();
+ final String mostRecentCollName = mostRecentEntry.getValue();
+ if (ifMostRecentCollName != null) {
+ if (!mostRecentCollName.equals(ifMostRecentCollName)) {
+ // Possibly due to race conditions in URPs on multiple leaders calling us at the same time
+ String msg = IF_MOST_RECENT_COLL_NAME + " expected " + ifMostRecentCollName + " but it's " + mostRecentCollName;
+ if (parsedCollections.stream().map(Map.Entry::getValue).noneMatch(ifMostRecentCollName::equals)) {
+ msg += ". Furthermore this collection isn't in the list of collections referenced by the alias.";
+ }
+ log.info(msg);
+ results.add("message", msg);
+ return; // not an error: report via "message" and create nothing
+ }
+ } else if (mostRecentCollTimestamp.isAfter(Instant.now())) { // this guard applies only when the caller gave no expected name
+ final String msg = "Most recent collection is in the future, so we won't create another.";
+ log.info(msg);
+ results.add("message", msg);
+ return;
+ }
+
+ //---- COMPUTE NEXT COLLECTION NAME
+ final Instant nextCollTimestamp = TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
+ assert nextCollTimestamp.isAfter(mostRecentCollTimestamp); // sanity check; only evaluated when assertions are enabled
+ final String createCollName = TimeRoutedAliasUpdateProcessor.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);
+
+ //---- CREATE THE COLLECTION
+ // Map alias metadata starting with a prefix to a create-collection API request
+ final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
+ for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
+ if (e.getKey().startsWith(COLL_METAPREFIX)) {
+ createReqParams.set(e.getKey().substring(COLL_METAPREFIX.length()), e.getValue());
+ }
+ }
+ if (createReqParams.get(COLL_CONF) == null) { // i.e. the alias metadata had no COLL_METAPREFIX + COLL_CONF entry
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "We require an explicit " + COLL_CONF );
+ }
+ createReqParams.set(NAME, createCollName);
+ createReqParams.set("property." + TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, aliasName);
+ // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
+ // Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
+ final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
+ new LocalSolrQueryRequest(null, createReqParams),
+ null,
+ ocmh.overseer.getCoreContainer().getCollectionsHandler());
+ createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
+ // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd
+ ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
+
+ CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results)); // presumably blocks until the new collection is active (per its name) — runs before the alias is updated
+
+ //TODO delete some of the oldest collection(s) ?
+
+ //---- UPDATE THE ALIAS
+ aliasesHolder.applyModificationAndExportToZk(curAliases -> { // the lambda maps the current Aliases to the Aliases that should replace them
+ final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
+ if (curTargetCollections.contains(createCollName)) {
+ return curAliases; // already listed; return the aliases unchanged
+ } else {
+ List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
+ // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
+ newTargetCollections.add(createCollName);
+ newTargetCollections.addAll(curTargetCollections);
+ return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
+ }
+ });
+
+ }
+
+ private SolrException newAliasMustExistException(String aliasName) { // builds (does not throw) the BAD_REQUEST exception; callers throw it
+ return new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Alias " + aliasName + " does not exist.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d339f27..74d4764 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -260,16 +260,19 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
- void handleResponse(String operation, ZkNodeProps m,
+ //TODO rename to submitToOverseerRPC
+ public void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_COLLECTION_OP_TIMEOUT);
}
- private SolrResponse handleResponse(String operation, ZkNodeProps m,
+ //TODO rename to submitToOverseerRPC
+ public SolrResponse handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
- long time = System.nanoTime();
-
- if (m.containsKey(ASYNC) && m.get(ASYNC) != null) {
+ if (!m.containsKey(QUEUE_OPERATION)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "missing key " + QUEUE_OPERATION);
+ }
+ if (m.get(ASYNC) != null) {
String asyncId = m.getStr(ASYNC);
@@ -297,6 +300,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
return response;
}
+ long time = System.nanoTime();
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(Utils.toJSON(m), timeout);
@@ -1031,7 +1035,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
}
- private static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response)
+ public static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response)
throws KeeperException, InterruptedException {
if (response.getResponse().get("exception") != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index f759c91..f1a718d 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -101,17 +101,9 @@ public class SolrRequestInfo {
}
/** The TimeZone specified by the request, or null if none was specified */
- public TimeZone getClientTimeZone() {
-
+ public TimeZone getClientTimeZone() {
if (tz == null) {
- String tzStr = req.getParams().get(CommonParams.TZ);
- if (tzStr != null) {
- tz = TimeZoneUtils.getTimeZone(tzStr);
- if (null == tz) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Solr JVM does not support TZ: " + tzStr);
- }
- }
+ tz = TimeZoneUtils.parseTimezone(req.getParams().get(CommonParams.TZ));
}
return tz;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index 9148912..bc242ba 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -19,6 +19,7 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -29,22 +30,34 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.RoutedAliasCreateCollectionCmd;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
@@ -52,14 +65,18 @@ import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
- * Distributes update requests to rolling series of collections partitioned by a timestamp field.
+ * Distributes update requests to a rolling series of collections partitioned by a timestamp field. Issues
+ * requests to create new collections on-demand.
*
* Depends on this core having a special core property that points to the alias name that this collection is a part of.
* And further requires certain metadata on the Alias.
@@ -69,16 +86,15 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
//TODO do we make this more generic to others who want to partition collections using something else?
- // TODO auto add new collection partitions when cross a timestamp boundary. That needs to be coordinated to avoid
- // race conditions, remembering that even the lead collection might have multiple instances of this URP
- // (multiple shards or perhaps just multiple streams thus instances of this URP)
-
public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
- public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
+ // alias metadata:
+ public static final String ROUTER_FIELD_METADATA = "router.field";
+ public static final String ROUTER_MAX_FUTURE_TIME_METADATA = "router.maxFutureMs";
+ public static final String ROUTER_INTERVAL_METADATA = "router.interval";
// This format must be compatible with collection name limitations
- private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+ public static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
@@ -87,18 +103,26 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ // used to limit unnecessary concurrent collection creation requests
+ private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
+
private final String thisCollection;
private final String aliasName;
private final String routeField;
+ private final long maxFutureMs;
+ private final String intervalDateMath;
+ private final TimeZone intervalTimeZone;
- private final SolrCmdDistributor cmdDistrib;
private final ZkController zkController;
+ private final SolrCmdDistributor cmdDistrib;
+ private final CollectionsHandler collHandler;
private final SolrParams outParamsToLeader;
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ //TODO get from "Collection property"
final String timePartitionAliasName = req.getCore().getCoreDescriptor()
.getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
final DistribPhase shardDistribPhase =
@@ -126,12 +150,21 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
CoreContainer cc = core.getCoreContainer();
zkController = cc.getZkController();
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+ collHandler = cc.getCollectionsHandler();
final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
if (aliasMetadata == null) {
throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
}
routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
+ intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
+ String futureTimeStr = aliasMetadata.get(ROUTER_MAX_FUTURE_TIME_METADATA);
+ if (futureTimeStr != null) {
+ maxFutureMs = Long.parseLong(futureTimeStr);
+ } else {
+ maxFutureMs = TimeUnit.MINUTES.toMillis(10);
+ }
+ intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
// Don't distribute these params; they will be distributed from the local processCommit separately.
@@ -153,11 +186,59 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
- final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
- if (targetCollection == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
- }
+ final Instant routeTimestamp = parseRouteKey(routeValue);
+
+ updateParsedCollectionAliases();
+ String targetCollection;
+ do {
+ targetCollection = findTargetCollectionGivenTimestamp(routeTimestamp);
+
+ if (targetCollection == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeTimestamp);
+ }
+
+ // Note: the following rule is tempting but not necessary and is not compatible with
+ // only using this URP when the alias distrib phase is NONE; otherwise a doc may be routed from a non-recent
+ // collection to the most recent, only to then go there directly instead of realizing a new collection is needed.
+ // // If it's going to some other collection (not "this") then break to just send it there
+ // if (!thisCollection.equals(targetCollection)) {
+ // break;
+ // }
+ // Also tempting but not compatible: check that we're the leader, if not then break
+
+ // If the doc goes to the most recent collection then do some checks below, otherwise break the loop.
+ final Instant mostRecentCollTimestamp = parsedCollectionsDesc.get(0).getKey();
+ final String mostRecentCollName = parsedCollectionsDesc.get(0).getValue();
+ if (!mostRecentCollName.equals(targetCollection)) {
+ break;
+ }
+
+ // Check the doc isn't too far in the future
+ final Instant maxFutureTime = Instant.now().plusMillis(maxFutureMs);
+ if (routeTimestamp.isAfter(maxFutureTime)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "The document's time routed key of " + routeValue + " is too far in the future given " +
+ ROUTER_MAX_FUTURE_TIME_METADATA + "=" + maxFutureMs);
+ }
+
+ // Create a new collection?
+ final Instant nextCollTimestamp = computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
+ if (routeTimestamp.isBefore(nextCollTimestamp)) {
+ break; // thus we don't need another collection
+ }
+
+ createCollectionAfter(mostRecentCollName); // *should* throw if fails for some reason but...
+ final boolean updated = updateParsedCollectionAliases();
+ if (!updated) { // thus we didn't make progress...
+ // this is not expected, even in known failure cases, but we check just in case
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "We need to create a new time routed collection but for unknown reasons were unable to do so.");
+ }
+ // then retry the loop ...
+ } while(true);
+ assert targetCollection != null;
+
if (thisCollection.equals(targetCollection)) {
// pass on through; we've reached the right collection
super.processAdd(cmd);
@@ -168,7 +249,23 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
}
}
- protected String findTargetCollectionGivenRouteKey(Object routeKey) {
+ /** Computes the timestamp of the next collection given the timestamp of the one before, by evaluating {@code intervalDateMath} in {@code intervalTimeZone} with "now" set to {@code fromTimestamp}. Throws a BAD_REQUEST SolrException if the date math cannot be parsed. */
+ public static Instant computeNextCollTimestamp(Instant fromTimestamp, String intervalDateMath, TimeZone intervalTimeZone) {
+ //TODO overload DateMathParser.parseMath to take tz and "now"
+ final DateMathParser dateMathParser = new DateMathParser(intervalTimeZone);
+ dateMathParser.setNow(Date.from(fromTimestamp)); // the date math is evaluated relative to the previous collection's timestamp, not the wall clock
+ final Instant nextCollTimestamp;
+ try {
+ nextCollTimestamp = dateMathParser.parseMath(intervalDateMath).toInstant();
+ } catch (ParseException e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Invalid Date Math String:'" + intervalDateMath +'\'', e); // original ParseException is kept as the cause
+ }
+ assert nextCollTimestamp.isAfter(fromTimestamp); // sanity check, only evaluated when assertions are enabled: the interval must move forward in time
+ return nextCollTimestamp;
+ }
+
+ private Instant parseRouteKey(Object routeKey) {
final Instant docTimestamp;
if (routeKey instanceof Instant) {
docTimestamp = (Instant) routeKey;
@@ -179,15 +276,30 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
}
+ return docTimestamp;
+ }
+
+ /**
+ * Ensure {@link #parsedCollectionsAliases} is up to date. If it was modified, return true.
+ * Note that this will return true if some other alias was modified or if metadata was modified. These
+ * are spurious and the caller should be written to be tolerant of no material changes.
+ */
+ private boolean updateParsedCollectionAliases() {
final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
if (this.parsedCollectionsAliases != aliases) {
if (this.parsedCollectionsAliases != null) {
- log.info("Observing possibly updated alias {}", aliasName);
+ log.debug("Observing possibly updated alias: {}", aliasName);
}
- this.parsedCollectionsDesc = doParseCollections(aliases);
+ this.parsedCollectionsDesc = parseCollections(aliasName, aliases, this::newAliasMustExistException);
this.parsedCollectionsAliases = aliases;
+ return true;
}
- // iterates in reverse chronological order
+ return false;
+ }
+
+ /** Given the document timestamp, finds the target collection. Returns null if it is too old to go in the oldest one. */
+ private String findTargetCollectionGivenTimestamp(Instant docTimestamp) {
+ // Lookup targetCollection given route key. Iterates in reverse chronological order.
// We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
Instant colStartTime = entry.getKey();
@@ -195,16 +307,77 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
return entry.getValue(); //found it
}
}
- return null;
+ return null; //not found
+ }
+
+ private void createCollectionAfter(String mostRecentCollName) {
+ // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection
+ // and update the alias contingent on the most recent collection name being the same as
+ // what we think it is here; otherwise it will return (without error).
+ // To avoid needless concurrent communication with the Overseer from this JVM, we
+ // maintain a Semaphore from an alias name keyed ConcurrentHashMap.
+ // Alternatively a Lock or CountDownLatch could have been used but they didn't seem
+ // to make it any easier.
+
+ final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(aliasName, n -> new Semaphore(1));
+ if (semaphore.tryAcquire()) {
+ try {
+ final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower();
+ Map<String, Object> msg = new HashMap<>();
+ msg.put(Overseer.QUEUE_OPERATION, operation);
+ msg.put(CollectionParams.NAME, aliasName);
+ msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
+ SolrQueryResponse rsp = new SolrQueryResponse();
+ try {
+ this.collHandler.handleResponse(
+ operation,
+ new ZkNodeProps(msg),
+ rsp);
+ if (rsp.getException() != null) {
+ throw rsp.getException();
+ } // otherwise don't care about the response. It's possible no collection was created because
+ // of a race and that's okay... we'll ultimately retry anyway.
+
+ // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
+ // not yet know about the new alias (thus won't see the newly added collection to it), and we might think
+ // we failed.
+ zkController.getZkStateReader().aliasesHolder.update();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ } finally {
+ semaphore.release(); // to signal we're done to anyone waiting on it
+ }
+
+ } else {
+ // Failed to acquire permit because another URP instance on this JVM is creating a collection.
+ // So wait till it's available
+ log.debug("Collection creation is already in progress so we'll wait then try again.");
+ try {
+ if (semaphore.tryAcquire(DEFAULT_COLLECTION_OP_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ semaphore.release(); // we don't actually want a permit so give it back
+ // return to continue...
+ } else {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Waited too long for another update thread to be done with collection creation.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Interrupted waiting on collection creation.", e); // if we were interrupted, give up.
+ }
+ }
}
- /** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */
- private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
+ /** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
+ public static List<Map.Entry<Instant,String>> parseCollections(String aliasName, Aliases aliases, Supplier<SolrException> aliasNotExist) {
final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
if (collections == null) {
- throw newAliasMustExistException();
+ throw aliasNotExist.get();
}
- // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
+ // note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later
List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
for (String collection : collections) {
Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
@@ -225,6 +398,17 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
}
+ public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) {
+ String nextCollName = TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.format(timestamp);
+ for (int i = 0; i < 3; i++) { // chop off seconds, minutes, hours
+ if (nextCollName.endsWith("_00")) {
+ nextCollName = nextCollName.substring(0, nextCollName.length()-3);
+ }
+ }
+ assert TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.parse(nextCollName, Instant::from).equals(timestamp);
+ return aliasName + "_" + nextCollName;
+ }
+
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java b/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java
index 9d11f81..0600a83 100644
--- a/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java
+++ b/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
+import org.apache.solr.common.SolrException;
+
/**
* Simple utilities for working with TimeZones
* @see java.util.TimeZone
@@ -82,4 +84,20 @@ public final class TimeZoneUtils {
private static Pattern CUSTOM_ID_REGEX = Pattern.compile("GMT(?:\\+|\\-)(\\d{1,2})(?::?(\\d{2}))?");
+ /**
+ * Parse the specified timezone ID. If null input then return UTC. If we can't resolve it then
+ * throw an exception.
+ */
+ public static TimeZone parseTimezone(String tzStr) {
+ if (tzStr != null) {
+ TimeZone tz = getTimeZone(tzStr);
+ if (null == tz) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Solr JVM does not support TZ: " + tzStr);
+ }
+ return tz;
+ } else {
+ return DateMathParser.UTC; //TODO move to TimeZoneUtils
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index f7f200f..db4b877 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -19,16 +19,20 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
-import java.util.function.UnaryOperator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
@@ -39,12 +43,14 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -52,7 +58,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
static final String configName = "timeConfig";
static final String alias = "myalias";
- static final String timeField = "timestamp";
+ static final String timeField = "timestamp_dt";
static final String intField = "integer_i";
static SolrClient solrClient;
@@ -71,6 +77,14 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
IOUtils.close(solrClient);
}
+ //TODO this is necessary when -Dtests.iters but why? Some other tests aren't affected
+ @Before
+ public void doBefore() throws Exception {
+ for (String col : CollectionAdminRequest.listCollections(solrClient)) {
+ CollectionAdminRequest.deleteCollection(col).process(solrClient);
+ }
+ }
+
@Test
public void test() throws Exception {
// First create a config using REST API. To do this, we create a collection with the name of the eventual config.
@@ -91,18 +105,21 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
" 'fieldName':'" + intField + "'" +
" }," +
"}").build()));
+ // only sometimes test with "tolerant" URP
+ final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
.withMethod(SolrRequest.METHOD.POST)
.withPayload("{" +
" 'set' : {" +
- " '_UPDATE' : {'processor':'inc,tolerant'}" +
+ " '_UPDATE' : {'processor':'" + urpNames + "'}" +
" }" +
"}").build()));
CollectionAdminRequest.deleteCollection(configName).process(solrClient);
// start with one collection and an alias for it
final String col23rd = alias + "_2017-10-23";
- CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
+ CollectionAdminRequest.createCollection(col23rd, configName, 2, 2)
+ .setMaxShardsPerNode(2)
.withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
.process(solrClient);
@@ -112,30 +129,29 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
//TODO use SOLR-11617 client API to set alias metadata
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
- UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
- zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
+ zkStateReader.aliasesHolder.applyModificationAndExportToZk(a ->
+ a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField)
+ .cloneWithCollectionAliasMetadata(alias, "collection-create.collection.configName", configName)
+ .cloneWithCollectionAliasMetadata(alias, "collection-create.numShards", "1")
+ .cloneWithCollectionAliasMetadata(alias, "collection-create.replicationFactor", "1")
+ .cloneWithCollectionAliasMetadata(alias, "router.interval", "+1DAY"));
// now we index a document
- solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
+ assertUpdateResponse(solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z"))));
solrClient.commit(alias);
//assertDocRoutedToCol(lastDocId, col23rd);
- assertInvariants();
+ assertInvariants(col23rd);
- // a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there)
- try {
- final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
- final Object errors = resp.getResponseHeader().get("errors");
- assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
- } catch (SolrException e) {
- assertTrue(e.getMessage().contains("couldn't be routed"));
- }
- numDocsDeletedOrFailed++;
+ // a document that is too old
+ testFailedDocument(Instant.parse("2017-10-01T00:00:00Z"), "couldn't be routed");
+
+ // a document which is too far into the future
+ testFailedDocument(Instant.now().plus(30, ChronoUnit.MINUTES), "too far in the future");
// add another collection, add to alias (soonest comes first)
final String col24th = alias + "_2017-10-24";
- CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now
- .setMaxShardsPerNode(2)
+ CollectionAdminRequest.createCollection(col24th, configName, 1, 1) // fewer shards and replicas now
.withProperty("timePartitionAliasName", alias)
.process(solrClient);
CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
@@ -146,7 +162,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
newDoc(Instant.parse("2017-10-24T01:00:00Z")),
newDoc(Instant.parse("2017-10-24T02:00:00Z"))
);
- assertInvariants();
+ assertInvariants(col24th, col23rd);
// assert that the IncrementURP has updated all '0' to '1'
final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
@@ -154,16 +170,45 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
//delete a random document id; ensure we don't find it
int idToDelete = 1 + random().nextInt(lastDocId);
- if (idToDelete == 2) { // #2 didn't make it
- idToDelete++;
+ if (idToDelete == 2 || idToDelete == 3) { // these didn't make it
+ idToDelete = 4;
}
- solrClient.deleteById(alias, Integer.toString(idToDelete));
- solrClient.commit(alias);
+ assertUpdateResponse(solrClient.deleteById(alias, Integer.toString(idToDelete)));
+ assertUpdateResponse(solrClient.commit(alias));
numDocsDeletedOrFailed++;
- assertInvariants();
+ assertInvariants(col24th, col23rd);
+
+ // delete the Oct23rd (save memory)...
+ // make sure we track that we are effectively deleting docs there
+ numDocsDeletedOrFailed += solrClient.query(col23rd, params("q", "*:*", "rows", "0")).getResults().getNumFound();
+ // remove from alias
+ CollectionAdminRequest.createAlias(alias, col24th).process(solrClient);
+ // delete the collection
+ CollectionAdminRequest.deleteCollection(col23rd).process(solrClient);
+
+ // now we're going to add documents that will trigger more collections to be created
+ // for 25th & 26th
+ addDocsAndCommit(
+ newDoc(Instant.parse("2017-10-24T03:00:00Z")),
+ newDoc(Instant.parse("2017-10-25T04:00:00Z")),
+ newDoc(Instant.parse("2017-10-26T05:00:00Z"))
+ );
+ assertInvariants(alias + "_2017-10-26", alias + "_2017-10-25", col24th);
}
- private void checkNoError(NamedList<Object> response) {
+ private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
+ try {
+ final UpdateResponse resp = solrClient.add(alias, newDoc(timestamp));
+ // (if we have a TolerantUpdateProcessor then the error is reported in the response header rather than thrown)
+ final Object errors = resp.getResponseHeader().get("errors"); // Tolerant URP
+ assertTrue(errors != null && errors.toString().contains(errorMsg));
+ } catch (SolrException e) {
+ assertTrue(e.getMessage().contains(errorMsg));
+ }
+ numDocsDeletedOrFailed++;
+ }
+
+ private void checkNoError(NamedList<Object> response) { //TODO rename
Object errors = response.get("errorMessages");
assertNull("" + errors, errors);
}
@@ -171,7 +216,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
/** Adds these documents and commits, returning when they are committed.
* We randomly go about this in different ways. */
private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
- // we assume these are not old docs!
+ // we assume all docs will be added (none too old/new to cause exception)
+ Collections.shuffle(Arrays.asList(solrInputDocuments), random());
// this is a list of the collections & the alias name. Use to pick randomly where to send.
// (it doesn't matter where we send docs since the alias is honored at the URP level)
@@ -182,15 +228,27 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
int numDocsBefore = queryNumDocs();
if (random().nextBoolean()) {
- // send in separate requests
- for (SolrInputDocument solrInputDocument : solrInputDocuments) {
- String col = collections.get(random().nextInt(collections.size()));
- solrClient.add(col, solrInputDocument, commitWithin);
+ // Send in separate threads. Choose random collection & solrClient
+ try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
+ ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
+ new DefaultSolrThreadFactory(getTestName()));
+ List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
+ for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+ String col = collections.get(random().nextInt(collections.size()));
+ futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
+ }
+ for (Future<UpdateResponse> future : futures) {
+ assertUpdateResponse(future.get());
+ }
+ // at this point there shouldn't be any tasks running
+ assertEquals(0, exec.shutdownNow().size());
}
} else {
// send in a batch.
String col = collections.get(random().nextInt(collections.size()));
- solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
+ try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
+ assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
+ }
}
String col = collections.get(random().nextInt(collections.size()));
if (commitWithin == -1) {
@@ -210,21 +268,30 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
}
}
+ private void assertUpdateResponse(UpdateResponse rsp) {
+ // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
+ List errors = (List) rsp.getResponseHeader().get("errors");
+ assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
+ }
+
private int queryNumDocs() throws SolrServerException, IOException {
return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
}
- private void assertInvariants() throws IOException, SolrServerException {
+ private void assertInvariants(String... expectedColls) throws IOException, SolrServerException {
final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assert !cols.isEmpty();
+ assertArrayEquals("expected reverse sorted",
+ cols.stream().sorted(Collections.reverseOrder()).toArray(),
+ cols.toArray());
+
int totalNumFound = 0;
Instant colEndInstant = null; // exclusive end
- for (String col : cols) {
+ for (String col : cols) { // ASSUMPTION: reverse sorted order
final Instant colStartInstant = TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, col);
- //TODO do this in parallel threads
final QueryResponse colStatsResp = solrClient.query(col, params(
"q", "*:*",
"rows", "0",
@@ -243,6 +310,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
colEndInstant = colStartInstant; // next older segment will max out at our current start time
}
assertEquals(expectNumFound, totalNumFound);
+ assertArrayEquals(expectedColls, cols.toArray());
}
private SolrInputDocument newDoc(Instant timestamp) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/925733d1/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index 77dd454..9d5fc36 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -78,6 +78,7 @@ public interface CollectionParams {
CREATEALIAS(true, LockLevel.COLLECTION),
DELETEALIAS(true, LockLevel.COLLECTION),
LISTALIASES(false, LockLevel.NONE),
+ ROUTEDALIAS_CREATECOLL(true, LockLevel.COLLECTION),
SPLITSHARD(true, LockLevel.SHARD),
DELETESHARD(true, LockLevel.SHARD),
CREATESHARD(true, LockLevel.COLLECTION),