You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/01/31 09:54:39 UTC
[lucene-solr] 01/01: Initial patch.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-11127
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 48167039661af434c7d2d9fd1d4e03e75b9f8c0c
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Jan 31 10:54:11 2019 +0100
Initial patch.
---
.../cloud/api/collections/DeleteCollectionCmd.java | 2 +-
.../OverseerCollectionMessageHandler.java | 35 +---
.../api/collections/ReindexCollectionCmd.java | 204 +++++++++++++++++++++
.../solr/cloud/overseer/CollectionMutator.java | 11 ++
.../solr/handler/admin/CollectionsHandler.java | 2 +
.../solrj/request/CollectionAdminRequest.java | 16 +-
.../solr/common/params/CollectionParams.java | 3 +-
7 files changed, 237 insertions(+), 36 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index e5f6f2d..7177f03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -181,7 +181,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
}
- private String referencedByAlias(String collection, Aliases aliases) {
+ public static String referencedByAlias(String collection, Aliases aliases) {
Objects.requireNonNull(aliases);
return aliases.getCollectionAliasListMap().entrySet().stream()
.filter(e -> e.getValue().contains(collection))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index e67fc7f..3f5a7bc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -31,39 +31,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.UTILIZENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.Utils.makeMap;
@@ -271,6 +239,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new AddReplicaCmd(this))
.put(MOVEREPLICA, new MoveReplicaCmd(this))
+ .put(REINDEX_COLLECTION, new ReindexCollectionCmd(this))
.put(UTILIZENODE, new UtilizeNodeCmd(this))
.build()
;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
new file mode 100644
index 0000000..6f0b091
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Overseer command registered for the REINDEX_COLLECTION action. Work in progress: see the nocommit markers.
+ * <p>The command marks the source collection as {@link State#RUNNING} through the {@link #REINDEX_PROP}
+ * collection property, (re)creates a temporary collection named {@link #COL_PREFIX} + source name with the
+ * same number of active shards and a replication factor of 1, and then creates an alias named after the
+ * source collection that points at the temporary one. A request carrying {@code abort=true} only flips the
+ * property to {@link State#ABORTED}; a running command picks that up in {@link #maybeAbort(String)}.</p>
+ * <p>The property is always moved off {@link State#RUNNING} when a run ends: to {@link State#FINISHED}
+ * after the last step, to {@link State#ABORTED} when a step throws.</p>
+ */
+public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Name of the boolean message parameter that requests aborting a reindex. */
+  public static final String ABORT = "abort";
+  /** Prefix prepended to the source collection name to build the temporary collection name. */
+  public static final String COL_PREFIX = ".reindex_";
+  /** Collection property holding the lower-cased {@link State} of the reindex. */
+  public static final String REINDEX_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex";
+  /** Collection property name reserved for the reindex phase; not read or written by this class yet. */
+  public static final String REINDEX_PHASE_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex_phase";
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  /** Reindex status as stored (lower-cased) in {@link #REINDEX_PROP}. */
+  public enum State {
+    IDLE,
+    RUNNING,
+    ABORTED,
+    FINISHED;
+
+    /** @return the constant name lower-cased with {@link Locale#ROOT} */
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Case-insensitive lookup of a state by name.
+     *
+     * @param p state name, may be null
+     * @return the matching state, or null when {@code p} is null or not a known state name
+     */
+    public static State get(String p) {
+      if (p == null) {
+        return null;
+      }
+      return states.get(p.toLowerCase(Locale.ROOT));
+    }
+
+    /** Read-only lookup table from lower-cased name to constant. */
+    static final Map<String, State> states = Collections.unmodifiableMap(
+        Stream.of(State.values()).collect(Collectors.toMap(State::toLower, Function.identity())));
+  }
+
+  /**
+   * @param ocmh message handler giving access to the overseer, the ZK state reader and the other commands
+   */
+  public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Runs (or aborts) a reindex of the collection named by the {@code CommonParams.NAME} message property.
+   *
+   * @param clusterState cluster state handed to the command; refreshed locally while waiting for the
+   *        temporary collection
+   * @param message request properties: collection name (required), optional {@link #ABORT} flag and
+   *        optional config name (defaults to the config name read for the source collection)
+   * @param results receives an {@code aborted} entry (value: collection name) when the run was aborted
+   * @throws SolrException BAD_REQUEST if the collection name is missing/unknown or a reindex is already
+   *         running; SERVER_ERROR if the temporary collection is not visible in the cluster state in time
+   */
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+
+    log.info("*** called: {}", message);
+
+    String collection = message.getStr(CommonParams.NAME);
+    boolean abort = message.getBool(ABORT, false);
+    if (collection == null || clusterState.getCollectionOrNull(collection) == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection name must be specified and must exist");
+    }
+    DocCollection coll = clusterState.getCollection(collection);
+    if (abort) {
+      setReindexState(collection, State.ABORTED);
+      results.add(State.ABORTED.toLower(), collection);
+      return;
+    }
+    // check it's not already running
+    State state = State.get(coll.getStr(REINDEX_PROP, State.IDLE.toLower()));
+    if (state == State.RUNNING) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection);
+    }
+    // gather everything that can fail before raising the flag, so a failure here cannot leave it set
+    int numShards = coll.getActiveSlices().size();
+    String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
+    String tmpCollection = COL_PREFIX + collection;
+
+    // set the flag
+    setReindexState(collection, State.RUNNING);
+    boolean aborted = false;
+
+    try {
+      // 0. set up temp collection - delete first if necessary
+      NamedList<Object> cmdResults = new NamedList<>();
+      ZkNodeProps cmd;
+      if (clusterState.getCollectionOrNull(tmpCollection) != null) {
+        // delete any aliases and the collection
+        ocmh.zkStateReader.aliasesManager.update();
+        String alias = DeleteCollectionCmd.referencedByAlias(tmpCollection, ocmh.zkStateReader.getAliases());
+        if (alias != null) {
+          // delete the alias
+          cmd = new ZkNodeProps(CommonParams.NAME, alias);
+          ocmh.commandMap.get(CollectionParams.CollectionAction.DELETEALIAS).call(clusterState, cmd, cmdResults);
+          // nocommit error checking
+        }
+        cmd = new ZkNodeProps(
+            CommonParams.NAME, tmpCollection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        // nocommit error checking
+      }
+      // create the tmp collection - use RF=1
+      cmd = new ZkNodeProps(
+          CommonParams.NAME, tmpCollection,
+          ZkStateReader.NUM_SHARDS_PROP, String.valueOf(numShards),
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          CollectionAdminParams.COLL_CONF, configName,
+          CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      // nocommit error checking
+      // wait for a while until we see the collection
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
+      boolean created = false;
+      while (! waitUntil.hasTimedOut()) {
+        waitUntil.sleep(100);
+        // this also refreshes our local var clusterState
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        created = clusterState.hasCollection(tmpCollection);
+        if(created) break;
+      }
+      if (!created) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + tmpCollection);
+      }
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // 1. copy existing docs
+
+
+      // ?. set up alias - new docs will go to the tmpCollection
+      cmd = new ZkNodeProps(
+          CommonParams.NAME, collection,
+          "collections", tmpCollection
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, cmdResults);
+      // nocommit error checking
+
+      // all steps done - release the RUNNING flag so that a later reindex request is accepted
+      setReindexState(collection, State.FINISHED);
+    } catch (Exception e) {
+      // a failed run must not leave the flag at RUNNING, or every later request would be rejected
+      try {
+        setReindexState(collection, State.ABORTED);
+      } catch (Exception inner) {
+        // keep the original failure as the primary exception
+        e.addSuppressed(inner);
+      }
+      throw e;
+    } finally {
+      if (aborted) {
+        // nocommit - cleanup
+        results.add(State.ABORTED.toLower(), collection);
+      }
+    }
+  }
+
+  /**
+   * Offers a MODIFYCOLLECTION state update to the overseer that sets {@link #REINDEX_PROP} of the
+   * collection to the given state.
+   *
+   * @param collection name of the collection whose property is updated
+   * @param state new reindex state, written lower-cased
+   */
+  private void setReindexState(String collection, State state) throws Exception {
+    ZkNodeProps props = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+        ZkStateReader.COLLECTION_PROP, collection,
+        REINDEX_PROP, state.toLower());
+    ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+  }
+
+  /**
+   * Re-reads the cluster state to decide whether the running reindex should stop.
+   *
+   * @param collection name of the source collection
+   * @return true if the collection no longer exists or its {@link #REINDEX_PROP} is {@link State#ABORTED}
+   */
+  private boolean maybeAbort(String collection) throws Exception {
+    DocCollection coll = ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
+    if (coll == null) {
+      // collection no longer present - abort
+      return true;
+    }
+    // a missing or unrecognized property value means "keep going"
+    return State.get(coll.getStr(REINDEX_PROP, State.RUNNING.toLower())) == State.ABORTED;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 88e18e2..bb549f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -119,6 +119,17 @@ public class CollectionMutator {
}
}
}
+ // other aux properties are also modifiable
+ for (String prop : message.keySet()) {
+ if (prop.startsWith(CollectionAdminRequest.PROPERTY_PREFIX)) {
+ hasAnyOps = true;
+ if (message.get(prop) == null) {
+ m.remove(prop);
+ } else {
+ m.put(prop, message.get(prop));
+ }
+ }
+ }
if (!hasAnyOps) {
return ZkStateWriter.NO_OP;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 725d2bd..405ad3d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -524,6 +524,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
+ REINDEX_COLLECTION_OP(REINDEX_COLLECTION, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
+
SYNCSHARD_OP(SYNCSHARD, (req, rsp, h) -> {
String collection = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 6f7af617..a326abe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -91,7 +91,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected final CollectionAction action;
- private static String PROPERTY_PREFIX = "property.";
+ public static String PROPERTY_PREFIX = "property.";
public CollectionAdminRequest(CollectionAction action) {
this("/admin/collections", action);
@@ -782,6 +782,20 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
/**
+ * Returns a SolrRequest to reindex a collection.
+ * The returned request carries the {@code REINDEX_COLLECTION} collection action.
+ *
+ * @param collection name of the collection to reindex
+ * @return a new {@link ReindexCollection} request for that collection
+ */
+ public static ReindexCollection reindexCollection(String collection) {
+ return new ReindexCollection(collection);
+ }
+
+ /**
+ * Collection-specific admin request that carries the {@code REINDEX_COLLECTION} action.
+ * Instances are obtained through {@link CollectionAdminRequest#reindexCollection(String)};
+ * the constructor is private.
+ */
+ public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
+
+ // private: callers go through the static factory method reindexCollection(String)
+ private ReindexCollection(String collection) {
+ super(CollectionAction.REINDEX_COLLECTION, collection);
+ }
+ }
+
+ /**
* Returns a SolrRequest to delete a collection
*/
public static Delete deleteCollection(String collection) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index dee2f5f..6176263 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -121,7 +121,8 @@ public interface CollectionParams {
MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
NONE(false, LockLevel.NONE),
// TODO: not implemented yet
- MERGESHARDS(true, LockLevel.SHARD)
+ MERGESHARDS(true, LockLevel.SHARD),
+ REINDEX_COLLECTION(true, LockLevel.COLLECTION)
;
public final boolean isWrite;