You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2017/12/01 04:25:23 UTC
lucene-solr:master: SOLR-11542: Rename TimePartitionedUpdateProcessor
to TimeRoutedAliasUpdateProcessor
Repository: lucene-solr
Updated Branches:
refs/heads/master 9c0ca9b46 -> 7877f5a51
SOLR-11542: Rename TimePartitionedUpdateProcessor to TimeRoutedAliasUpdateProcessor
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7877f5a5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7877f5a5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7877f5a5
Branch: refs/heads/master
Commit: 7877f5a511a60e44f2dabd45ac1d6f84626b1161
Parents: 9c0ca9b
Author: David Smiley <ds...@apache.org>
Authored: Thu Nov 30 23:25:14 2017 -0500
Committer: David Smiley <ds...@apache.org>
Committed: Thu Nov 30 23:25:14 2017 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +-
.../DistributedUpdateProcessorFactory.java | 2 +-
.../TimePartitionedUpdateProcessor.java | 294 -------------------
.../TimeRoutedAliasUpdateProcessor.java | 294 +++++++++++++++++++
.../TimePartitionedUpdateProcessorTest.java | 275 -----------------
.../TimeRoutedAliasUpdateProcessorTest.java | 275 +++++++++++++++++
6 files changed, 572 insertions(+), 572 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 737ef60..849a49b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -91,11 +91,11 @@ New Features
* SOLR-11487: Collection Aliases may now have metadata (currently an internal feature).
(Gus Heck, David Smiley)
-* SOLR-11542: New TimePartitionedUpdateProcessor URP that routes documents to another collection
+* SOLR-11542: New TimeRoutedAliasUpdateProcessor URP that routes documents to another collection
in the same Alias defined set based on a time field (currently an internal feature).
(David Smiley)
-* SOLR-9743: A new UTILIZENODE command (noble)
+* SOLR-9743: A new UTILIZENODE command (noble)
* SOLR-11202: Implement a set-property command for AutoScaling API. (ab, shalin)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index c706e0c..1930c08 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -50,7 +50,7 @@ public class DistributedUpdateProcessorFactory
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
// note: wrap(...) returns the DistributedUpdateProcessor (DURP) itself, with no wrapping overhead, when the
// core has no time-routed alias core property or the request has already been distributed
- return TimePartitionedUpdateProcessor.wrap(req, rsp,
+ return TimeRoutedAliasUpdateProcessor.wrap(req, rsp,
new DistributedUpdateProcessor(req, rsp, next));
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
deleted file mode 100644
index e485a3d..0000000
--- a/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update.processor;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.SolrCmdDistributor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
-/**
- * Distributes update requests to rolling series of collections partitioned by a timestamp field.
- *
- * Depends on this core having a special core property that points to the alias name that this collection is a part of.
- * And further requires certain metadata on the Alias.
- * The alias metadata key ROUTER_FIELD_METADATA ("router.field") names the document field used for routing.
- *
- * @since 7.2.0
- */
-public class TimePartitionedUpdateProcessor extends UpdateRequestProcessor {
- //TODO do we make this more generic to others who want to partition collections using something else?
-
- // TODO auto add new collection partitions when cross a timestamp boundary. That needs to be coordinated to avoid
- // race conditions, remembering that even the lead collection might have multiple instances of this URP
- // (multiple shards or perhaps just multiple streams thus instances of this URP)
-
- public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
- public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
- public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
-
- // This format must be compatible with collection name limitations
- // Parses e.g. "2017-10-23", "2017-10-23_05", "2017-10-23_05_30", "2017-10-23_05_30_15" as UTC instants.
- private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
- .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
- .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
- .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
- .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
- .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final String thisCollection;
- private final String aliasName;
- // NOTE(review): null if the alias metadata has no ROUTER_FIELD_METADATA entry; not checked here.
- private final String routeField;
-
- private final SolrCmdDistributor cmdDistrib;
- private final ZkController zkController;
- private final SolrParams outParamsToLeader;
-
- private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
- private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
-
- // Returns a new instance of this URP wrapping 'next' only when the core has the alias core property and neither
- // the alias nor the shard distrib phase param is set; otherwise returns 'next' unchanged.
- public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
- final String timePartitionAliasName = req.getCore().getCoreDescriptor()
- .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
- final DistribPhase shardDistribPhase =
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
- final DistribPhase aliasDistribPhase =
- DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
- if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
- // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
- // TODO this may eventually not be true but at the moment it is
- // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
- return next;
- } else {
- return new TimePartitionedUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
- }
- }
-
- // Fails with SERVICE_UNAVAILABLE (thrown from newAliasMustExistException) if the alias has no metadata.
- protected TimePartitionedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
- String aliasName,
- DistribPhase aliasDistribPhase) {
- super(next);
- assert aliasDistribPhase == DistribPhase.NONE;
- final SolrCore core = req.getCore();
- this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- this.aliasName = aliasName;
- CoreContainer cc = core.getCoreContainer();
- zkController = cc.getZkController();
- cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
-
- final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
- if (aliasMetadata == null) {
- throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
- }
- routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
-
- ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
- // Don't distribute these params; they will be distributed from the local processCommit separately.
- // (See RequestHandlerUtils.handleCommit from which this list was retrieved from)
- outParams.remove(UpdateParams.OPTIMIZE);
- outParams.remove(UpdateParams.COMMIT);
- outParams.remove(UpdateParams.SOFT_COMMIT);
- outParams.remove(UpdateParams.PREPARE_COMMIT);
- outParams.remove(UpdateParams.ROLLBACK);
- // Add these...
- // Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
- outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
- // Signal this is a distributed search from this URP (see #wrap())
- outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
- outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
- outParamsToLeader = outParams;
- }
-
- // Processes the doc locally if it belongs to this collection; otherwise forwards it to the (first active) shard
- // leader of the target collection.
- @Override
- public void processAdd(AddUpdateCommand cmd) throws IOException {
- final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
- final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
- if (targetCollection == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
- }
- if (thisCollection.equals(targetCollection)) {
- // pass on through; we've reached the right collection
- super.processAdd(cmd);
- } else {
- // send to the right collection
- SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
- cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
- }
- }
-
- // Returns the newest collection whose start timestamp is <= the route key, or null if the key precedes all of them.
- // Accepts Instant, Date, or an ISO-8601 CharSequence (parsed via Instant.parse, which throws on malformed input).
- protected String findTargetCollectionGivenRouteKey(Object routeKey) {
- final Instant docTimestamp;
- if (routeKey instanceof Instant) {
- docTimestamp = (Instant) routeKey;
- } else if (routeKey instanceof Date) {
- docTimestamp = ((Date)routeKey).toInstant();
- } else if (routeKey instanceof CharSequence) {
- docTimestamp = Instant.parse((CharSequence)routeKey);
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
- }
- final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
- if (this.parsedCollectionsAliases != aliases) {
- if (this.parsedCollectionsAliases != null) {
- log.info("Observing possibly updated alias {}", aliasName);
- }
- this.parsedCollectionsDesc = doParseCollections(aliases);
- this.parsedCollectionsAliases = aliases;
- }
- // iterates in reverse chronological order
- // We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
- for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
- Instant colStartTime = entry.getKey();
- if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimeStamp is >= the colStartTime
- return entry.getValue(); //found it
- }
- }
- return null;
- }
-
- /** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */
- private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
- final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
- if (collections == null) {
- throw newAliasMustExistException();
- }
- // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
- List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
- for (String collection : collections) {
- Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
- result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
- }
- result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
- return result;
- }
-
- // NOTE(review): despite its name and return type, this method throws the exception itself, so the
- // "throw newAliasMustExistException()" at each call site never executes its own throw.
- private SolrException newAliasMustExistException() {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore." +
- " You cannot write to this unless the alias exists.");
- }
-
- // Assumes the collection name is the alias name, one separator character, then a DATE_TIME_FORMATTER timestamp.
- // The prefix itself is not validated.
- static Instant parseInstantFromCollectionName(String aliasName, String collection) {
- final String dateTimePart = collection.substring(aliasName.length() + 1);
- return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
- }
-
- // Deletes are sent to a shard leader of every collection in the alias.
- @Override
- public void processDelete(DeleteUpdateCommand cmd) throws IOException {
- final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
- cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
- }
-
- // Commits are sent to a shard leader of every collection in the alias and awaited (with retries).
- @Override
- public void processCommit(CommitUpdateCommand cmd) throws IOException {
- final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
- cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
- cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly? It doesn't.
- }
-
-// Not supported by SolrCmdDistributor and is sketchy any way
-// @Override
-// public void processRollback(RollbackUpdateCommand cmd) throws IOException {
-// }
-
- // Waits for outstanding distributed requests and surfaces any remote errors; always finishes the chain.
- @Override
- public void finish() throws IOException {
- try {
- cmdDistrib.finish();
- final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
- if (!errors.isEmpty()) {
- throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
- }
- } finally {
- super.finish();
- }
- }
-
- @Override
- protected void doClose() {
- try {
- cmdDistrib.close();
- } finally {
- super.doClose();
- }
- }
-
- // One node per collection in the alias (see lookupShardLeaderOfCollection).
- private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
- final Aliases aliases = zkController.getZkStateReader().getAliases();
- List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
- if (collections == null) {
- throw newAliasMustExistException();
- }
- return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
- }
-
- // NOTE(review): picks the leader of the first active slice only, regardless of the collection's shard count.
- private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
- //TODO consider router to get the right slice. Refactor common code in CloudSolrClient & DistributedUrp
- final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
- if (activeSlices.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
- }
- final Slice slice = activeSlices.iterator().next();
- //TODO when should we do StdNode vs RetryNode?
- final Replica leader = slice.getLeader();
- if (leader == null) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
- }
- return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
- collection, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
new file mode 100644
index 0000000..9148912
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoField;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * Distributes update requests to a rolling series of collections partitioned by a timestamp field.
+ * <p>
+ * Depends on this core having a special core property ({@link #TIME_PARTITION_ALIAS_NAME_CORE_PROP}) that names
+ * the alias this collection is a part of. It further requires metadata on that alias:
+ * {@link #ROUTER_FIELD_METADATA} names the document field whose timestamp selects the target collection.
+ * <p>
+ * Each collection in the alias is expected to be named as the alias name, one separator character, then its UTC
+ * start timestamp in the form {@code yyyy-MM-dd[_HH[_mm[_ss]]]}. A document is routed to the newest collection
+ * whose start timestamp is at or before the document's timestamp.
+ *
+ * @since 7.2.0
+ */
+public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
+  //TODO do we make this more generic to others who want to partition collections using something else?
+
+  // TODO auto add new collection partitions when cross a timestamp boundary. That needs to be coordinated to avoid
+  //   race conditions, remembering that even the lead collection might have multiple instances of this URP
+  //   (multiple shards or perhaps just multiple streams thus instances of this URP)
+
+  public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
+  public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
+  public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
+
+  // This format must be compatible with collection name limitations.
+  // Parses e.g. "2017-10-23", "2017-10-23_05", "2017-10-23_05_30", "2017-10-23_05_30_15" as UTC instants.
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+      .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String thisCollection;
+  private final String aliasName;
+  // null when the alias metadata lacks ROUTER_FIELD_METADATA; only adds need it, so it is checked in processAdd
+  private final String routeField;
+
+  private final SolrCmdDistributor cmdDistrib;
+  private final ZkController zkController;
+  private final SolrParams outParamsToLeader;
+
+  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
+  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
+
+  /**
+   * Wraps {@code next} with this URP when the core belongs to a time routed alias and the request has not already
+   * been distributed (neither by this URP nor by shard distribution); otherwise returns {@code next} unchanged.
+   */
+  public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+    final String timePartitionAliasName = req.getCore().getCoreDescriptor()
+        .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
+    final DistribPhase shardDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+    final DistribPhase aliasDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
+    if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
+      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
+      //    TODO this may eventually not be true but at the moment it is
+      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
+      return next;
+    } else {
+      return new TimeRoutedAliasUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
+    }
+  }
+
+  /**
+   * @throws SolrException SERVICE_UNAVAILABLE if the alias no longer exists
+   */
+  protected TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
+                                           String aliasName,
+                                           DistribPhase aliasDistribPhase) {
+    super(next);
+    assert aliasDistribPhase == DistribPhase.NONE;
+    final SolrCore core = req.getCore();
+    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    this.aliasName = aliasName;
+    CoreContainer cc = core.getCoreContainer();
+    zkController = cc.getZkController();
+    cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+
+    final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
+    if (aliasMetadata == null) {
+      throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
+    }
+    routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
+
+    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
+    // Don't distribute these params; they will be distributed from the local processCommit separately.
+    //   (See RequestHandlerUtils.handleCommit, from which this list was retrieved)
+    outParams.remove(UpdateParams.OPTIMIZE);
+    outParams.remove(UpdateParams.COMMIT);
+    outParams.remove(UpdateParams.SOFT_COMMIT);
+    outParams.remove(UpdateParams.PREPARE_COMMIT);
+    outParams.remove(UpdateParams.ROLLBACK);
+    // Add these...
+    //   Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
+    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
+    //   Signal this is a distributed update from this URP (see #wrap())
+    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
+    outParamsToLeader = outParams;
+  }
+
+  /**
+   * Processes the doc locally if its route field value belongs to this collection; otherwise forwards it to the
+   * shard leader of the target collection.
+   *
+   * @throws SolrException BAD_REQUEST if the doc has no route field value or it can't be routed;
+   *                       SERVER_ERROR if the alias has no {@link #ROUTER_FIELD_METADATA}
+   */
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    if (routeField == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Alias " + aliasName + " is missing the required '" + ROUTER_FIELD_METADATA + "' metadata");
+    }
+    final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
+    if (routeValue == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Doc " + cmd.getPrintableId() + " is missing a value for the route field " + routeField);
+    }
+    final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
+    if (targetCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
+    }
+    if (thisCollection.equals(targetCollection)) {
+      // pass on through; we've reached the right collection
+      super.processAdd(cmd);
+    } else {
+      // send to the right collection
+      SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
+      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
+    }
+  }
+
+  /**
+   * Finds the collection a document with the given route key belongs to.
+   *
+   * @param routeKey an {@link Instant}, {@link Date}, or ISO-8601 instant string (as accepted by {@link Instant#parse})
+   * @return the newest collection whose start timestamp is at or before the key, or null if the key precedes all
+   * @throws SolrException BAD_REQUEST if the key is of an unsupported type or a string that can't be parsed
+   */
+  protected String findTargetCollectionGivenRouteKey(Object routeKey) {
+    final Instant docTimestamp;
+    if (routeKey instanceof Instant) {
+      docTimestamp = (Instant) routeKey;
+    } else if (routeKey instanceof Date) {
+      docTimestamp = ((Date) routeKey).toInstant();
+    } else if (routeKey instanceof CharSequence) {
+      try {
+        docTimestamp = Instant.parse((CharSequence) routeKey);
+      } catch (DateTimeParseException e) {
+        // a malformed client value is a bad request, not a server error
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Couldn't parse routeKey as an ISO-8601 instant: " + routeKey, e);
+      }
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
+    }
+    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
+    if (this.parsedCollectionsAliases != aliases) {
+      if (this.parsedCollectionsAliases != null) {
+        log.info("Observing possibly updated alias {}", aliasName);
+      }
+      this.parsedCollectionsDesc = doParseCollections(aliases);
+      this.parsedCollectionsAliases = aliases;
+    }
+    // iterates in reverse chronological order
+    // We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
+    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
+      Instant colStartTime = entry.getKey();
+      if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimeStamp is >= the colStartTime
+        return entry.getValue(); //found it
+      }
+    }
+    return null;
+  }
+
+  /** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */
+  private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
+    final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
+    if (collections == null) {
+      throw newAliasMustExistException();
+    }
+    // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
+    List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
+    for (String collection : collections) {
+      Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
+      result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
+    }
+    result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
+    return result;
+  }
+
+  /** Creates (but does not throw) the exception reported when the alias this collection belongs to is gone. */
+  private SolrException newAliasMustExistException() {
+    return new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+        "Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore." +
+            " You cannot write to this unless the alias exists.");
+  }
+
+  /**
+   * Parses the start timestamp out of a collection name. Assumes the name is the alias name, one separator
+   * character, then a timestamp in {@link #DATE_TIME_FORMATTER} form; the prefix itself is not validated.
+   */
+  static Instant parseInstantFromCollectionName(String aliasName, String collection) {
+    final String dateTimePart = collection.substring(aliasName.length() + 1);
+    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
+  }
+
+  /** Deletes are sent to a shard leader of every collection in the alias. */
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+  }
+
+  /** Commits are sent to a shard leader of every collection in the alias and awaited (with retries). */
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly? It doesn't.
+  }
+
+  // processRollback is intentionally not overridden: rollback is not supported by SolrCmdDistributor and
+  // distributing it across collections is sketchy anyway.
+
+  /** Waits for outstanding distributed requests and surfaces any remote errors; always finishes the chain. */
+  @Override
+  public void finish() throws IOException {
+    try {
+      cmdDistrib.finish();
+      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+      if (!errors.isEmpty()) {
+        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
+      }
+    } finally {
+      super.finish();
+    }
+  }
+
+  @Override
+  protected void doClose() {
+    try {
+      cmdDistrib.close();
+    } finally {
+      super.doClose();
+    }
+  }
+
+  /** One target node per collection in the alias (see {@link #lookupShardLeaderOfCollection}). */
+  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
+    final Aliases aliases = zkController.getZkStateReader().getAliases();
+    List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
+    if (collections == null) {
+      throw newAliasMustExistException();
+    }
+    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
+  }
+
+  // NOTE: picks the leader of the first active slice only, regardless of the collection's shard count.
+  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
+    //TODO consider router to get the right slice.  Refactor common code in CloudSolrClient & DistributedUrp
+    final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
+    if (activeSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
+    }
+    final Slice slice = activeSlices.iterator().next();
+    //TODO when should we do StdNode vs RetryNode?
+    final Replica leader = slice.getLeader();
+    if (leader == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
+    }
+    return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
+        collection, null);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java
deleted file mode 100644
index eca6fbb..0000000
--- a/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update.processor;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.function.UnaryOperator;
-
-import org.apache.lucene.util.IOUtils;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.client.solrj.response.FieldStatsInfo;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Tests TimePartitionedUpdateProcessor: docs sent to a time-partitioned alias (or to one of its collections) must
- * land in the collection whose name-encoded start instant covers the doc's timestamp field, and a doc older than
- * the oldest collection is rejected with a "couldn't be routed" error.
- */
-public class TimePartitionedUpdateProcessorTest extends SolrCloudTestCase {
-
- static final String configName = "timeConfig";
- static final String alias = "myalias";
- static final String timeField = "timestamp";
- static final String intField = "integer_i";
-
- static SolrClient solrClient;
-
- /** Id of the most recently generated doc; effectively the number of docs generated so far. */
- private int lastDocId = 0;
- /** Generated docs that should not be found: deleted ones, or ones rejected by the processor. */
- private int numDocsDeletedOrFailed = 0;
-
- /** Starts a 2-node test cluster and a shared client for it. */
- @BeforeClass
- public static void setupCluster() throws Exception {
- configureCluster(2).configure();
- solrClient = getCloudSolrClient(cluster);
- }
-
- /** Closes the shared client. */
- @AfterClass
- public static void finish() throws Exception {
- IOUtils.close(solrClient);
- }
-
- /**
- * End-to-end scenario: build the config, create the alias and its collections, then index docs (including one
- * that is too old), delete one, and check the routing invariants after each step.
- */
- @Test
- public void test() throws Exception {
- // First create a config using REST API. To do this, we create a collection with the name of the eventual config.
- // We configure it, and ultimately delete the collection, leaving a config with the same name behind.
- // Then we create the "real" collections referencing this config.
- CollectionAdminRequest.createCollection(configName, 1, 1).process(solrClient);
- // manipulate the config...
- checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
- .withMethod(SolrRequest.METHOD.POST)
- .withPayload("{" +
- " 'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," + // alias name as a user property
- " 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
- " 'add-updateprocessor' : {" +
- " 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
- " }," +
- " 'add-updateprocessor' : {" + // for testing
- " 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
- " 'fieldName':'" + intField + "'" +
- " }," +
- "}").build()));
- // make 'inc' then 'tolerant' the processors applied to updates for this config
- checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
- .withMethod(SolrRequest.METHOD.POST)
- .withPayload("{" +
- " 'set' : {" +
- " '_UPDATE' : {'processor':'inc,tolerant'}" +
- " }" +
- "}").build()));
- CollectionAdminRequest.deleteCollection(configName).process(solrClient);
-
- // start with one collection and an alias for it
- final String col23rd = alias + "_2017-10-23";
- CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
- .withProperty(TimePartitionedUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
- .process(solrClient);
-
- assertEquals("We only expect 2 configSets",
- Arrays.asList("_default", configName), new ConfigSetAdminRequest.List().process(solrClient).getConfigSets());
-
- CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
- //TODO use SOLR-11617 client API to set alias metadata
- // tell the alias which field to route on, by writing alias metadata directly via ZK
- final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
- UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimePartitionedUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
- zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
-
-
- // now we index a document
- solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
- solrClient.commit(alias);
- //assertDocRoutedToCol(lastDocId, col23rd);
- assertInvariants();
-
- // a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there)
- try {
- final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
- final Object errors = resp.getResponseHeader().get("errors");
- assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
- } catch (SolrException e) {
- assertTrue(e.getMessage().contains("couldn't be routed"));
- }
- numDocsDeletedOrFailed++;
-
- // add another collection, add to alias (soonest comes first)
- final String col24th = alias + "_2017-10-24";
- CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now
- .setMaxShardsPerNode(2)
- .withProperty("timePartitionAliasName", alias)
- .process(solrClient);
- CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
-
- // index 3 documents in a random fashion
- addDocsAndCommit(
- newDoc(Instant.parse("2017-10-23T00:00:00Z")),
- newDoc(Instant.parse("2017-10-24T01:00:00Z")),
- newDoc(Instant.parse("2017-10-24T02:00:00Z"))
- );
- assertInvariants();
-
- // assert that the IncrementURP has updated all '0' to '1'
- final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
- assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
-
- //delete a random document id; ensure we don't find it
- int idToDelete = 1 + random().nextInt(lastDocId);
- if (idToDelete == 2) { // #2 didn't make it
- idToDelete++;
- }
- solrClient.deleteById(alias, Integer.toString(idToDelete));
- solrClient.commit(alias);
- numDocsDeletedOrFailed++;
- assertInvariants();
- }
-
- /** Asserts that a config API response carries no "errorMessages" entry. */
- private void checkNoError(NamedList<Object> response) {
- Object errors = response.get("errorMessages");
- assertNull("" + errors, errors);
- }
-
- /** Adds these documents and commits, returning when they are committed.
- * We randomly go about this in different ways. */
- private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
- // we assume these are not old docs!
-
- // this is a list of the collections & the alias name. Use to pick randomly where to send.
- // (it doesn't matter where we send docs since the alias is honored at the URP level)
- List<String> collections = new ArrayList<>();
- collections.add(alias);
- collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
-
- int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
- int numDocsBefore = queryNumDocs();
- if (random().nextBoolean()) {
- // send in separate requests
- for (SolrInputDocument solrInputDocument : solrInputDocuments) {
- String col = collections.get(random().nextInt(collections.size()));
- solrClient.add(col, solrInputDocument, commitWithin);
- }
- } else {
- // send in a batch.
- String col = collections.get(random().nextInt(collections.size()));
- solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
- }
- String col = collections.get(random().nextInt(collections.size()));
- if (commitWithin == -1) {
- solrClient.commit(col);
- } else {
- // check that it all got committed eventually
- int numDocs = queryNumDocs();
- if (numDocs == numDocsBefore + solrInputDocuments.length) {
- System.err.println("Docs committed sooner than expected. Bug or slow test env?");
- return;
- }
- // wait until it's committed, plus some play time for commit to become visible
- Thread.sleep(commitWithin + 200);
- numDocs = queryNumDocs();
- assertEquals("not committed. Bug or a slow test?",
- numDocsBefore + solrInputDocuments.length, numDocs);
- }
- }
-
- /** Returns the number of docs currently visible through the alias. */
- private int queryNumDocs() throws SolrServerException, IOException {
- return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
- }
-
- /**
- * Checks the routing invariants: the alias lists its collections newest first, every doc in a collection has a
- * timestamp at or after the start instant parsed from that collection's name and before the start of the next
- * newer collection, and the total doc count equals generated docs minus deleted/failed ones.
- */
- private void assertInvariants() throws IOException, SolrServerException {
- final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
-
- final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
- // NOTE: a plain Java assert, only evaluated when the JVM runs with -ea
- assert !cols.isEmpty();
-
- int totalNumFound = 0;
- Instant colEndInstant = null; // exclusive end
- for (String col : cols) {
- final Instant colStartInstant = TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, col);
- //TODO do this in parallel threads
- final QueryResponse colStatsResp = solrClient.query(col, params(
- "q", "*:*",
- "rows", "0",
- "stats", "true",
- "stats.field", timeField));
- long numFound = colStatsResp.getResults().getNumFound();
- if (numFound > 0) {
- totalNumFound += numFound;
- final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
- assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime());
- if (colEndInstant != null) {
- assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime());
- }
- }
-
- colEndInstant = colStartInstant; // next older segment will max out at our current start time
- }
- assertEquals(expectNumFound, totalNumFound);
- }
-
- /** Builds a doc with the next sequential id, the given timestamp, and intField set to "0". */
- private SolrInputDocument newDoc(Instant timestamp) {
- return sdoc("id", Integer.toString(++lastDocId),
- timeField, timestamp.toString(),
- intField, "0"); // always 0
- }
-
- /** Collection-name suffixes at day, hour, minute and second granularity parse to the expected instant. */
- @Test
- public void testParse() {
- assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
- TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
- assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
- TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
- assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
- TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
- assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
- TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
- }
-
- /**
- * Test-only URP that adds 1 to each value of the selected field; the test uses it to prove the configured
- * update chain ran on the routed docs ('0' becomes '1').
- */
- public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
-
- @Override
- public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
- return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
- (src) -> Integer.valueOf(src.toString()) + 1);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7877f5a5/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
new file mode 100644
index 0000000..f7f200f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.FieldStatsInfo;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests {@link TimeRoutedAliasUpdateProcessor}: docs sent to a time-routed alias (or to one of its collections) must
+ * land in the collection whose name-encoded start instant covers the doc's timestamp field, and a doc older than the
+ * oldest collection is rejected with a "couldn't be routed" error.
+ */
+public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
+
+  static final String configName = "timeConfig";
+  static final String alias = "myalias";
+  static final String timeField = "timestamp";
+  static final String intField = "integer_i";
+
+  /** Extra time, beyond commitWithin, that we are willing to wait for committed docs to become visible. */
+  private static final long COMMIT_WAIT_SLACK_MS = 10_000;
+
+  static SolrClient solrClient;
+
+  /** Id of the most recently generated doc; effectively the number of docs generated so far. */
+  private int lastDocId = 0;
+  /** Generated docs that should not be found: deleted ones, or ones rejected by the processor. */
+  private int numDocsDeletedOrFailed = 0;
+
+  /** Starts a 2-node test cluster and a shared client for it. */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(2).configure();
+    solrClient = getCloudSolrClient(cluster);
+  }
+
+  /** Closes the shared client. */
+  @AfterClass
+  public static void finish() throws Exception {
+    IOUtils.close(solrClient);
+  }
+
+  /**
+   * End-to-end scenario: build the config, create the alias and its collections, then index docs (including one
+   * that is too old), delete one, and check the routing invariants after each step.
+   */
+  @Test
+  public void test() throws Exception {
+    // First create a config using the REST API. To do this, we create a collection with the name of the eventual
+    // config. We configure it, and ultimately delete the collection, leaving a config with the same name behind.
+    // Then we create the "real" collections referencing this config.
+    CollectionAdminRequest.createCollection(configName, 1, 1).process(solrClient);
+    // manipulate the config...
+    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            " 'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," + // alias name as a user property
+            " 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
+            " 'add-updateprocessor' : {" +
+            " 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
+            " }," +
+            " 'add-updateprocessor' : {" + // for testing
+            " 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
+            " 'fieldName':'" + intField + "'" +
+            " }," +
+            "}").build()));
+    // make 'inc' then 'tolerant' the processors applied to updates for this config
+    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            " 'set' : {" +
+            " '_UPDATE' : {'processor':'inc,tolerant'}" +
+            " }" +
+            "}").build()));
+    CollectionAdminRequest.deleteCollection(configName).process(solrClient);
+
+    // start with one collection and an alias for it
+    final String col23rd = alias + "_2017-10-23";
+    CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
+        .withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
+        .process(solrClient);
+
+    assertEquals("We only expect 2 configSets",
+        Arrays.asList("_default", configName), new ConfigSetAdminRequest.List().process(solrClient).getConfigSets());
+
+    CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
+    //TODO use SOLR-11617 client API to set alias metadata
+    // tell the alias which field to route on, by writing alias metadata directly via ZK
+    final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
+    zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
+
+    // now we index a document
+    solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
+    solrClient.commit(alias);
+    //assertDocRoutedToCol(lastDocId, col23rd);
+    assertInvariants();
+
+    // a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there)
+    try {
+      final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
+      final Object errors = resp.getResponseHeader().get("errors");
+      assertTrue(String.valueOf(errors), errors != null && errors.toString().contains("couldn't be routed"));
+    } catch (SolrException e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("couldn't be routed"));
+    }
+    numDocsDeletedOrFailed++;
+    final int failedDocId = lastDocId; // the too-old doc above was never indexed
+
+    // add another collection, add to alias (soonest comes first)
+    final String col24th = alias + "_2017-10-24";
+    CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now
+        .setMaxShardsPerNode(2)
+        // same property as the first collection; use the constant so the two cannot drift apart
+        .withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
+        .process(solrClient);
+    CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
+
+    // index 3 documents in a random fashion
+    addDocsAndCommit(
+        newDoc(Instant.parse("2017-10-23T00:00:00Z")),
+        newDoc(Instant.parse("2017-10-24T01:00:00Z")),
+        newDoc(Instant.parse("2017-10-24T02:00:00Z"))
+    );
+    assertInvariants();
+
+    // assert that the IncrementURP has updated all '0' to '1'
+    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
+    assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
+
+    //delete a random document id; ensure we don't find it
+    int idToDelete = 1 + random().nextInt(lastDocId);
+    if (idToDelete == failedDocId) { // that doc didn't make it, so deleting it would not reduce the count
+      idToDelete++;
+    }
+    solrClient.deleteById(alias, Integer.toString(idToDelete));
+    solrClient.commit(alias);
+    numDocsDeletedOrFailed++;
+    assertInvariants();
+  }
+
+  /** Asserts that a config API response carries no "errorMessages" entry. */
+  private void checkNoError(NamedList<Object> response) {
+    Object errors = response.get("errorMessages");
+    assertNull("" + errors, errors);
+  }
+
+  /**
+   * Adds these documents and commits, returning when they are committed.
+   * We randomly go about this in different ways: sending to the alias or to one of its collections, one request per
+   * doc or a single batch, and an explicit commit or commitWithin.
+   */
+  private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
+    // we assume these are not old docs!
+
+    // this is a list of the collections & the alias name. Use to pick randomly where to send.
+    // (it doesn't matter where we send docs since the alias is honored at the URP level)
+    List<String> collections = new ArrayList<>();
+    collections.add(alias);
+    collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
+
+    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
+    int numDocsBefore = queryNumDocs();
+    final int numDocsExpected = numDocsBefore + solrInputDocuments.length;
+    if (random().nextBoolean()) {
+      // send in separate requests
+      for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+        String col = collections.get(random().nextInt(collections.size()));
+        solrClient.add(col, solrInputDocument, commitWithin);
+      }
+    } else {
+      // send in a batch.
+      String col = collections.get(random().nextInt(collections.size()));
+      solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
+    }
+    String col = collections.get(random().nextInt(collections.size()));
+    if (commitWithin == -1) {
+      solrClient.commit(col);
+    } else {
+      // check that it all got committed eventually
+      int numDocs = queryNumDocs();
+      if (numDocs == numDocsExpected) {
+        System.err.println("Docs committed sooner than expected. Bug or slow test env?");
+        return;
+      }
+      // Poll instead of sleeping a fixed commitWithin + 200ms, which is flaky on slow machines. The deadline still
+      // bounds the wait so that a commitWithin that never takes effect fails the test.
+      final long deadlineMs = System.currentTimeMillis() + commitWithin + COMMIT_WAIT_SLACK_MS;
+      while (numDocs != numDocsExpected && System.currentTimeMillis() < deadlineMs) {
+        Thread.sleep(50);
+        numDocs = queryNumDocs();
+      }
+      assertEquals("not committed. Bug or a slow test?",
+          numDocsExpected, numDocs);
+    }
+  }
+
+  /** Returns the number of docs currently visible through the alias. */
+  private int queryNumDocs() throws SolrServerException, IOException {
+    return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
+  }
+
+  /**
+   * Checks the routing invariants: the alias lists its collections newest first, every doc in a collection has a
+   * timestamp at or after the start instant parsed from that collection's name and before the start of the next
+   * newer collection, and the total doc count equals generated docs minus deleted/failed ones.
+   */
+  private void assertInvariants() throws IOException, SolrServerException {
+    final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
+
+    final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
+    // JUnit assertions rather than a plain 'assert', which is skipped unless the JVM runs with -ea
+    assertNotNull("alias " + alias + " not found", cols);
+    assertFalse("alias " + alias + " has no collections", cols.isEmpty());
+
+    int totalNumFound = 0;
+    Instant colEndInstant = null; // exclusive end
+    for (String col : cols) {
+      final Instant colStartInstant = TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, col);
+      //TODO do this in parallel threads
+      final QueryResponse colStatsResp = solrClient.query(col, params(
+          "q", "*:*",
+          "rows", "0",
+          "stats", "true",
+          "stats.field", timeField));
+      long numFound = colStatsResp.getResults().getNumFound();
+      if (numFound > 0) {
+        totalNumFound += numFound;
+        final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
+        final Date minTimestamp = (Date) timestampStats.getMin();
+        assertTrue("collection " + col + " holds a doc at " + minTimestamp.toInstant()
+                + ", before its start " + colStartInstant,
+            colStartInstant.toEpochMilli() <= minTimestamp.getTime());
+        if (colEndInstant != null) {
+          final Date maxTimestamp = (Date) timestampStats.getMax();
+          assertTrue("collection " + col + " holds a doc at " + maxTimestamp.toInstant()
+                  + ", not before its exclusive end " + colEndInstant,
+              colEndInstant.toEpochMilli() > maxTimestamp.getTime());
+        }
+      }
+
+      colEndInstant = colStartInstant; // next older segment will max out at our current start time
+    }
+    assertEquals(expectNumFound, totalNumFound);
+  }
+
+  /** Builds a doc with the next sequential id, the given timestamp, and intField set to "0". */
+  private SolrInputDocument newDoc(Instant timestamp) {
+    return sdoc("id", Integer.toString(++lastDocId),
+        timeField, timestamp.toString(),
+        intField, "0"); // always 0
+  }
+
+  /** Collection-name suffixes at day, hour, minute and second granularity parse to the expected instant. */
+  @Test
+  public void testParse() {
+    assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
+        TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
+    assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
+        TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
+    assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
+        TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
+    assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
+        TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
+  }
+
+  /**
+   * Test-only URP that adds 1 to each (integer-parseable) value of the selected field; the test uses it to prove the
+   * configured update chain ran on the routed docs ('0' becomes '1').
+   */
+  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
+
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return FieldValueMutatingUpdateProcessor.valueMutator(getSelector(), next,
+          (src) -> Integer.parseInt(src.toString()) + 1);
+    }
+  }
+
+}