You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/01/28 23:07:36 UTC
[lucene-solr] 01/05: SOLR-13148 - Patch #2 - Extract
TimeRoutedAlias related code from URP & CMD classes, creating a generic
RoutedAlias
This is an automated email from the ASF dual-hosted git repository.
gus pushed a commit to branch solr-13131
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 3e575a88dcc9b62de18b93d50bb96df7f48542a4
Author: Gus Heck <gu...@apache.org>
AuthorDate: Sun Jan 27 14:00:54 2019 +0200
SOLR-13148 - Patch #2 - Extract TimeRoutedAlias related code from URP & CMD classes, creating a generic RoutedAlias
---
.../solr/cloud/api/collections/AliasCmd.java | 90 ++++
.../cloud/api/collections/CategoryRoutedAlias.java | 62 +++
.../solr/cloud/api/collections/CreateAliasCmd.java | 4 +-
.../MaintainCategoryRoutedAliasCmd.java | 29 ++
...iasCmd.java => MaintainTimeRoutedAliasCmd.java} | 73 +--
.../OverseerCollectionMessageHandler.java | 37 +-
.../solr/cloud/api/collections/RoutedAlias.java | 67 +++
.../cloud/api/collections/TimeRoutedAlias.java | 285 +++++++++++-
.../DistributedUpdateProcessorFactory.java | 2 +-
.../processor/RoutedAliasUpdateProcessor.java | 270 +++++++++++
.../processor/TimeRoutedAliasUpdateProcessor.java | 507 ---------------------
.../TimeRoutedAliasUpdateProcessorTest.java | 9 +-
.../solr/common/params/CollectionParams.java | 3 +-
13 files changed, 817 insertions(+), 621 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
new file mode 100644
index 0000000..3bc1d0b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.util.Map;
+
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.OverseerSolrResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+
+import static org.apache.solr.cloud.api.collections.RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
+import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CREATE_COLLECTION_PREFIX;
+import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+/**
+ * Common superclass for commands that maintain or manipulate aliases. In the routed alias parlance, "maintain"
+ * means, given the current state of the alias and some information from a routed field in a document that
+ * may imply a need for changes, create, delete or otherwise modify collections as required.
+ */
+public abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
+ /**
+ * Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
+ * If the collection already exists then this is not an error. The create request is built from the
+ * alias metadata entries whose keys start with {@code CREATE_COLLECTION_PREFIX} (with the prefix stripped).
+ * @throws SolrException with BAD_REQUEST if that metadata does not supply an explicit {@code COLL_CONF}
+ */
+ NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
+ String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
+ // Map alias metadata starting with a prefix to a create-collection API request
+ final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
+ for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
+ if (e.getKey().startsWith(CREATE_COLLECTION_PREFIX)) {
+ createReqParams.set(e.getKey().substring(CREATE_COLLECTION_PREFIX.length()), e.getValue());
+ }
+ }
+ if (createReqParams.get(COLL_CONF) == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "We require an explicit " + COLL_CONF);
+ }
+ createReqParams.set(NAME, createCollName);
+ createReqParams.set("property." + ROUTED_ALIAS_NAME_CORE_PROP, aliasName);
+ // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
+ // Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
+ final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
+ new LocalSolrQueryRequest(null, createReqParams),
+ null,
+ ocmh.overseer.getCoreContainer().getCollectionsHandler());
+ createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
+
+ NamedList results = new NamedList();
+ try {
+ // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
+ // note: there doesn't seem to be any point in locking on the collection name, so we don't. We currently should
+ // already have a lock on the alias name which should be sufficient.
+ ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
+ } catch (SolrException e) {
+ // The collection might already exist, and that's okay -- we can adopt it. A message-less exception is rethrown as-is.
+ if (e.getMessage() == null || !e.getMessage().contains("collection already exists")) {
+ throw e;
+ }
+ }
+
+ CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
+ new OverseerSolrResponse(results));
+ return results;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
new file mode 100644
index 0000000..bc565f03
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.update.AddUpdateCommand;
+
+public class CategoryRoutedAlias implements RoutedAlias {
+ private final String aliasName;
+ private final Map<String, String> aliasProperties;
+
+ public CategoryRoutedAlias(String aliasName, Map<String, String> aliasProperties) {
+ // aliasProperties is stored here but not read by any method of this class yet
+ this.aliasName = aliasName;
+ this.aliasProperties = aliasProperties;
+ }
+
+ @Override
+ public boolean updateParsedCollectionAliases(ZkController zkController) {
+ return false; // todo: stub, always reports "not modified"
+ }
+
+ @Override
+ public String getAliasName() {
+ return aliasName;
+ }
+
+ @Override
+ public List<Map.Entry<Instant, String>> parseCollections(Aliases aliases) {
+ return null; // todo: stub, no collection list is parsed yet
+ }
+
+ @Override
+ public void validateRouteValue(AddUpdateCommand cmd) {
+ // todo: stub, no validation of the route value is performed yet
+ }
+
+ @Override
+ public String createCollectionsIfRequired(AddUpdateCommand cmd) {
+ return null; // todo: stub, no collection is created or selected yet
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
index 7117019..61931d9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateAliasCmd.java
@@ -41,7 +41,7 @@ import org.apache.solr.util.DateMathParser;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
-public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+public class CreateAliasCmd extends AliasCmd {
private final OverseerCollectionMessageHandler ocmh;
@@ -121,7 +121,7 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, startTime);
// Create the collection
- MaintainRoutedAliasCmd.createCollectionAndWait(state, aliasName, aliasProperties, initialCollectionName, ocmh);
+ createCollectionAndWait(state, aliasName, aliasProperties, initialCollectionName, ocmh);
validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);
// Create/update the alias
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
new file mode 100644
index 0000000..59274d6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainCategoryRoutedAliasCmd.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.NamedList;
+
+public class MaintainCategoryRoutedAliasCmd extends AliasCmd {
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ // todo: not implemented yet; this command is currently a no-op that leaves results untouched
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
similarity index 74%
rename from solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
rename to solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
index e5c5de6..4751979 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainTimeRoutedAliasCmd.java
@@ -31,14 +31,12 @@ import java.util.Map;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
@@ -49,9 +47,6 @@ import org.apache.solr.util.DateMathParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CREATE_COLLECTION_PREFIX;
-import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
@@ -64,14 +59,14 @@ import static org.apache.solr.common.params.CommonParams.NAME;
* @since 7.3
* @lucene.internal
*/
-public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+public class MaintainTimeRoutedAliasCmd extends AliasCmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName"; //TODO rename to createAfter
private final OverseerCollectionMessageHandler ocmh;
- public MaintainRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
+ public MaintainTimeRoutedAliasCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}
@@ -79,15 +74,15 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
* Invokes this command from the client. If there's a problem it will throw an exception.
* Please note that is important to never add async to this invocation. This method must
* block (up to the standard OCP timeout) to prevent large batches of add's from sending a message
- * to the overseer for every document added in TimeRoutedAliasUpdateProcessor.
+ * to the overseer for every document added in RoutedAliasUpdateProcessor.
*/
public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName)
throws Exception {
- final String operation = CollectionParams.CollectionAction.MAINTAINROUTEDALIAS.toLower();
+ final String operation = CollectionParams.CollectionAction.MAINTAINTIMEROUTEDALIAS.toLower();
Map<String, Object> msg = new HashMap<>();
msg.put(Overseer.QUEUE_OPERATION, operation);
msg.put(CollectionParams.NAME, aliasName);
- msg.put(MaintainRoutedAliasCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
+ msg.put(MaintainTimeRoutedAliasCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
final SolrResponse rsp = collHandler.sendToOCPQueue(new ZkNodeProps(msg));
if (rsp.getException() != null) {
throw rsp.getException();
@@ -110,12 +105,13 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
final Aliases aliases = aliasesManager.getAliases();
final Map<String, String> aliasMetadata = aliases.getCollectionAliasProperties(aliasName);
if (aliasMetadata == null) {
- throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Alias " + aliasName + " does not exist."); // if it did exist, we'd have a non-null map
}
final TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata);
final List<Map.Entry<Instant, String>> parsedCollections =
- timeRoutedAlias.parseCollections(aliases, () -> newAliasMustExistException(aliasName));
+ timeRoutedAlias.parseCollections(aliases);
//---- GET MOST RECENT COLL
final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0);
@@ -200,7 +196,7 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
// note: we could re-parse the TimeRoutedAlias object from curAliases but I don't think there's a point to it.
final List<Map.Entry<Instant, String>> parsedCollections =
- timeRoutedAlias.parseCollections(curAliases, () -> newAliasMustExistException(aliasName));
+ timeRoutedAlias.parseCollections(curAliases);
//iterating from newest to oldest, find the first collection that has a time <= "before". We keep this collection
// (and all newer to left) but we delete older collections, which are the ones that follow.
@@ -251,55 +247,4 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
return results;
}
- /**
- * Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
- * If the collection already exists then this is not an error.
- * IMPORTANT: Only call this from an {@link OverseerCollectionMessageHandler.Cmd}.
- */
- static NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
- String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
- // Map alias metadata starting with a prefix to a create-collection API request
- final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
- for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
- if (e.getKey().startsWith(CREATE_COLLECTION_PREFIX)) {
- createReqParams.set(e.getKey().substring(CREATE_COLLECTION_PREFIX.length()), e.getValue());
- }
- }
- if (createReqParams.get(COLL_CONF) == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "We require an explicit " + COLL_CONF );
- }
- createReqParams.set(NAME, createCollName);
- createReqParams.set("property." + ROUTED_ALIAS_NAME_CORE_PROP, aliasName);
- // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
- // Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
- final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
- new LocalSolrQueryRequest(null, createReqParams),
- null,
- ocmh.overseer.getCoreContainer().getCollectionsHandler());
- createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
-
- NamedList results = new NamedList();
- try {
- // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
- // note: there's doesn't seem to be any point in locking on the collection name, so we don't. We currently should
- // already have a lock on the alias name which should be sufficient.
- ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
- } catch (SolrException e) {
- // The collection might already exist, and that's okay -- we can adopt it.
- if (!e.getMessage().contains("collection already exists")) {
- throw e;
- }
- }
-
- CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
- new OverseerSolrResponse(results));
- return results;
- }
-
- private SolrException newAliasMustExistException(String aliasName) {
- return new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Alias " + aliasName + " does not exist.");
- }
-
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index e67fc7f..c29c711 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -31,39 +31,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.UTILIZENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.Utils.makeMap;
@@ -265,7 +233,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(CREATEALIAS, new CreateAliasCmd(this))
.put(DELETEALIAS, new DeleteAliasCmd(this))
.put(ALIASPROP, new SetAliasPropCmd(this))
- .put(MAINTAINROUTEDALIAS, new MaintainRoutedAliasCmd(this))
+ .put(MAINTAINTIMEROUTEDALIAS, new MaintainTimeRoutedAliasCmd(this))
+ .put(MAINTAINCATEGORYROUTEDALIAS, new MaintainCategoryRoutedAliasCmd())
.put(OVERSEERSTATUS, new OverseerStatusCmd(this))
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
new file mode 100644
index 0000000..68685f6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.update.AddUpdateCommand;
+
+public interface RoutedAlias {
+ String ROUTED_ALIAS_NAME_CORE_PROP = "routedAliasName"; // core prop
+ /** Creates, without throwing, the exception reporting that the routed alias no longer exists; the caller throws it. */
+ static SolrException newAliasMustExistException(String aliasName) {
+ return new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "Routed alias " + aliasName + " appears to have been removed during request processing.");
+ }
+
+ /**
+ * Ensure our parsed version of the alias collection list is up to date. If it was modified, return true.
+ * Note that this will return true if some other alias was modified or if properties were modified. These
+ * are spurious and the caller should be written to be tolerant of no material changes.
+ */
+ boolean updateParsedCollectionAliases(ZkController zkController);
+
+ String getAliasName();
+
+ /**
+ * Parses the elements of the collection list. The result is returned in sorted order (descending) if there
+ * is a natural order for this type of routed alias.
+ */
+ List<Map.Entry<Instant,String>> parseCollections(Aliases aliases);
+
+ /**
+ * Check that the value we will be routing on is legal for this type of routed alias.
+ *
+ * @param cmd the command containing the document
+ */
+ void validateRouteValue(AddUpdateCommand cmd);
+
+ /**
+ * Create any required collections and return the name of the collection to which the current document should be sent.
+ *
+ * @param cmd The command that might cause collection creation
+ * @return The name of the proper destination collection for the document which may or may not be a
+ * newly created collection
+ */
+ String createCollectionsIfRequired(AddUpdateCommand cmd);
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
index 1fb3d9e..5070478 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud.api.collections;
+import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneOffset;
@@ -34,17 +35,27 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
-import java.util.function.Supplier;
import com.google.common.base.Objects;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.RequiredSolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.processor.RoutedAliasUpdateProcessor;
import org.apache.solr.util.DateMathParser;
import org.apache.solr.util.TimeZoneUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CreationType.*;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.params.CommonParams.TZ;
@@ -52,10 +63,21 @@ import static org.apache.solr.common.params.CommonParams.TZ;
* Holds configuration for a routed alias, and some common code and constants.
*
* @see CreateAliasCmd
- * @see MaintainRoutedAliasCmd
- * @see org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor
+ * @see MaintainTimeRoutedAliasCmd
+ * @see RoutedAliasUpdateProcessor
*/
-public class TimeRoutedAlias {
+public class TimeRoutedAlias implements RoutedAlias {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // This class is created once per request and the overseer methods prevent duplicate create requests
+ // from creating extra copies. All we need to track here is that we don't spam preemptive creates to
+ // the overseer multiple times from *this* request.
+ private volatile boolean preemptiveCreateOnceAlready = false;
+
+ // These two fields may be updated within the calling thread during processing but should
+ // never be updated by any async creation thread.
+ private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
+ private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
// These are parameter names to routed alias creation, AND are stored as metadata with the alias.
public static final String ROUTER_PREFIX = "router.";
@@ -94,8 +116,6 @@ public class TimeRoutedAlias {
(key.startsWith(ROUTER_PREFIX) && !key.equals(ROUTER_START)) || //TODO reconsider START special case
key.startsWith(CREATE_COLLECTION_PREFIX);
- public static final String ROUTED_ALIAS_NAME_CORE_PROP = "routedAliasName"; // core prop
-
// This format must be compatible with collection name limitations
private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
@@ -121,6 +141,22 @@ public class TimeRoutedAlias {
}
+ @Override
+ public boolean updateParsedCollectionAliases(ZkController zkController) {
+ final Aliases latest = zkController.getZkStateReader().getAliases(); // may not be the instance seen by the last request
+ if (latest == this.parsedCollectionsAliases) {
+ return false; // same Aliases instance as the cached one, so the parsed list is still current
+ }
+ if (this.parsedCollectionsAliases != null) {
+ log.debug("Observing possibly updated alias: {}", getAliasName());
+ }
+ this.parsedCollectionsDesc = parseCollections(latest);
+ this.parsedCollectionsAliases = latest;
+ return true;
+ }
+
+
+
//
// Instance data and methods
//
@@ -187,6 +223,7 @@ public class TimeRoutedAlias {
}
}
+ @Override
public String getAliasName() {
return aliasName;
}
@@ -229,10 +266,10 @@ public class TimeRoutedAlias {
}
/** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
- public List<Map.Entry<Instant,String>> parseCollections(Aliases aliases, Supplier<SolrException> aliasNotExist) {
+ public List<Map.Entry<Instant,String>> parseCollections(Aliases aliases) {
final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
if (collections == null) {
- throw aliasNotExist.get();
+ throw RoutedAlias.newAliasMustExistException(getAliasName());
}
// note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later
List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
@@ -251,4 +288,236 @@ public class TimeRoutedAlias {
assert nextCollTimestamp.isAfter(fromTimestamp);
return nextCollTimestamp;
}
+
+ @Override
+ public void validateRouteValue(AddUpdateCommand cmd) {
+ final Instant docTimestamp =
+ parseRouteKey(cmd.getSolrInputDocument().getFieldValue(getRouteField()));
+
+ // TODO: maybe in some cases the user would want to ignore/warn instead?
+ if (docTimestamp.isAfter(Instant.now().plusMillis(getMaxFutureMs()))) {
+ throw new SolrException(BAD_REQUEST,
+ "The document's time routed key of " + docTimestamp + " is too far in the future given " +
+ ROUTER_MAX_FUTURE + "=" + getMaxFutureMs());
+ }
+ }
+
+
+
+ @Override
+ public String createCollectionsIfRequired(AddUpdateCommand cmd) {
+ SolrQueryRequest req = cmd.getReq();
+ SolrCore core = req.getCore();
+ CoreContainer coreContainer = core.getCoreContainer();
+ CollectionsHandler collectionsHandler = coreContainer.getCollectionsHandler();
+ final Instant docTimestamp =
+ parseRouteKey(cmd.getSolrInputDocument().getFieldValue(getRouteField()));
+
+ // Even though it is possible that multiple requests hit this code in the 1-2 sec that
+ // it takes to create a collection, it's an established anti-pattern to feed data with a very large number
+ // of client connections. With this in mind, we only guard against spamming the overseer within a batch of
+ // updates. We are intentionally tolerating a low level of redundant requests in favor of simpler code. Most
+ // super-sized installations with many update clients will likely be multi-tenant and multiple tenants
+ // probably don't write to the same alias. As such, we have deferred any solution to the "many clients causing
+ // collection creation simultaneously" problem until such time as someone actually has that problem in a
+ // real world use case that isn't just an anti-pattern.
+ Map.Entry<Instant, String> candidateCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
+ String candidateCollectionName = candidateCollectionDesc.getValue();
+
+ try {
+ switch (typeOfCreationRequired(docTimestamp, candidateCollectionDesc.getKey())) {
+ case SYNCHRONOUS:
+ // This next line blocks until all collections required by the current document have been created
+ return createAllRequiredCollections(docTimestamp, cmd, candidateCollectionDesc);
+ case ASYNC_PREEMPTIVE:
+ if (!preemptiveCreateOnceAlready) {
+ log.info("EXECUTING preemptive creation for {}", getAliasName());
+ // It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
+ // earlier in this method that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
+ // and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
+ // collections that existed when candidateCollectionDesc was created. If this class updates its notion of
+ // parsedCollectionsDesc after candidateCollectionDesc was chosen, we could create collection n+2
+ // instead of collection n+1.
+ String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+
+ // This line does not block and the document can be added immediately
+ preemptiveAsync(() -> createNextCollection(mostRecentCollName, collectionsHandler), core);
+ }
+ return candidateCollectionName;
+ case NONE:
+ return candidateCollectionName; // could use fall through, but fall through is fiddly for later editors.
+ default:
+ throw unknownCreateType();
+ }
+ // not reached: every case above returns or throws (NONE simply returns the candidate collection name)
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ /**
+ * Create as many collections as required. This method loops to allow for the possibility that the docTimestamp
+ * requires more than one collection to be created. Since multiple threads may be invoking maintain on separate
+ * requests to the same alias, we must pass in the name of the collection that this thread believes to be the most
+ * recent collection. This assumption is checked when the command is executed in the overseer. When this method
+ * finds that all collections required have been created it returns the (possibly new) most recent collection.
+ * The return value is ignored by the calling code in the async preemptive case.
+ *
+ * @param docTimestamp the timestamp from the document that determines routing
+ * @param cmd the update command being processed
+ * @param targetCollectionDesc the descriptor for the presently selected collection which should also be
+ * the most recent collection in all cases where this method is invoked.
+ * @return The latest collection, including collections created during maintenance
+ */
+ private String createAllRequiredCollections( Instant docTimestamp, AddUpdateCommand cmd,
+ Map.Entry<Instant, String> targetCollectionDesc) {
+ SolrQueryRequest req = cmd.getReq();
+ SolrCore core = req.getCore();
+ CoreContainer coreContainer = core.getCoreContainer();
+ CollectionsHandler collectionsHandler = coreContainer.getCollectionsHandler();
+ do {
+ switch(typeOfCreationRequired(docTimestamp, targetCollectionDesc.getKey())) {
+ case NONE:
+ return targetCollectionDesc.getValue(); // we don't need another collection
+ case ASYNC_PREEMPTIVE:
+ // can happen when preemptive interval is longer than one time slice
+ String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
+ preemptiveAsync(() -> createNextCollection(mostRecentCollName, collectionsHandler), core);
+ return targetCollectionDesc.getValue();
+ case SYNCHRONOUS:
+ createNextCollection(targetCollectionDesc.getValue(), collectionsHandler); // *should* throw if fails for some reason but...
+ ZkController zkController = coreContainer.getZkController();
+ if (!updateParsedCollectionAliases(zkController)) { // thus we didn't make progress...
+ // this is not expected, even in known failure cases, but we check just in case
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "We need to create a new time routed collection but for unknown reasons were unable to do so.");
+ }
+ // then retry the loop ... have to do find again in case other requests also added collections
+ // that were made visible when we called updateParsedCollectionAliases()
+ targetCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
+ break;
+ default:
+ throw unknownCreateType();
+
+ }
+ } while (true);
+ }
+
+ private SolrException unknownCreateType() {
+ return new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown creation type while adding " +
+ "document to a Time Routed Alias! This is a bug caused when a creation type has been added but " +
+ "not all code has been updated to handle it.");
+ }
+
+ private void createNextCollection(String mostRecentCollName, CollectionsHandler collHandler) {
+ // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection
+ // and update the alias contingent on the most recent collection name being the same as
+ // what we believe it to be here; otherwise it will return (without error).
+ try {
+ MaintainTimeRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
+ // We don't care about the response. It's possible no collection was created because
+ // of a race and that's okay... we'll ultimately retry anyway.
+
+ // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
+ // not yet know about the new alias (thus won't see the newly added collection to it), and we might think
+ // we failed.
+ collHandler.getCoreContainer().getZkController().getZkStateReader().aliasesManager.update();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+
+
+ private void preemptiveAsync(Runnable r, SolrCore core) {
+ preemptiveCreateOnceAlready = true;
+ core.runAsync(r);
+ }
+
+ /**
+ * Determine if a new collection will be required based on the document timestamp. When no preemptive
+ * create window is configured, the result only tells you if the document is beyond all existing collections,
+ * with a response of {@link CreationType#NONE} or {@link CreationType#SYNCHRONOUS}. When a date math
+ * preemptive create window is configured, it additionally distinguishes the case where the document is close
+ * enough to the end of the TRA to trigger preemptive creation but not beyond all existing collections, with a
+ * value of {@link CreationType#ASYNC_PREEMPTIVE}.
+ *
+ * @param docTimeStamp The timestamp from the document
+ * @param targetCollectionTimestamp The timestamp for the presently selected destination collection
+ * @return a {@code CreationType} indicating if and how to create a collection
+ */
+ private CreationType typeOfCreationRequired(Instant docTimeStamp, Instant targetCollectionTimestamp) {
+ final Instant nextCollTimestamp = computeNextCollTimestamp(targetCollectionTimestamp);
+
+ if (!docTimeStamp.isBefore(nextCollTimestamp)) {
+ // current document is destined for a collection that doesn't exist, must create the destination
+ // to proceed with this add command
+ return SYNCHRONOUS;
+ }
+
+ if (isNotBlank(getPreemptiveCreateWindow())) {
+ Instant preemptNextColCreateTime =
+ calcPreemptNextColCreateTime(getPreemptiveCreateWindow(), nextCollTimestamp);
+ if (!docTimeStamp.isBefore(preemptNextColCreateTime)) {
+ return ASYNC_PREEMPTIVE;
+ }
+ }
+
+ return NONE;
+ }
+
+ private Instant calcPreemptNextColCreateTime(String preemptiveCreateMath, Instant nextCollTimestamp) {
+ DateMathParser dateMathParser = new DateMathParser();
+ dateMathParser.setNow(Date.from(nextCollTimestamp));
+ try {
+ return dateMathParser.parseMath(preemptiveCreateMath).toInstant();
+ } catch (ParseException e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Invalid Preemptive Create Window Math:'" + preemptiveCreateMath + '\'', e);
+ }
+ }
+
+ private Instant parseRouteKey(Object routeKey) {
+ final Instant docTimestamp;
+ if (routeKey instanceof Instant) {
+ docTimestamp = (Instant) routeKey;
+ } else if (routeKey instanceof Date) {
+ docTimestamp = ((Date)routeKey).toInstant();
+ } else if (routeKey instanceof CharSequence) {
+ docTimestamp = Instant.parse((CharSequence)routeKey);
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
+ }
+ return docTimestamp;
+ }
+ /**
+ * Given the route key, finds the correct collection or returns the most recent collection if the doc
+ * is in the future. Future docs will potentially cause creation of a collection that does not yet exist
+ * or an error if they exceed the maxFutureMs setting.
+ *
+ * @throws SolrException if the doc is too old to be stored in the TRA
+ */
+ private Map.Entry<Instant, String> findCandidateGivenTimestamp(Instant docTimestamp, String printableId) {
+ // Lookup targetCollection given route key. Iterates in reverse chronological order.
+ // We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
+ for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
+ Instant colStartTime = entry.getKey();
+ if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimeStamp is >= the colStartTime
+ return entry; //found it
+ }
+ }
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Doc " + printableId + " couldn't be routed with " + getRouteField() + "=" + docTimestamp);
+ }
+
+ enum CreationType {
+ NONE,
+ ASYNC_PREEMPTIVE,
+ SYNCHRONOUS
+ }
+
+
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 9f2a0ba..4addae0 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -50,7 +50,7 @@ public class DistributedUpdateProcessorFactory
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
// note: will sometimes return DURP (no overhead) instead of wrapping
- return TimeRoutedAliasUpdateProcessor.wrap(req,
+ return RoutedAliasUpdateProcessor.wrap(req,
new DistributedUpdateProcessor(req, rsp, next));
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
new file mode 100644
index 0000000..5929e94
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.api.collections.CategoryRoutedAlias;
+import org.apache.solr.cloud.api.collections.RoutedAlias;
+import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * Distributes update requests to a series of collections partitioned by a "routing" field. Issues
+ * requests to create new collections on-demand.
+ *
+ * Depends on this core having a special core property that points to the alias name that this collection is a part of.
+ * And further requires certain properties on the Alias. Collections pointed to by the alias must be named for the alias
+ * plus an underscore ('_') and a routing specifier specific to the type of routed alias. These collections should not be
+ * created by the user, but are created automatically by the routed alias.
+ *
+ * @since 7.2.0 (formerly known as TimeRoutedAliasUpdateProcessor)
+ */
+public class RoutedAliasUpdateProcessor extends UpdateRequestProcessor {
+
+ private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // refs to std infrastructure
+ private final SolrQueryRequest req;
+ private final SolrCmdDistributor cmdDistrib;
+ private final ZkController zkController;
+
+ // Stuff specific to this class
+ private final String thisCollection;
+ private final RoutedAlias routedAlias;
+ private final SolrParams outParamsToLeader;
+
+
+ public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
+ //TODO get from "Collection property"
+ final String aliasName = req.getCore().getCoreDescriptor()
+ .getCoreProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
+ final DistribPhase shardDistribPhase =
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+ final DistribPhase aliasDistribPhase =
+ DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
+ if (aliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
+ // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
+ // TODO this may eventually not be true but at the moment it is
+ // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
+ return next;
+ } else {
+ try {
+ final Map<String, String> aliasProperties = getAliasProps(req, aliasName);
+ String routerType = aliasProperties.get("router.name");
+ RoutedAlias alias;
+ switch (routerType) {
+ case "time": {
+ log.debug("Time Routed Alias detected for {}", aliasName );
+ alias = new TimeRoutedAlias(aliasName, aliasProperties);
+ break;
+ }
+ case "category":{
+ log.debug("Category Routed Alias detected for {}", aliasName );
+ alias = new CategoryRoutedAlias(aliasName, aliasProperties);
+ break;
+ }
+ default:
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown router type " + routerType);
+ }
+ return new RoutedAliasUpdateProcessor(req, next, aliasDistribPhase, alias);
+ } catch (Exception e) { // ensure we throw SERVER_ERROR not BAD_REQUEST at this stage
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Routed alias has invalid properties: " + e, e);
+ }
+
+ }
+ }
+
+ private static Map<String, String> getAliasProps(SolrQueryRequest req, String aliasName) {
+ ZkController zkController = req.getCore().getCoreContainer().getZkController();
+ final Map<String, String> aliasProperties = zkController.getZkStateReader().getAliases().getCollectionAliasProperties(aliasName);
+ if (aliasProperties == null) {
+ throw RoutedAlias.newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
+ }
+ return aliasProperties;
+ }
+
+ private RoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
+ DistribPhase aliasDistribPhase, RoutedAlias routedAlias) {
+ super(next);
+ this.routedAlias = routedAlias;
+ assert aliasDistribPhase == DistribPhase.NONE;
+ final SolrCore core = req.getCore();
+ final CoreContainer cc = core.getCoreContainer();
+ this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ this.req = req;
+ this.zkController = cc.getZkController();
+ this.cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+
+
+
+ ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
+ // Don't distribute these params; they will be distributed from the local processCommit separately.
+ // (See RequestHandlerUtils.handleCommit, from which this list was retrieved)
+ outParams.remove(UpdateParams.OPTIMIZE);
+ outParams.remove(UpdateParams.COMMIT);
+ outParams.remove(UpdateParams.SOFT_COMMIT);
+ outParams.remove(UpdateParams.PREPARE_COMMIT);
+ outParams.remove(UpdateParams.ROLLBACK);
+ // Add these...
+ // Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
+ outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
+ // Signal this is a distributed update from this URP (see #wrap())
+ outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+ outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
+ outParamsToLeader = outParams;
+ }
+
+ private String getAliasName() {
+ return routedAlias.getAliasName();
+ }
+
+ @Override
+ public void processAdd(AddUpdateCommand cmd) throws IOException {
+ routedAlias.validateRouteValue(cmd);
+
+ // to avoid potential for race conditions, this next method should not get called again unless
+ // we have created a collection synchronously
+ routedAlias.updateParsedCollectionAliases(this.zkController);
+
+ String targetCollection = routedAlias.createCollectionsIfRequired(cmd);
+
+ if (thisCollection.equals(targetCollection)) {
+ // pass on through; we've reached the right collection
+ super.processAdd(cmd);
+ } else {
+ // send to the right collection
+ SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
+ cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
+ }
+ }
+
+ @Override
+ public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+ cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+ }
+
+ @Override
+ public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+ cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+ cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly? It doesn't.
+ }
+
+// Not supported by SolrCmdDistributor and is sketchy anyway
+// @Override
+// public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+// }
+
+ @Override
+ public void finish() throws IOException {
+ try {
+ cmdDistrib.finish();
+ final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+ if (!errors.isEmpty()) {
+ throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
+ }
+ } finally {
+ super.finish();
+ }
+ }
+
+ @Override
+ protected void doClose() {
+ try {
+ cmdDistrib.close();
+ } finally {
+ super.doClose();
+ }
+ }
+
+ private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {
+ SchemaField uniqueKeyField = req.getSchema().getUniqueKeyField();
+ // schema might not have key field...
+ String idFieldName = uniqueKeyField == null ? null : uniqueKeyField.getName();
+ String idValue = uniqueKeyField == null ? null : doc.getFieldValue(idFieldName).toString();
+ DocCollection coll = zkController.getClusterState().getCollection(collection);
+ Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, req.getParams(), coll);
+ return getLeaderNode(collection, slice);
+ }
+
+ private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
+ final Aliases aliases = zkController.getZkStateReader().getAliases();
+ List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
+ if (collections == null) {
+ throw RoutedAlias.newAliasMustExistException(getAliasName());
+ }
+ return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
+ }
+
+ private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
+ final Slice[] activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlicesArr();
+ if (activeSlices.length == 0) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
+ }
+ final Slice slice = activeSlices[0];
+ return getLeaderNode(collection, slice);
+ }
+
+ private SolrCmdDistributor.Node getLeaderNode(String collection, Slice slice) {
+ //TODO when should we do StdNode vs RetryNode?
+ final Replica leader = slice.getLeader();
+ if (leader == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
+ }
+ return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
+ collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
+ }
+
+
+
+
+
+}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
deleted file mode 100644
index c28ac44..0000000
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update.processor;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.text.ParseException;
-import java.time.Instant;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.api.collections.MaintainRoutedAliasCmd;
-import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.schema.SchemaField;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.SolrCmdDistributor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-import org.apache.solr.util.DateMathParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.NONE;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.SYNCHRONOUS;
-
-/**
- * Distributes update requests to a rolling series of collections partitioned by a timestamp field. Issues
- * requests to create new collections on-demand.
- *
- * Depends on this core having a special core property that points to the alias name that this collection is a part of.
- * And further requires certain properties on the Alias. Collections pointed to by the alias must be named for the alias
- * plus underscored ('_') and a time stamp of ISO_DATE plus optionally _HH_mm_ss. These collections should not be
- * created by the user, but are created automatically by the time partitioning system.
- *
- * @since 7.2.0
- */
-public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
- //TODO do we make this more generic to others who want to partition collections using something else besides time?
-
- private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- // refs to std infrastructure
- private final SolrQueryRequest req;
- private final SolrCmdDistributor cmdDistrib;
- private final CollectionsHandler collHandler;
- private final ZkController zkController;
-
- // Stuff specific to this class
- private final String thisCollection;
- private final TimeRoutedAlias timeRoutedAlias;
- private final SolrParams outParamsToLeader;
-
- // These two fields may be updated within the calling thread during processing but should
- // never be updated by any async creation thread.
- private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
- private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
-
- // This class is created once per request and the overseer methods prevent duplicate create requests
- // from creating extra copies. All we need to track here is that we don't spam preemptive creates to
- // the overseer multiple times from *this* request.
- private volatile boolean preemptiveCreateOnceAlready = false;
-
- public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
- //TODO get from "Collection property"
- final String aliasName = req.getCore().getCoreDescriptor()
- .getCoreProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
- final DistribPhase shardDistribPhase =
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
- final DistribPhase aliasDistribPhase =
- DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
- if (aliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
- // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
- // TODO this may eventually not be true but at the moment it is
- // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
- return next;
- } else {
- return new TimeRoutedAliasUpdateProcessor(req, next, aliasName, aliasDistribPhase);
- }
- }
-
- private TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
- String aliasName,
- DistribPhase aliasDistribPhase) {
- super(next);
- assert aliasDistribPhase == DistribPhase.NONE;
- final SolrCore core = req.getCore();
- this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- this.req = req;
- CoreContainer cc = core.getCoreContainer();
- zkController = cc.getZkController();
- cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
- collHandler = cc.getCollectionsHandler();
-
- final Map<String, String> aliasProperties = zkController.getZkStateReader().getAliases().getCollectionAliasProperties(aliasName);
- if (aliasProperties == null) {
- throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
- }
- try {
- this.timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasProperties);
- } catch (Exception e) { // ensure we throw SERVER_ERROR not BAD_REQUEST at this stage
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Routed alias has invalid properties: " + e, e);
- }
-
- ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
- // Don't distribute these params; they will be distributed from the local processCommit separately.
- // (See RequestHandlerUtils.handleCommit from which this list was retrieved from)
- outParams.remove(UpdateParams.OPTIMIZE);
- outParams.remove(UpdateParams.COMMIT);
- outParams.remove(UpdateParams.SOFT_COMMIT);
- outParams.remove(UpdateParams.PREPARE_COMMIT);
- outParams.remove(UpdateParams.ROLLBACK);
- // Add these...
- // Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
- outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
- // Signal this is a distributed search from this URP (see #wrap())
- outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
- outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
- outParamsToLeader = outParams;
- }
-
- private String getAliasName() {
- return timeRoutedAlias.getAliasName();
- }
-
- @Override
- public void processAdd(AddUpdateCommand cmd) throws IOException {
- final Instant docTimestamp =
- parseRouteKey(cmd.getSolrInputDocument().getFieldValue(timeRoutedAlias.getRouteField()));
-
- // TODO: maybe in some cases the user would want to ignore/warn instead?
- if (docTimestamp.isAfter(Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs()))) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "The document's time routed key of " + docTimestamp + " is too far in the future given " +
- TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
- }
-
- // to avoid potential for race conditions, this next method should not get called again unless
- // we have created a collection synchronously
- updateParsedCollectionAliases();
-
- String targetCollection = createCollectionsIfRequired(docTimestamp, cmd);
-
- if (thisCollection.equals(targetCollection)) {
- // pass on through; we've reached the right collection
- super.processAdd(cmd);
- } else {
- // send to the right collection
- SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
- cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
- }
- }
-
- /**
- * Create any required collections and return the name of the collection to which the current document should be sent.
- *
- * @param docTimestamp the date for the document taken from the field specified in the TRA config
- * @param cmd The initial calculated destination collection.
- * @return The name of the proper destination collection for the document which may or may not be a
- * newly created collection
- */
- private String createCollectionsIfRequired(Instant docTimestamp, AddUpdateCommand cmd) {
- // Even though it is possible that multiple requests hit this code in the 1-2 sec that
- // it takes to create a collection, it's an established anti-pattern to feed data with a very large number
- // of client connections. This in mind, we only guard against spamming the overseer within a batch of
- // updates. We are intentionally tolerating a low level of redundant requests in favor of simpler code. Most
- // super-sized installations with many update clients will likely be multi-tenant and multiple tenants
- // probably don't write to the same alias. As such, we have deferred any solution to the "many clients causing
- // collection creation simultaneously" problem until such time as someone actually has that problem in a
- // real world use case that isn't just an anti-pattern.
- Map.Entry<Instant, String> candidateCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
- String candidateCollectionName = candidateCollectionDesc.getValue();
- try {
- switch (typeOfCreationRequired(docTimestamp, candidateCollectionDesc.getKey())) {
- case SYNCHRONOUS:
- // This next line blocks until all collections required by the current document have been created
- return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
- case ASYNC_PREEMPTIVE:
- if (!preemptiveCreateOnceAlready) {
- log.info("EXECUTING preemptive creation for {}", timeRoutedAlias.getAliasName());
- // It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
- // in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
- // and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
- // collections that existed when candidateCollectionDesc was created. If this class updates it's notion of
- // parsedCollectionsDesc since candidateCollectionDesc was chosen, we could create collection n+2
- // instead of collection n+1.
- String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
-
- // This line does not block and the document can be added immediately
- preemptiveAsync(() -> createNextCollection(mostRecentCollName));
- }
- return candidateCollectionName;
- case NONE:
- return candidateCollectionName; // could use fall through, but fall through is fiddly for later editors.
- default:
- throw unknownCreateType();
- }
- // do nothing if creationType == NONE
- } catch (SolrException e) {
- throw e;
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
-
- private void preemptiveAsync(Runnable r) {
- preemptiveCreateOnceAlready = true;
- req.getCore().runAsync(r);
- }
-
- /**
- * Determine if the a new collection will be required based on the document timestamp. Passing null for
- * preemptiveCreateInterval tells you if the document is beyond all existing collections with a response of
- * {@link CreationType#NONE} or {@link CreationType#SYNCHRONOUS}, and passing a valid date math for
- * preemptiveCreateMath additionally distinguishes the case where the document is close enough to the end of
- * the TRA to trigger preemptive creation but not beyond all existing collections with a value of
- * {@link CreationType#ASYNC_PREEMPTIVE}.
- *
- * @param docTimeStamp The timestamp from the document
- * @param targetCollectionTimestamp The timestamp for the presently selected destination collection
- * @return a {@code CreationType} indicating if and how to create a collection
- */
- private CreationType typeOfCreationRequired(Instant docTimeStamp, Instant targetCollectionTimestamp) {
- final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(targetCollectionTimestamp);
-
- if (!docTimeStamp.isBefore(nextCollTimestamp)) {
- // current document is destined for a collection that doesn't exist, must create the destination
- // to proceed with this add command
- return SYNCHRONOUS;
- }
-
- if (isNotBlank(timeRoutedAlias.getPreemptiveCreateWindow())) {
- Instant preemptNextColCreateTime =
- calcPreemptNextColCreateTime(timeRoutedAlias.getPreemptiveCreateWindow(), nextCollTimestamp);
- if (!docTimeStamp.isBefore(preemptNextColCreateTime)) {
- return ASYNC_PREEMPTIVE;
- }
- }
-
- return NONE;
- }
-
- private Instant calcPreemptNextColCreateTime(String preemptiveCreateMath, Instant nextCollTimestamp) {
- DateMathParser dateMathParser = new DateMathParser();
- dateMathParser.setNow(Date.from(nextCollTimestamp));
- try {
- return dateMathParser.parseMath(preemptiveCreateMath).toInstant();
- } catch (ParseException e) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Invalid Preemptive Create Window Math:'" + preemptiveCreateMath + '\'', e);
- }
- }
-
- private Instant parseRouteKey(Object routeKey) {
- final Instant docTimestamp;
- if (routeKey instanceof Instant) {
- docTimestamp = (Instant) routeKey;
- } else if (routeKey instanceof Date) {
- docTimestamp = ((Date)routeKey).toInstant();
- } else if (routeKey instanceof CharSequence) {
- docTimestamp = Instant.parse((CharSequence)routeKey);
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
- }
- return docTimestamp;
- }
-
- /**
- * Ensure {@link #parsedCollectionsAliases} is up to date. If it was modified, return true.
- * Note that this will return true if some other alias was modified or if properties were modified. These
- * are spurious and the caller should be written to be tolerant of no material changes.
- */
- private boolean updateParsedCollectionAliases() {
- final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
- if (this.parsedCollectionsAliases != aliases) {
- if (this.parsedCollectionsAliases != null) {
- log.debug("Observing possibly updated alias: {}", getAliasName());
- }
- this.parsedCollectionsDesc = timeRoutedAlias.parseCollections(aliases, this::newAliasMustExistException);
- this.parsedCollectionsAliases = aliases;
- return true;
- }
- return false;
- }
-
- /**
- * Given the route key, finds the correct collection or returns the most recent collection if the doc
- * is in the future. Future docs will potentially cause creation of a collection that does not yet exist
- * or an error if they exceed the maxFutureMs setting.
- *
- * @throws SolrException if the doc is too old to be stored in the TRA
- */
- private Map.Entry<Instant, String> findCandidateGivenTimestamp(Instant docTimestamp, String printableId) {
- // Lookup targetCollection given route key. Iterates in reverse chronological order.
- // We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
- for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
- Instant colStartTime = entry.getKey();
- if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimeStamp is >= the colStartTime
- return entry; //found it
- }
- }
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Doc " + printableId + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + docTimestamp);
- }
-
- private void createNextCollection(String mostRecentCollName) {
- // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection
- // and update the alias contingent on the most recent collection name being the same as
- // what we think so here, otherwise it will return (without error).
- try {
- MaintainRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
- // we don't care about the response. It's possible no collection was created because
- // of a race and that's okay... we'll ultimately retry any way.
-
- // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
- // not yet know about the new alias (thus won't see the newly added collection to it), and we might think
- // we failed.
- zkController.getZkStateReader().aliasesManager.update();
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
-
- private SolrException newAliasMustExistException() {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "Collection " + thisCollection + " created for use with alias " + getAliasName() + " which doesn't exist anymore." +
- " You cannot write to this unless the alias exists.");
- }
-
- @Override
- public void processDelete(DeleteUpdateCommand cmd) throws IOException {
- final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
- cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
- }
-
- @Override
- public void processCommit(CommitUpdateCommand cmd) throws IOException {
- final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
- cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
- cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly? It doesn't.
- }
-
-// Not supported by SolrCmdDistributor and is sketchy any way
-// @Override
-// public void processRollback(RollbackUpdateCommand cmd) throws IOException {
-// }
-
- @Override
- public void finish() throws IOException {
- try {
- cmdDistrib.finish();
- final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
- if (!errors.isEmpty()) {
- throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
- }
- } finally {
- super.finish();
- }
- }
-
- @Override
- protected void doClose() {
- try {
- cmdDistrib.close();
- } finally {
- super.doClose();
- }
- }
-
- private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {
- SchemaField uniqueKeyField = req.getSchema().getUniqueKeyField();
- // schema might not have key field...
- String idFieldName = uniqueKeyField == null ? null : uniqueKeyField.getName();
- String idValue = uniqueKeyField == null ? null : doc.getFieldValue(idFieldName).toString();
- DocCollection coll = zkController.getClusterState().getCollection(collection);
- Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, req.getParams(), coll);
- return getLeaderNode(collection, slice);
- }
-
- private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
- final Aliases aliases = zkController.getZkStateReader().getAliases();
- List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
- if (collections == null) {
- throw newAliasMustExistException();
- }
- return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
- }
-
- private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
- final Slice[] activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlicesArr();
- if (activeSlices.length == 0) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
- }
- final Slice slice = activeSlices[0];
- return getLeaderNode(collection, slice);
- }
-
- private SolrCmdDistributor.Node getLeaderNode(String collection, Slice slice) {
- //TODO when should we do StdNode vs RetryNode?
- final Replica leader = slice.getLeader();
- if (leader == null) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
- }
- return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
- collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
- }
-
-
- /**
- * Create as many collections as required. This method loops to allow for the possibility that the docTimestamp
- * requires more than one collection to be created. Since multiple threads may be invoking maintain on separate
- * requests to the same alias, we must pass in the name of the collection that this thread believes to be the most
- * recent collection. This assumption is checked when the command is executed in the overseer. When this method
- * finds that all collections required have been created it returns the (possibly new) most recent collection.
- * The return value is ignored by the calling code in the async preemptive case.
- *
- * @param docTimestamp the timestamp from the document that determines routing
- * @param printableId an identifier for the add command used in error messages
- * @param targetCollectionDesc the descriptor for the presently selected collection which should also be
- * the most recent collection in all cases where this method is invoked.
- * @return The latest collection, including collections created during maintenance
- */
- private String createAllRequiredCollections( Instant docTimestamp, String printableId,
- Map.Entry<Instant, String> targetCollectionDesc) {
- do {
- switch(typeOfCreationRequired(docTimestamp, targetCollectionDesc.getKey())) {
- case NONE:
- return targetCollectionDesc.getValue(); // we don't need another collection
- case ASYNC_PREEMPTIVE:
- // can happen when preemptive interval is longer than one time slice
- if (!preemptiveCreateOnceAlready) {
- String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
- preemptiveAsync(() -> createNextCollection(mostRecentCollName));
- return targetCollectionDesc.getValue();
- }
- case SYNCHRONOUS:
- createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but...
- if (!updateParsedCollectionAliases()) { // thus we didn't make progress...
- // this is not expected, even in known failure cases, but we check just in case
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "We need to create a new time routed collection but for unknown reasons were unable to do so.");
- }
- // then retry the loop ... have to do find again in case other requests also added collections
- // that were made visible when we called updateParsedCollectionAliases()
- targetCollectionDesc = findCandidateGivenTimestamp(docTimestamp, printableId);
- break;
- default:
- throw unknownCreateType();
- }
- } while (true);
- }
-
- private SolrException unknownCreateType() {
- return new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown creation type while adding " +
- "document to a Time Routed Alias! This is a bug caused when a creation type has been added but " +
- "not all code has been updated to handle it.");
- }
-
- enum CreationType {
- NONE,
- ASYNC_PREEMPTIVE,
- SYNCHRONOUS
- }
-
-
-}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
index 7eeff5a..4fc1d50 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java
@@ -52,6 +52,7 @@ import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.api.collections.RoutedAlias;
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
@@ -127,9 +128,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
final String col23rd = alias + "_2017-10-23";
CollectionAdminRequest.createCollection(col23rd, configName, 2, 2)
.setMaxShardsPerNode(2)
- .withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
+ .withProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
.process(solrClient);
-
+
cluster.waitForActiveCollection(col23rd, 2, 4);
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
@@ -161,7 +162,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
// destined for this collection, Solr will see it already exists and add it to the alias.
final String col24th = alias + "_2017-10-24";
CollectionAdminRequest.createCollection(col24th, configName, 1, 1) // more shards and replicas now
- .withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
+ .withProperty(RoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
.process(solrClient);
// index 3 documents in a random fashion
@@ -636,7 +637,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
// Here we quickly add another doc in a separate request, before the collection creation has completed.
// This has the potential to incorrectly cause preemptive collection creation to run twice and create a
- // second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
+ // second collection. RoutedAliasUpdateProcessor is meant to guard against this race condition.
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
params));
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index dee2f5f..f8d2925 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -79,7 +79,8 @@ public interface CollectionParams {
DELETEALIAS(true, LockLevel.COLLECTION),
ALIASPROP(true, LockLevel.COLLECTION),
LISTALIASES(false, LockLevel.NONE),
- MAINTAINROUTEDALIAS(true, LockLevel.COLLECTION),
+ MAINTAINTIMEROUTEDALIAS(true, LockLevel.COLLECTION), // internal use only
+ MAINTAINCATEGORYROUTEDALIAS(true, LockLevel.COLLECTION), // internal use only
DELETEROUTEDALIASCOLLECTIONS(true, LockLevel.COLLECTION),
SPLITSHARD(true, LockLevel.SHARD),
DELETESHARD(true, LockLevel.SHARD),