Posted to commits@lucene.apache.org by ab...@apache.org on 2018/01/23 13:28:06 UTC
[34/51] lucene-solr:jira/solr-11714: SOLR-11817: Move Collections API
classes to their own package
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
deleted file mode 100644
index 607588c..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
-import org.apache.solr.util.TimeZoneUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_INTERVAL_METADATA;
-
-/**
- * For "routed aliases", creates another collection and adds it to the alias. In some cases it will not
- * add a new collection.
- * If a collection is created, then collection creation info is returned.
- *
- * Note: this logic is within an Overseer because we want to leverage the mutual exclusion
- * property afforded by the lock it obtains on the alias name.
- * @since 7.3
- */
-public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName";
-
- public static final String COLL_METAPREFIX = "collection-create.";
-
- private final OverseerCollectionMessageHandler ocmh;
-
- public RoutedAliasCreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
- this.ocmh = ocmh;
- }
-
- /* TODO:
- There are a few classes related to time routed alias processing. We need to share some logic better.
- */
-
-
- @Override
- public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
- //---- PARSE PRIMARY MESSAGE PARAMS
- // important that we use NAME for the alias as that is what the Overseer will get a lock on before calling us
- final String aliasName = message.getStr(NAME);
- // the client believes this is the mostRecent collection name. We assert this if provided.
- final String ifMostRecentCollName = message.getStr(IF_MOST_RECENT_COLL_NAME); // optional
-
- // TODO collection param (or intervalDateMath override?), useful for data capped collections
-
- //---- PARSE ALIAS INFO FROM ZK
- final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder;
- final Aliases aliases = aliasesHolder.getAliases();
- final Map<String, String> aliasMetadata = aliases.getCollectionAliasMetadata(aliasName);
- if (aliasMetadata == null) {
- throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
- }
-
- String routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
- if (routeField == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "This command only works on time routed aliases. Expected alias metadata not found.");
- }
- String intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
- TimeZone intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
-
- //TODO this is ugly; how can we organize the code related to this feature better?
- final List<Map.Entry<Instant, String>> parsedCollections =
- TimeRoutedAliasUpdateProcessor.parseCollections(aliasName, aliases, () -> newAliasMustExistException(aliasName));
-
- //---- GET MOST RECENT COLL
- final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0);
- final Instant mostRecentCollTimestamp = mostRecentEntry.getKey();
- final String mostRecentCollName = mostRecentEntry.getValue();
- if (ifMostRecentCollName != null) {
- if (!mostRecentCollName.equals(ifMostRecentCollName)) {
- // Possibly due to race conditions in URPs on multiple leaders calling us at the same time
- String msg = IF_MOST_RECENT_COLL_NAME + " expected " + ifMostRecentCollName + " but it's " + mostRecentCollName;
- if (parsedCollections.stream().map(Map.Entry::getValue).noneMatch(ifMostRecentCollName::equals)) {
- msg += ". Furthermore this collection isn't in the list of collections referenced by the alias.";
- }
- log.info(msg);
- results.add("message", msg);
- return;
- }
- } else if (mostRecentCollTimestamp.isAfter(Instant.now())) {
- final String msg = "Most recent collection is in the future, so we won't create another.";
- log.info(msg);
- results.add("message", msg);
- return;
- }
-
- //---- COMPUTE NEXT COLLECTION NAME
- final Instant nextCollTimestamp = TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
- assert nextCollTimestamp.isAfter(mostRecentCollTimestamp);
- final String createCollName = TimeRoutedAliasUpdateProcessor.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);
-
- //---- CREATE THE COLLECTION
- // Map alias metadata starting with a prefix to a create-collection API request
- final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
- for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
- if (e.getKey().startsWith(COLL_METAPREFIX)) {
- createReqParams.set(e.getKey().substring(COLL_METAPREFIX.length()), e.getValue());
- }
- }
- if (createReqParams.get(COLL_CONF) == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "We require an explicit " + COLL_CONF );
- }
- createReqParams.set(NAME, createCollName);
- createReqParams.set("property." + TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, aliasName);
- // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
- // Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
- final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
- new LocalSolrQueryRequest(null, createReqParams),
- null,
- ocmh.overseer.getCoreContainer().getCollectionsHandler());
- createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
- // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd
- ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
-
- CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));
-
- //TODO delete some of the oldest collection(s) ?
-
- //---- UPDATE THE ALIAS
- aliasesHolder.applyModificationAndExportToZk(curAliases -> {
- final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
- if (curTargetCollections.contains(createCollName)) {
- return curAliases;
- } else {
- List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
- // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
- newTargetCollections.add(createCollName);
- newTargetCollections.addAll(curTargetCollections);
- return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
- }
- });
-
- }
-
- private SolrException newAliasMustExistException(String aliasName) {
- return new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Alias " + aliasName + " does not exist.");
- }
-
-}
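For context, the heart of the command above is computing the next collection's timestamp from the alias interval and deriving a collection name from it. Here is a minimal, self-contained sketch of that step, using plain java.time instead of Solr's date-math parser; the "+1DAY" handling and the alias_yyyy-MM-dd naming pattern are illustrative assumptions, not Solr's exact implementation:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class NextCollectionSketch {
  // Compute the timestamp of the collection that follows the most recent one.
  // Only the common "+1DAY" interval is handled; Solr supports full date math.
  static Instant nextCollTimestamp(Instant mostRecent, String intervalDateMath) {
    if ("+1DAY".equals(intervalDateMath)) {
      return mostRecent.plus(1, ChronoUnit.DAYS);
    }
    throw new IllegalArgumentException("Unsupported interval: " + intervalDateMath);
  }

  // Derive a collection name from the alias name and a timestamp.
  static String collectionName(String aliasName, Instant timestamp) {
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    return aliasName + "_" + fmt.format(timestamp);
  }

  public static void main(String[] args) {
    Instant mostRecent = Instant.parse("2018-01-23T00:00:00Z");
    Instant next = nextCollTimestamp(mostRecent, "+1DAY");
    System.out.println(collectionName("myalias", next)); // myalias_2018-01-24
  }
}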
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
deleted file mode 100644
index 9732616..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CompositeIdRouter;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.PlainIdRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CommonAdminParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.util.TestInjection;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-
-public class SplitShardCmd implements Cmd {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final OverseerCollectionMessageHandler ocmh;
-
- public SplitShardCmd(OverseerCollectionMessageHandler ocmh) {
- this.ocmh = ocmh;
- }
-
- @Override
- public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
- split(state, message, results);
- }
-
- public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
- boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
- String collectionName = message.getStr(CoreAdminParams.COLLECTION);
-
- log.info("Split shard invoked");
- ZkStateReader zkStateReader = ocmh.zkStateReader;
- zkStateReader.forceUpdateCollection(collectionName);
- AtomicReference<String> slice = new AtomicReference<>();
- slice.set(message.getStr(ZkStateReader.SHARD_ID_PROP));
-
- String splitKey = message.getStr("split.key");
- DocCollection collection = clusterState.getCollection(collectionName);
-
- PolicyHelper.SessionWrapper sessionWrapper = null;
-
- Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
-
- // find the leader for the shard
- Replica parentShardLeader = null;
- try {
- parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice.get(), 10000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- // let's record the ephemeralOwner of the parent leader node
- Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
- if (leaderZnodeStat == null) {
- // we just got to know the leader but its live node is gone already!
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
- }
-
- List<DocRouter.Range> subRanges = new ArrayList<>();
- List<String> subSlices = new ArrayList<>();
- List<String> subShardNames = new ArrayList<>();
-
- String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
-
- try {
-
- boolean oldShardsDeleted = false;
- for (String subSlice : subSlices) {
- Slice oSlice = collection.getSlice(subSlice);
- if (oSlice != null) {
- final Slice.State state = oSlice.getState();
- if (state == Slice.State.ACTIVE) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
- } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
- // delete the shards
- log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, subSlice);
- ZkNodeProps m = new ZkNodeProps(propMap);
- try {
- ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + subSlice,
- e);
- }
-
- oldShardsDeleted = true;
- }
- }
- }
-
- if (oldShardsDeleted) {
- // refresh the locally cached cluster state
- // we know we have the latest because otherwise deleteshard would have failed
- clusterState = zkStateReader.getClusterState();
- collection = clusterState.getCollection(collectionName);
- }
-
- final String asyncId = message.getStr(ASYNC);
- Map<String, String> requestMap = new HashMap<>();
- String nodeName = parentShardLeader.getNodeName();
-
- for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = subSlices.get(i);
- String subShardName = subShardNames.get(i);
- DocRouter.Range subRange = subRanges.get(i);
-
- log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
-
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
- propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
- propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
- propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
- propMap.put("shard_parent_node", parentShardLeader.getNodeName());
- propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
- inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-
- // wait until we are able to see the new shard in cluster state
- ocmh.waitForNewShard(collectionName, subSlice);
-
- // refresh cluster state
- clusterState = zkStateReader.getClusterState();
-
- log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
- + " on " + nodeName);
- propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, subSlice);
- propMap.put("node", nodeName);
- propMap.put(CoreAdminParams.NAME, subShardName);
- propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- // copy over property params:
- for (String key : message.keySet()) {
- if (key.startsWith(COLL_PROP_PREFIX)) {
- propMap.put(key, message.getStr(key));
- }
- }
- // add async param
- if (asyncId != null) {
- propMap.put(ASYNC, asyncId);
- }
- ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
- }
-
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-
- ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
-
- for (String subShardName : subShardNames) {
- // wait for parent leader to acknowledge the sub-shard core
- log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
- String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
- CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
- cmd.setCoreName(subShardName);
- cmd.setNodeName(nodeName);
- cmd.setCoreNodeName(coreNodeName);
- cmd.setState(Replica.State.ACTIVE);
- cmd.setCheckLive(true);
- cmd.setOnlyIfLeader(true);
-
- ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
- ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
- }
-
- ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
- asyncId, requestMap);
-
- log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
- + " on: " + parentShardLeader);
-
- log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
- + collectionName + " on " + parentShardLeader);
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
- params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
- for (int i = 0; i < subShardNames.size(); i++) {
- String subShardName = subShardNames.get(i);
- params.add(CoreAdminParams.TARGET_CORE, subShardName);
- }
- params.set(CoreAdminParams.RANGES, rangesStr);
-
- ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
- ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
- requestMap);
-
- log.info("Index on shard: " + nodeName + " split into two successfully");
-
- // apply buffered updates on sub-shards
- for (int i = 0; i < subShardNames.size(); i++) {
- String subShardName = subShardNames.get(i);
-
- log.info("Applying buffered updates on : " + subShardName);
-
- params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
- params.set(CoreAdminParams.NAME, subShardName);
-
- ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
- }
-
- ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
- " to apply buffered updates", asyncId, requestMap);
-
- log.info("Successfully applied buffered updates on : " + subShardNames);
-
- // Replica creation for the new Slices
-
- // look at the replication factor and see if it matches reality
- // if it does not, find best nodes to create more cores
-
- // TODO: Have replication factor decided in some other way instead of numShards for the parent
-
- int repFactor = parentSlice.getReplicas().size();
-
- // we need to look at every node and see how many cores it serves
- // add our new cores to existing nodes serving the least number of cores
- // but (for now) require that each core goes on a distinct node.
-
- // TODO: add smarter options that look at the current number of cores per
- // node?
- // for now we just go random
- Set<String> nodes = clusterState.getLiveNodes();
- List<String> nodeList = new ArrayList<>(nodes.size());
- nodeList.addAll(nodes);
-
- // TODO: Have maxShardsPerNode param for this operation?
-
- // Remove the node that hosts the parent shard for replica creation.
- nodeList.remove(nodeName);
-
- // TODO: change this to handle sharding a slice into > 2 sub-shards.
-
- List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
- clusterState,
- new ArrayList<>(clusterState.getLiveNodes()),
- collectionName,
- new ZkNodeProps(collection.getProperties()),
- subSlices, repFactor - 1, 0, 0);
- sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
-
- List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
-
- for (ReplicaPosition replicaPosition : replicaPositions) {
- String sliceName = replicaPosition.shard;
- String subShardNodeName = replicaPosition.node;
- String solrCoreName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
-
- log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
- + collectionName + " on " + subShardNodeName);
-
- ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
- ZkStateReader.COLLECTION_PROP, collectionName,
- ZkStateReader.SHARD_ID_PROP, sliceName,
- ZkStateReader.CORE_NAME_PROP, solrCoreName,
- ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
- ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
- ZkStateReader.NODE_NAME_PROP, subShardNodeName,
- CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
-
- HashMap<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, sliceName);
- propMap.put("node", subShardNodeName);
- propMap.put(CoreAdminParams.NAME, solrCoreName);
- // copy over property params:
- for (String key : message.keySet()) {
- if (key.startsWith(COLL_PROP_PREFIX)) {
- propMap.put(key, message.getStr(key));
- }
- }
- // add async param
- if (asyncId != null) {
- propMap.put(ASYNC, asyncId);
- }
- // special flag param to instruct addReplica not to create the replica in cluster state again
- propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
-
- propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-
- replicas.add(propMap);
- }
-
- assert TestInjection.injectSplitFailureBeforeReplicaCreation();
-
- long ephemeralOwner = leaderZnodeStat.getEphemeralOwner();
- // compare against the ephemeralOwner of the parent leader node
- leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
- if (leaderZnodeStat == null || ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
- // put sub-shards in recovery_failed state
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- for (String subSlice : subSlices) {
- propMap.put(subSlice, Slice.State.RECOVERY_FAILED.toString());
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
-
- if (leaderZnodeStat == null) {
- // the leader is not live anymore, fail the split!
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
- } else if (ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
- // there's a new leader, fail the split!
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "The zk session id for the shard leader node: " + parentShardLeader.getNodeName() + " has changed from "
- + ephemeralOwner + " to " + leaderZnodeStat.getEphemeralOwner() + ". This can cause data loss so we must abort the split");
- }
- }
-
- // we must set the slice state into recovery before actually creating the replica cores
- // this ensures that the logic inside Overseer to update sub-shard state to 'active'
- // always gets a chance to execute. See SOLR-7673
-
- if (repFactor == 1) {
- // switch sub shard states to 'active'
- log.info("Replication factor is 1 so switching shard states");
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- propMap.put(slice.get(), Slice.State.INACTIVE.toString());
- for (String subSlice : subSlices) {
- propMap.put(subSlice, Slice.State.ACTIVE.toString());
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
- } else {
- log.info("Requesting shard state be set to 'recovery'");
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- for (String subSlice : subSlices) {
- propMap.put(subSlice, Slice.State.RECOVERY.toString());
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
- }
-
- // now actually create replica cores on sub shard nodes
- for (Map<String, Object> replica : replicas) {
- ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
- }
-
- ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
-
- log.info("Successfully created all replica shards for all sub-slices " + subSlices);
-
- ocmh.commit(results, slice.get(), parentShardLeader);
-
- return true;
- } catch (SolrException e) {
- throw e;
- } catch (Exception e) {
- log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
- } finally {
- if (sessionWrapper != null) sessionWrapper.release();
- }
- }
-
- public static Slice getParentSlice(ClusterState clusterState, String collectionName, AtomicReference<String> slice, String splitKey) {
- DocCollection collection = clusterState.getCollection(collectionName);
- DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
-
- Slice parentSlice;
-
- if (slice.get() == null) {
- if (router instanceof CompositeIdRouter) {
- Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
- if (searchSlices.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
- }
- if (searchSlices.size() > 1) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
- }
- parentSlice = searchSlices.iterator().next();
- slice.set(parentSlice.getName());
- log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
- + router.getClass().getName());
- }
- } else {
- parentSlice = collection.getSlice(slice.get());
- }
-
- if (parentSlice == null) {
- // no chance of the collection being null because ClusterState#getCollection(String) would have thrown
- // an exception already
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
- }
- return parentSlice;
- }
-
- public static String fillRanges(SolrCloudManager cloudManager, ZkNodeProps message, DocCollection collection, Slice parentSlice,
- List<DocRouter.Range> subRanges, List<String> subSlices, List<String> subShardNames) {
- String splitKey = message.getStr("split.key");
- DocRouter.Range range = parentSlice.getRange();
- if (range == null) {
- range = new PlainIdRouter().fullRange();
- }
- DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
-
- String rangesStr = message.getStr(CoreAdminParams.RANGES);
- if (rangesStr != null) {
- String[] ranges = rangesStr.split(",");
- if (ranges.length == 0 || ranges.length == 1) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
- } else {
- for (int i = 0; i < ranges.length; i++) {
- String r = ranges[i];
- try {
- subRanges.add(DocRouter.DEFAULT.fromString(r));
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
- }
- if (!subRanges.get(i).isSubsetOf(range)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
- }
- }
- List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
- Collections.sort(temp);
- if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
- }
- for (int i = 1; i < temp.size(); i++) {
- if (temp.get(i - 1).max + 1 != temp.get(i).min) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
- + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
- }
- }
- }
- } else if (splitKey != null) {
- if (router instanceof CompositeIdRouter) {
- CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
- List<DocRouter.Range> tmpSubRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
- if (tmpSubRanges.size() == 1) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
- + " has a hash range that is exactly equal to hash range of shard: " + parentSlice.getName());
- }
- for (DocRouter.Range subRange : tmpSubRanges) {
- if (subRange.min == subRange.max) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
- }
- }
- subRanges.addAll(tmpSubRanges);
- log.info("Partitioning parent shard " + parentSlice.getName() + " range: " + parentSlice.getRange() + " yields: " + subRanges);
- rangesStr = "";
- for (int i = 0; i < subRanges.size(); i++) {
- DocRouter.Range subRange = subRanges.get(i);
- rangesStr += subRange.toString();
- if (i < subRanges.size() - 1) rangesStr += ',';
- }
- }
- } else {
- // todo: fixed to two partitions?
- subRanges.addAll(router.partitionRange(2, range));
- }
-
- for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = parentSlice.getName() + "_" + i;
- subSlices.add(subSlice);
- String subShardName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), collection, subSlice, Replica.Type.NRT);
- subShardNames.add(subShardName);
- }
- return rangesStr;
- }
-}
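The trickiest part of fillRanges() above is validating user-supplied hash ranges: once sorted, they must exactly cover the parent shard's range with no gaps and no overlaps. The following standalone sketch mirrors that check, with a simplified Range class standing in for DocRouter.Range:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RangeCheckSketch {
  static class Range {
    final int min, max;
    Range(int min, int max) { this.min = min; this.max = max; }
  }

  static void validate(Range parent, List<Range> subRanges) {
    // copy before sorting so the caller's original order is preserved, as fillRanges() does
    List<Range> sorted = new ArrayList<>(subRanges);
    sorted.sort(Comparator.comparingInt(r -> r.min));
    if (sorted.get(0).min != parent.min || sorted.get(sorted.size() - 1).max != parent.max) {
      throw new IllegalArgumentException("Sub-ranges do not cover the parent range");
    }
    for (int i = 1; i < sorted.size(); i++) {
      // each range must start right after the previous one ends: no gap, no overlap
      if (sorted.get(i - 1).max + 1 != sorted.get(i).min) {
        throw new IllegalArgumentException("Sub-ranges overlap or leave a gap");
      }
    }
  }

  public static void main(String[] args) {
    Range parent = new Range(0, 99);
    validate(parent, Arrays.asList(new Range(50, 99), new Range(0, 49))); // passes
  }
}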
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
deleted file mode 100644
index 6a55cfd..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.params.AutoScalingParams.NODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final OverseerCollectionMessageHandler ocmh;
-
- public UtilizeNodeCmd(OverseerCollectionMessageHandler ocmh) {
- this.ocmh = ocmh;
- }
-
- @Override
- public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
- ocmh.checkRequired(message, NODE);
- String nodeName = message.getStr(NODE);
- String async = message.getStr(ASYNC);
- AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
-
- //first look for any violation that may use this replica
- List<ZkNodeProps> requests = new ArrayList<>();
- //first look for suggestions if any
- List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(autoScalingConfig, ocmh.overseer.getSolrCloudManager());
- for (Suggester.SuggestionInfo suggestionInfo : suggestions) {
- log.info("op: " + suggestionInfo.getOperation());
- String coll = null;
- List<String> pieces = StrUtils.splitSmart(suggestionInfo.getOperation().getPath(), '/');
- if (pieces.size() > 1) {
- coll = pieces.get(2);
- } else {
- continue;
- }
- log.info("coll: " + coll);
- if (suggestionInfo.getOperation() instanceof V2Request) {
- String targetNode = (String) Utils.getObjectByPath(suggestionInfo.getOperation(), true, "command/move-replica/targetNode");
- if (Objects.equals(targetNode, nodeName)) {
- String replica = (String) Utils.getObjectByPath(suggestionInfo.getOperation(), true, "command/move-replica/replica");
- requests.add(new ZkNodeProps(COLLECTION_PROP, coll,
- CollectionParams.TARGET_NODE, targetNode,
- ASYNC, async,
- REPLICA_PROP, replica));
- }
- }
- }
- executeAll(requests);
- PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager());
- Policy.Session session = sessionWrapper.get();
- for (; ; ) {
- Suggester suggester = session.getSuggester(MOVEREPLICA)
- .hint(Suggester.Hint.TARGET_NODE, nodeName);
- session = suggester.getSession();
- SolrRequest request = suggester.getSuggestion();
- if (request == null) break;
- requests.add(new ZkNodeProps(COLLECTION_PROP, request.getParams().get(COLLECTION_PROP),
- CollectionParams.TARGET_NODE, request.getParams().get(CollectionParams.TARGET_NODE),
- REPLICA_PROP, request.getParams().get(REPLICA_PROP),
- ASYNC, request.getParams().get(ASYNC)));
- }
- sessionWrapper.returnSession(session);
- try {
- executeAll(requests);
- } finally {
- sessionWrapper.release();
- }
- }
-
- private void executeAll(List<ZkNodeProps> requests) throws Exception {
- if (requests.isEmpty()) return;
- for (ZkNodeProps props : requests) {
- NamedList result = new NamedList();
- ocmh.commandMap.get(MOVEREPLICA)
- .call(ocmh.overseer.getSolrCloudManager().getClusterStateProvider().getClusterState(),
- props,
- result);
- }
- requests.clear();
- }
-
-}
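The loop in the middle of call() above follows a simple drain pattern: keep asking the suggester for another MOVEREPLICA suggestion targeting the node until it returns null, accumulating the resulting requests. A generic sketch of that pattern follows; the Supplier-based interface is an illustration, not Solr's Suggester API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class SuggestionLoopSketch {
  // Drain a supplier of suggestions until it has nothing left (signalled by null).
  static <T> List<T> drain(Supplier<T> suggester) {
    List<T> requests = new ArrayList<>();
    for (;;) {
      T suggestion = suggester.get();
      if (suggestion == null) break; // no further suggestions; stop collecting
      requests.add(suggestion);
    }
    return requests;
  }

  public static void main(String[] args) {
    List<String> pending = new ArrayList<>(Arrays.asList("move r1 -> node1", "move r2 -> node1"));
    List<String> collected = drain(() -> pending.isEmpty() ? null : pending.remove(0));
    System.out.println(collected); // [move r1 -> node1, move r2 -> node1]
  }
}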
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
new file mode 100644
index 0000000..6b4e427
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.cloud.CloudUtil;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
+public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public AddReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+ addReplica(state, message, results, null);
+ }
+
+ ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+ throws IOException, InterruptedException {
+ log.debug("addReplica() : {}", Utils.toJSONString(message));
+ boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+ boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+ final String asyncId = message.getStr(ASYNC);
+
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+ message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
+
+ String collection = message.getStr(COLLECTION_PROP);
+ DocCollection coll = clusterState.getCollection(collection);
+
+ String node = message.getStr(CoreAdminParams.NODE);
+ String shard = message.getStr(SHARD_ID_PROP);
+ String coreName = message.getStr(CoreAdminParams.NAME);
+ String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
+ int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
+ Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
+ boolean parallel = message.getBool("parallel", false);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ ZkStateReader zkStateReader = ocmh.zkStateReader;
+ if (!Overseer.isLegacy(zkStateReader)) {
+ if (!skipCreateReplicaInClusterState) {
+ ZkNodeProps props = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.SHARD_ID_PROP, shard,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+ ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
+ ZkStateReader.NODE_NAME_PROP, node,
+ ZkStateReader.REPLICA_TYPE, replicaType.name());
+ if (coreNodeName != null) {
+ props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+ }
+ try {
+ Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
+ }
+ }
+ params.set(CoreAdminParams.CORE_NODE_NAME,
+ ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
+ }
+
+ String configName = zkStateReader.readConfigName(collection);
+ String routeKey = message.getStr(ShardParams._ROUTE_);
+ String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
+ String ulogDir = message.getStr(CoreAdminParams.ULOG_DIR);
+ String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
+
+ params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+ params.set(CoreAdminParams.NAME, coreName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collection);
+ params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
+ if (shard != null) {
+ params.set(CoreAdminParams.SHARD, shard);
+ } else if (routeKey != null) {
+ Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
+ if (slices.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
+ } else {
+ params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
+ }
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
+ }
+ if (dataDir != null) {
+ params.set(CoreAdminParams.DATA_DIR, dataDir);
+ }
+ if (ulogDir != null) {
+ params.set(CoreAdminParams.ULOG_DIR, ulogDir);
+ }
+ if (instanceDir != null) {
+ params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
+ }
+ if (coreNodeName != null) {
+ params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+ }
+ ocmh.addPropertyParams(message, params);
+
+ // For tracking async calls.
+ Map<String,String> requestMap = new HashMap<>();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+ ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
+
+ final String fnode = node;
+ final String fcoreName = coreName;
+
+ Runnable runnable = () -> {
+ ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
+ ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
+ if (sessionWrapper.get() != null) {
+ sessionWrapper.get().release();
+ }
+ if (onComplete != null) onComplete.run();
+ };
+
+ if (!parallel || waitForFinalState) {
+ if (waitForFinalState) {
+ SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
+ ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), latch);
+ try {
+ zkStateReader.registerCollectionStateWatcher(collection, watcher);
+ runnable.run();
+ if (!latch.await(timeout, TimeUnit.SECONDS)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
+ }
+ } finally {
+ zkStateReader.removeCollectionStateWatcher(collection, watcher);
+ }
+ } else {
+ runnable.run();
+ }
+ } else {
+ ocmh.tpe.submit(runnable);
+ }
+
+
+ return new ZkNodeProps(
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.SHARD_ID_PROP, shard,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.NODE_NAME_PROP, node
+ );
+ }
+
+ public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
+ ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+ boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+
+ String collection = message.getStr(COLLECTION_PROP);
+ String node = message.getStr(CoreAdminParams.NODE);
+ String shard = message.getStr(SHARD_ID_PROP);
+ String coreName = message.getStr(CoreAdminParams.NAME);
+ String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
+ Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
+ if (StringUtils.isBlank(coreName)) {
+ coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
+ }
+
+ DocCollection coll = clusterState.getCollection(collection);
+ if (coll == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+ }
+ if (coll.getSlice(shard) == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " shard: " + shard + " does not exist");
+ }
+
+ // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
+ if (!skipCreateReplicaInClusterState) {
+ if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+ if (node == null) {
+ if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
+ node = Assign.identifyNodes(cloudManager,
+ clusterState,
+ Collections.emptyList(),
+ collection,
+ message,
+ Collections.singletonList(shard),
+ replicaType == Replica.Type.NRT ? 0 : 1,
+ replicaType == Replica.Type.TLOG ? 0 : 1,
+ replicaType == Replica.Type.PULL ? 0 : 1
+ ).get(0).node;
+ sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+ }
+ } else {
+ node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+ cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
+ }
+ }
+ log.info("Node Identified {} for creating new replica", node);
+
+ if (!clusterState.liveNodesContain(node)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+ }
+ if (coreName == null) {
+ coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
+ } else if (!skipCreateReplicaInClusterState) {
+ //Validate that the core name is unique in that collection
+ for (Slice slice : coll.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ String replicaCoreName = replica.getStr(CORE_NAME_PROP);
+ if (coreName.equals(replicaCoreName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
+ " for this collection");
+ }
+ }
+ }
+ }
+ if (coreNodeName != null) {
+ message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+ }
+ message = message.plus(CoreAdminParams.NAME, coreName);
+ message = message.plus(CoreAdminParams.NODE, node);
+ return message;
+ }
+}
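On the client side, this code path is typically reached through SolrJ's CollectionAdminRequest. A rough usage sketch is below; the URL, collection, shard, and node names are placeholders, and setter names may differ slightly across SolrJ versions:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;

public class AddReplicaExample {
  public static void main(String[] args) throws Exception {
    try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr").build()) {
      CollectionAdminRequest.AddReplica req =
          CollectionAdminRequest.addReplicaToShard("mycollection", "shard1");
      req.setNode("node1:8983_solr"); // optional; otherwise Assign picks a node
      CollectionAdminResponse rsp = req.process(client);
      System.out.println("ADDREPLICA status: " + rsp.getStatus());
    }
  }
}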
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
new file mode 100644
index 0000000..e7ce583
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.cloud.rule.Rule;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.NumberUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+
+public class Assign {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static int incAndGetId(DistribStateManager stateManager, String collection, int defaultValue) {
+ String path = "/collections/"+collection;
+ try {
+ if (!stateManager.hasData(path)) {
+ try {
+ stateManager.makePath(path);
+ } catch (AlreadyExistsException e) {
+ // it's okay if another request beats us to creating the node
+ }
+ }
+ path += "/counter";
+ if (!stateManager.hasData(path)) {
+ try {
+ stateManager.createData(path, NumberUtils.intToBytes(defaultValue), CreateMode.PERSISTENT);
+ } catch (AlreadyExistsException e) {
+ // it's okay if another request beats us to creating the node
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before throwing
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating counter node in Zookeeper for collection:" + collection, e);
+ } catch (IOException | KeeperException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating counter node in Zookeeper for collection:" + collection, e);
+ }
+
+ while (true) {
+ try {
+ int version = 0;
+ int currentId = 0;
+ VersionedData data = stateManager.getData(path, null);
+ if (data != null) {
+ currentId = NumberUtils.bytesToInt(data.getData());
+ version = data.getVersion();
+ }
+ byte[] bytes = NumberUtils.intToBytes(++currentId);
+ stateManager.setData(path, bytes, version);
+ return currentId;
+ } catch (BadVersionException e) {
+ continue; // another updater won the race; re-read and retry
+ } catch (IOException | KeeperException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error incrementing counter in Zookeeper for collection: " + collection, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status rather than clearing it
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error incrementing counter in Zookeeper for collection: " + collection, e);
+ }
+ }
+ }
+
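+ /**
+ * Returns a coreNodeName of the form core_node&lt;N&gt; that no existing replica of the collection
+ * uses, drawing N from the shared ZooKeeper counter.
+ */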
+ public static String assignCoreNodeName(DistribStateManager stateManager, DocCollection collection) {
+ // for backward compatibility
+ int defaultValue = defaultCounterValue(collection, false);
+ String coreNodeName = "core_node" + incAndGetId(stateManager, collection.getName(), defaultValue);
+ while (collection.getReplica(coreNodeName) != null) {
+ // there is a wee chance that the new coreNodeName is not totally unique,
+ // but it is guaranteed unique for new collections
+ coreNodeName = "core_node" + incAndGetId(stateManager, collection.getName(), defaultValue);
+ }
+ return coreNodeName;
+ }
+
+ /**
+ * Assigns a new unique shard id until the slice count reaches numShards; after that, returns the
+ * id of the shard with the fewest replicas, so replicas are added evenly.
+ *
+ * @return the assigned shard id
+ */
+ public static String assignShard(DocCollection collection, Integer numShards) {
+ if (numShards == null) {
+ numShards = 1;
+ }
+ String returnShardId = null;
+ Map<String, Slice> sliceMap = collection != null ? collection.getActiveSlicesMap() : null;
+
+ // TODO: now that we create shards ahead of time, is this code needed? Especially since hash ranges aren't assigned when creating via this method?
+
+ if (sliceMap == null) {
+ return "shard1";
+ }
+
+ List<String> shardIdNames = new ArrayList<>(sliceMap.keySet());
+
+ if (shardIdNames.size() < numShards) {
+ return "shard" + (shardIdNames.size() + 1);
+ }
+
+ // else figure out which shard needs more replicas; a single pass finds the shard
+ // with the fewest replicas without sorting the whole list
+ int minReplicas = Integer.MAX_VALUE;
+ for (String shardId : shardIdNames) {
+ int cnt = sliceMap.get(shardId).getReplicasMap().size();
+ if (cnt < minReplicas) {
+ minReplicas = cnt;
+ returnShardId = shardId;
+ }
+ }
+ return returnShardId;
+ }
+
+ private static String buildSolrCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
+ // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
+ return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
+ }
+
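+ // Seed for the counter of a collection that predates it: an over-estimate of the ids already in
+ // use (the 20x multiplier presumably leaves headroom), so that freshly generated names are
+ // unlikely to collide with names assigned before the counter existed.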
+ private static int defaultCounterValue(DocCollection collection, boolean newCollection) {
+ if (newCollection) return 0;
+ int defaultValue = collection.getReplicas().size();
+ if (collection.getReplicationFactor() != null) {
+ // numReplicas and replicationFactor * numSlices can differ
+ // when many addReplica or deleteReplica operations have been executed
+ defaultValue = Math.max(defaultValue,
+ collection.getReplicationFactor() * collection.getSlices().size());
+ }
+ return defaultValue * 20;
+ }
+
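+ /**
+ * Builds a core name of the form &lt;collection&gt;_&lt;shard&gt;_replica_&lt;type-initial&gt;&lt;N&gt; that does not
+ * clash with any core already registered in the target slice, retrying with fresh counter values.
+ */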
+ public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
+ Slice slice = collection.getSlice(shard);
+ int defaultValue = defaultCounterValue(collection, newCollection);
+ int replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
+ String coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
+ while (existCoreName(coreName, slice)) {
+ replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
+ coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
+ }
+ return coreName;
+ }
+
+ public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type) {
+ return buildSolrCoreName(stateManager, collection, shard, type, false);
+ }
+
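+ // Returns true if some replica of the slice already uses this core name.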
+ private static boolean existCoreName(String coreName, Slice slice) {
+ if (slice == null) return false;
+ for (Replica replica : slice.getReplicas()) {
+ if (coreName.equals(replica.getStr(CORE_NAME_PROP))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
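+ /**
+ * Resolves the candidate nodes for placement: the createNodeSet from the message (intersected
+ * with the live nodes) when present, otherwise all live nodes; the result is shuffled unless
+ * createNodeSet.shuffle=false was passed.
+ */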
+ public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
+ // TODO: add smarter options that look at the current number of cores per
+ // node?
+ // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
+
+ List<String> nodeList;
+
+ final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
+ final List<String> createNodeList = (createNodeSetStr == null) ? null :
+ StrUtils.splitSmart((OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY.equals(createNodeSetStr) ?
+ "" : createNodeSetStr), ",", true);
+
+ if (createNodeList != null) {
+ nodeList = new ArrayList<>(createNodeList);
+ nodeList.retainAll(liveNodes);
+ if (message.getBool(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE,
+ OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
+ Collections.shuffle(nodeList, random);
+ }
+ } else {
+ nodeList = new ArrayList<>(liveNodes);
+ Collections.shuffle(nodeList, random);
+ }
+
+ return nodeList;
+ }
+
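+ /**
+ * Computes replica positions for a new collection using one of three strategies: plain
+ * round-robin over the node list when neither placement rules nor autoscaling policies are in
+ * effect, rule-based assignment via ReplicaAssigner when the message carries a "rule" entry,
+ * and policy-based placement otherwise.
+ */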
+ public static List<ReplicaPosition> identifyNodes(SolrCloudManager cloudManager,
+ ClusterState clusterState,
+ List<String> nodeList,
+ String collectionName,
+ ZkNodeProps message,
+ List<String> shardNames,
+ int numNrtReplicas,
+ int numTlogReplicas,
+ int numPullReplicas) throws IOException, InterruptedException {
+ List<Map> rulesMap = (List) message.get("rule");
+ String policyName = message.getStr(POLICY);
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+
+ if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+ log.debug("Identify nodes using default");
+ int i = 0;
+ List<ReplicaPosition> result = new ArrayList<>();
+ for (String aShard : shardNames) {
+ for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+ Replica.Type.TLOG, numTlogReplicas,
+ Replica.Type.PULL, numPullReplicas
+ ).entrySet()) {
+ for (int j = 0; j < e.getValue(); j++) {
+ result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size())));
+ i++;
+ }
+ }
+ }
+ return result;
+ } else {
+ if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
+ }
+ }
+
+ if (rulesMap != null && !rulesMap.isEmpty()) {
+ List<Rule> rules = new ArrayList<>();
+ for (Object map : rulesMap) rules.add(new Rule((Map) map));
+ Map<String, Integer> shardVsReplicaCount = new HashMap<>();
+
+ for (String shard : shardNames) shardVsReplicaCount.put(shard, numNrtReplicas);
+ ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
+ shardVsReplicaCount,
+ (List<Map>) message.get(SNITCH),
+ new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
+ nodeList,
+ cloudManager,
+ clusterState);
+
+ Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
+ return nodeMappings.entrySet().stream()
+ .map(e -> new ReplicaPosition(e.getKey().shard, e.getKey().index, e.getKey().type, e.getValue()))
+ .collect(Collectors.toList());
+ } else {
+ if (message.getStr(CREATE_NODE_SET) == null)
+ nodeList = Collections.emptyList(); // unless explicitly specified, do not pass a node list to the Policy
+ return getPositionsUsingPolicy(collectionName,
+ shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, cloudManager, nodeList);
+ }
+ }
+
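+ // Per-node replica tally: replicas of this collection and replicas overall. weight() favors
+ // nodes hosting fewer replicas of this collection first (x100), then fewer replicas overall.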
+ static class ReplicaCount {
+ public final String nodeName;
+ public int thisCollectionNodes = 0;
+ public int totalNodes = 0;
+
+ ReplicaCount(String nodeName) {
+ this.nodeName = nodeName;
+ }
+
+ public int weight() {
+ return (thisCollectionNodes * 100) + totalNodes;
+ }
+ }
+
+ // Only called from createShard and addReplica (so far).
+ //
+ // Gets a list of candidate nodes to put the required replica(s) on. Throws an error if not enough
+ // replicas can be created on live nodes given maxShardsPerNode, the replication factor (if called
+ // from createShard), etc.
+ public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
+ String shard, int nrtReplicas,
+ Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException {
+ log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet );
+ DocCollection coll = clusterState.getCollection(collectionName);
+ Integer maxShardsPerNode = coll.getMaxShardsPerNode();
+ List<String> createNodeList = null;
+
+ if (createNodeSet instanceof List) {
+ createNodeList = (List) createNodeSet;
+ } else {
+ createNodeList = createNodeSet == null ? null : StrUtils.splitSmart((String) createNodeSet, ",", true);
+ }
+
+ HashMap<String, ReplicaCount> nodeNameVsShardCount = getNodeNameVsShardCount(collectionName, clusterState, createNodeList);
+
+ if (createNodeList == null) { // We only care if we haven't been told to put new replicas on specific nodes.
+ int availableSlots = 0;
+ for (Map.Entry<String, ReplicaCount> ent : nodeNameVsShardCount.entrySet()) {
+ //ADDREPLICA can put more than maxShardsPerNode on an instance, so this test is necessary.
+ if (maxShardsPerNode > ent.getValue().thisCollectionNodes) {
+ availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes);
+ }
+ }
+ if (availableSlots < nrtReplicas) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d",
+ nrtReplicas, collectionName, maxShardsPerNode));
+ }
+ }
+
+ List rulesList = (List) coll.get(DocCollection.RULE);
+ List<ReplicaPosition> replicaPositions = null;
+ if (rulesList != null) {
+ // TODO: make it so that this method doesn't require access to CC
+ replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, rulesList);
+ }
+ String policyName = coll.getStr(POLICY);
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+ if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+ replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
+ policyName, cloudManager, createNodeList);
+ }
+
+ if (replicaPositions != null) {
+ List<ReplicaCount> repCounts = new ArrayList<>();
+ for (ReplicaPosition p : replicaPositions) {
+ repCounts.add(new ReplicaCount(p.node));
+ }
+ return repCounts;
+ }
+
+ ArrayList<ReplicaCount> sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values());
+ Collections.sort(sortedNodeList, (x, y) -> Integer.compare(x.weight(), y.weight()));
+ return sortedNodeList;
+
+ }
+
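+ /**
+ * Delegates placement to the autoscaling framework: PolicyHelper computes positions for the
+ * requested NRT/TLOG/PULL counts under the named policy, or the cluster policy when none is
+ * named.
+ */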
+ public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames,
+ int nrtReplicas,
+ int tlogReplicas,
+ int pullReplicas,
+ String policyName, SolrCloudManager cloudManager,
+ List<String> nodesList) throws IOException, InterruptedException {
+ log.debug("shardnames {} NRT {} TLOG {} PULL {} , policy {}, nodeList {}", shardNames, nrtReplicas, tlogReplicas, pullReplicas, policyName, nodesList);
+ List<ReplicaPosition> replicaPositions = null;
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+ try {
+ Map<String, String> kvMap = Collections.singletonMap(collName, policyName);
+ replicaPositions = PolicyHelper.getReplicaLocations(
+ collName,
+ autoScalingConfig,
+ cloudManager,
+ kvMap,
+ shardNames,
+ nrtReplicas,
+ tlogReplicas,
+ pullReplicas,
+ nodesList);
+ return replicaPositions;
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error getting replica locations", e);
+ } finally {
+ if (log.isTraceEnabled()) {
+ if (replicaPositions != null)
+ log.trace("REPLICA_POSITIONS: " + Utils.toJSONString(Utils.getDeepCopy(replicaPositions, 7, true)));
+ log.trace("AUTOSCALING_CONF: " + Utils.toJSONString(autoScalingConfig));
+ }
+ }
+ }
+
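+ // Rule-based placement for a single existing shard: builds the current shard -> node -> replica
+ // count map from cluster state and asks ReplicaAssigner for positions for numberOfNodes new
+ // replicas.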
+ private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
+                                                       SolrCloudManager cloudManager, DocCollection coll, List<String> createNodeList, List rulesList) {
+ ArrayList<Rule> rules = new ArrayList<>();
+ for (Object o : rulesList) rules.add(new Rule((Map) o));
+ Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>();
+ for (Slice slice : coll.getSlices()) {
+ LinkedHashMap<String, Integer> n = new LinkedHashMap<>();
+ shardVsNodes.put(slice.getName(), n);
+ for (Replica replica : slice.getReplicas()) {
+ Integer count = n.get(replica.getNodeName());
+ if (count == null) count = 0;
+ n.put(replica.getNodeName(), ++count);
+ }
+ }
+ List snitches = (List) coll.get(SNITCH);
+ List<String> nodesList = createNodeList == null ?
+ new ArrayList<>(clusterState.getLiveNodes()) :
+ createNodeList;
+ Map<ReplicaPosition, String> positions = new ReplicaAssigner(
+ rules,
+ Collections.singletonMap(shard, numberOfNodes),
+ snitches,
+ shardVsNodes,
+ nodesList, cloudManager, clusterState).getNodeMappings();
+
+ return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());
+ }
+
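+ /**
+ * Builds the per-node replica counts used for weighting. When createNodeList is supplied, only
+ * those nodes are returned (each must be live) and no counting is done; otherwise counts are
+ * accumulated across all collections, and nodes already at maxShardsPerNode for this collection
+ * are dropped from the candidate set.
+ */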
+ private static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionName,
+ ClusterState clusterState, List<String> createNodeList) {
+ Set<String> nodes = clusterState.getLiveNodes();
+
+ List<String> nodeList = new ArrayList<>(nodes.size());
+ nodeList.addAll(nodes);
+ if (createNodeList != null) nodeList.retainAll(createNodeList);
+
+ HashMap<String, ReplicaCount> nodeNameVsShardCount = new HashMap<>();
+ for (String s : nodeList) {
+ nodeNameVsShardCount.put(s, new ReplicaCount(s));
+ }
+ if (createNodeList != null) { // Overrides petty considerations about maxShardsPerNode
+ if (createNodeList.size() != nodeNameVsShardCount.size()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "At least one of the node(s) specified " + createNodeList + " are not currently active in "
+ + nodeNameVsShardCount.keySet() + ", no action taken.");
+ }
+ return nodeNameVsShardCount;
+ }
+ DocCollection coll = clusterState.getCollection(collectionName);
+ Integer maxShardsPerNode = coll.getMaxShardsPerNode();
+ Map<String, DocCollection> collections = clusterState.getCollectionsMap();
+ for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+ DocCollection c = entry.getValue();
+ // identify suitable nodes by checking the number of cores on each of them
+ for (Slice slice : c.getSlices()) {
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica replica : replicas) {
+ ReplicaCount count = nodeNameVsShardCount.get(replica.getNodeName());
+ if (count != null) {
+ count.totalNodes++; // Used to "weigh" whether this node should be used later.
+ if (entry.getKey().equals(collectionName)) {
+ count.thisCollectionNodes++;
+ if (count.thisCollectionNodes >= maxShardsPerNode) nodeNameVsShardCount.remove(replica.getNodeName());
+ }
+ }
+ }
+ }
+ }
+
+ return nodeNameVsShardCount;
+ }
+
+}