Posted to commits@lucene.apache.org by ma...@apache.org on 2015/05/27 15:58:31 UTC
svn commit: r1682031 [1/2] - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/logging/ solr/...
Author: markrmiller
Date: Wed May 27 13:58:30 2015
New Revision: 1682031
URL: http://svn.apache.org/r1682031
Log:
SOLR-7590: Finish and improve MDC context logging support.
Added:
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/logging/MDCLoggingContext.java
- copied unchanged from r1682020, lucene/dev/trunk/solr/core/src/java/org/apache/solr/logging/MDCLoggingContext.java
Removed:
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/logging/MDCUtils.java
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/SolrCores.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/ZkContainer.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/PeerSync.java
lucene/dev/branches/branch_5x/solr/core/src/test-files/log4j.properties
lucene/dev/branches/branch_5x/solr/server/ (props changed)
lucene/dev/branches/branch_5x/solr/server/resources/log4j.properties
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
lucene/dev/branches/branch_5x/solr/solrj/src/test-files/log4j.properties
lucene/dev/branches/branch_5x/solr/test-framework/ (props changed)
lucene/dev/branches/branch_5x/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
lucene/dev/branches/branch_5x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
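
This change standardizes Solr's per-core log context on SLF4J's MDC (Mapped Diagnostic Context): values such as the collection, shard and core name are attached to the current thread, can be rendered by the log layout, and are cleared when the operation finishes. The scattered MDCUtils save/restore calls are replaced by the new MDCLoggingContext helper, and ExecutorUtil in solrj is also touched, presumably so the context can follow tasks handed to thread pools. Below is a minimal sketch of both halves of that pattern, using plain SLF4J MDC calls rather than the MDCLoggingContext API; the class and key names are illustrative assumptions, not the commit's actual identifiers.

import java.util.Map;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcPatternSketch {
  private static final Logger log = LoggerFactory.getLogger(MdcPatternSketch.class);

  // Tag the current thread's log output with core context for the duration of an
  // operation, then clear it (the role MDCLoggingContext.setCore()/clear() plays below).
  void doCoreOperation(String collection, String shard, String coreName) {
    MDC.put("collection", collection); // key names here are illustrative, not Solr's actual keys
    MDC.put("shard", shard);
    MDC.put("core", coreName);
    try {
      log.info("running operation"); // a layout can render these values via %X{...}
    } finally {
      MDC.clear();
    }
  }

  // Carry the submitting thread's MDC map into a pooled worker thread. MDC is
  // thread-local, so a context-aware executor helper has to do this kind of wrapping;
  // this is a sketch of the idea, not the modified ExecutorUtil implementation.
  void submitWithContext(ExecutorService executor, Runnable task) {
    final Map<String,String> submitterContext = MDC.getCopyOfContextMap();
    executor.execute(() -> {
      Map<String,String> previous = MDC.getCopyOfContextMap();
      if (submitterContext != null) MDC.setContextMap(submitterContext);
      try {
        task.run();
      } finally {
        if (previous != null) MDC.setContextMap(previous);
        else MDC.clear();
      }
    });
  }
}
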
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1682031&r1=1682030&r2=1682031&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Wed May 27 13:58:30 2015
@@ -45,6 +45,8 @@ Other Changes
* SOLR-7146: MiniSolrCloudCluster based tests can fail with ZooKeeperException NoNode for /live_nodes.
(Vamsee Yarlagadda via shalin)
+* SOLR-7590: Finish and improve MDC context logging support. (Mark Miller)
+
================== 5.2.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1682031&r1=1682030&r2=1682031&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed May 27 13:58:30 2015
@@ -1,5 +1,11 @@
package org.apache.solr.cloud;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
@@ -15,7 +21,7 @@ import org.apache.solr.common.util.Retry
import org.apache.solr.common.util.RetryUtil.RetryCmd;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.logging.MDCUtils;
+import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
@@ -25,14 +31,6 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -117,8 +115,6 @@ class ShardLeaderElectionContextBase ext
this.shardId = shardId;
this.collection = collection;
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setMDC(collection, shardId, null, null);
try {
new ZkCmdExecutor(zkStateReader.getZkClient().getZkClientTimeout())
.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection,
@@ -128,8 +124,6 @@ class ShardLeaderElectionContextBase ext
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
}
}
@@ -203,158 +197,152 @@ final class ShardLeaderElectionContext e
*/
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
- InterruptedException, IOException {
- log.info("Running the leader process for shard " + shardId);
-
+ InterruptedException, IOException {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
-
if (core == null) {
cancelElection();
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "SolrCore not found:" + coreName + " in "
- + cc.getCoreNames());
+ throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
}
-
+ MDCLoggingContext.setCore(core);
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
}
-
- lt.minimumWaitBetweenActions();
- lt.markAttemptingAction();
-
- // clear the leader in clusterstate
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
- ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
- collection);
- Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
-
- int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
- if (!weAreReplacement) {
- waitForReplicasToComeUp(leaderVoteWait);
- }
- try (SolrCore core = cc.getCore(coreName)) {
-
- if (core == null) {
- cancelElection();
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "SolrCore not found:" + coreName + " in "
- + cc.getCoreNames());
- }
+ try {
+ lt.minimumWaitBetweenActions();
+ lt.markAttemptingAction();
- // should I be leader?
- if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
- rejoinLeaderElection(core);
- return;
+ log.info("Running the leader process for shard " + shardId);
+ // clear the leader in clusterstate
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+ ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
+ Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
+
+ int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
+ if (!weAreReplacement) {
+ waitForReplicasToComeUp(leaderVoteWait);
}
- log.info("I may be the new leader - try and sync");
-
-
- // we are going to attempt to be the leader
- // first cancel any current recovery
- core.getUpdateHandler().getSolrCoreState().cancelRecovery();
-
- if (weAreReplacement) {
- // wait a moment for any floating updates to finish
- try {
- Thread.sleep(2500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
+ try (SolrCore core = cc.getCore(coreName)) {
+
+ if (core == null) {
+ cancelElection();
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
}
- }
-
- boolean success = false;
- try {
- success = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
- } catch (Exception e) {
- SolrException.log(log, "Exception while trying to sync", e);
- success = false;
- }
-
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-
- if (!success) {
- boolean hasRecentUpdates = false;
- if (ulog != null) {
- // TODO: we could optimize this if necessary
- UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+
+ // should I be leader?
+ if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
+ rejoinLeaderElection(core);
+ return;
+ }
+
+ log.info("I may be the new leader - try and sync");
+
+ // we are going to attempt to be the leader
+ // first cancel any current recovery
+ core.getUpdateHandler().getSolrCoreState().cancelRecovery();
+
+ if (weAreReplacement) {
+ // wait a moment for any floating updates to finish
try {
- hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
- } finally {
- recentUpdates.close();
+ Thread.sleep(2500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
}
}
-
- if (!hasRecentUpdates) {
- // we failed sync, but we have no versions - we can't sync in that case
- // - we were active
- // before, so become leader anyway
- log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
- success = true;
- }
- }
-
- // solrcloud_debug
- if (log.isDebugEnabled()) {
+
+ boolean success = false;
try {
- RefCounted<SolrIndexSearcher> searchHolder = core
- .getNewestSearcher(false);
- SolrIndexSearcher searcher = searchHolder.get();
+ success = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
+ } catch (Exception e) {
+ SolrException.log(log, "Exception while trying to sync", e);
+ success = false;
+ }
+
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+
+ if (!success) {
+ boolean hasRecentUpdates = false;
+ if (ulog != null) {
+ // TODO: we could optimize this if necessary
+ UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+ try {
+ hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
+ } finally {
+ recentUpdates.close();
+ }
+ }
+
+ if (!hasRecentUpdates) {
+ // we failed sync, but we have no versions - we can't sync in that case
+ // - we were active
+ // before, so become leader anyway
+ log.info(
+ "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
+ success = true;
+ }
+ }
+
+ // solrcloud_debug
+ if (log.isDebugEnabled()) {
try {
- log.debug(core.getCoreDescriptor().getCoreContainer()
- .getZkController().getNodeName()
- + " synched "
- + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
- } finally {
- searchHolder.decref();
+ RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+ SolrIndexSearcher searcher = searchHolder.get();
+ try {
+ log.debug(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " synched "
+ + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+ } finally {
+ searchHolder.decref();
+ }
+ } catch (Exception e) {
+ log.error("Error in solrcloud_debug block", e);
}
- } catch (Exception e) {
- log.error("Error in solrcloud_debug block", e);
}
- }
- if (!success) {
- rejoinLeaderElection(core);
- return;
- }
-
- log.info("I am the new leader: "
- + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
- core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
- }
-
- boolean isLeader = true;
- try {
- super.runLeaderProcess(weAreReplacement, 0);
- } catch (Exception e) {
- isLeader = false;
- SolrException.log(log, "There was a problem trying to register as the leader", e);
-
- try (SolrCore core = cc.getCore(coreName)) {
-
- if (core == null) {
- log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+ if (!success) {
+ rejoinLeaderElection(core);
return;
}
- core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
-
- // we could not publish ourselves as leader - try and rejoin election
- rejoinLeaderElection(core);
+ log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
+ core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
}
- }
-
- if (isLeader) {
- // check for any replicas in my shard that were set to down by the previous leader
+
+ boolean isLeader = true;
try {
- startLeaderInitiatedRecoveryOnReplicas(coreName);
- } catch (Exception exc) {
- // don't want leader election to fail because of
- // an error trying to tell others to recover
+ super.runLeaderProcess(weAreReplacement, 0);
+ } catch (Exception e) {
+ isLeader = false;
+ SolrException.log(log, "There was a problem trying to register as the leader", e);
+
+ try (SolrCore core = cc.getCore(coreName)) {
+
+ if (core == null) {
+ log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+ return;
+ }
+
+ core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
+
+ // we could not publish ourselves as leader - try and rejoin election
+ rejoinLeaderElection(core);
+ }
}
- }
+
+ if (isLeader) {
+ // check for any replicas in my shard that were set to down by the previous leader
+ try {
+ startLeaderInitiatedRecoveryOnReplicas(coreName);
+ } catch (Exception exc) {
+ // don't want leader election to fail because of
+ // an error trying to tell others to recover
+ }
+ }
+ } finally {
+ MDCLoggingContext.clear();
+ }
}
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
@@ -493,8 +481,7 @@ final class ShardLeaderElectionContext e
}
private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
- log.info("Checking if I (core={},coreNodeName={}) should try and be the leader.", core.getName(),
- core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+ log.info("Checking if I should try and be the leader.");
if (isClosed) {
log.info("Bailing on leader process because we have been closed");
@@ -534,7 +521,7 @@ final class ShardLeaderElectionContext e
}
final class OverseerElectionContext extends ElectionContext {
-
+ private static Logger log = LoggerFactory.getLogger(OverseerElectionContext.class);
private final SolrZkClient zkClient;
private Overseer overseer;
public static final String PATH = "/overseer_elect";
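
The context set by MDCLoggingContext only shows up in the output if the logging configuration references it, which is presumably why the server and test log4j.properties files are among the modified files. A hedged example (not the commit's actual configuration) of a log4j 1.2 PatternLayout that renders MDC values with the %X conversion, assuming the keys are named collection, shard, replica and core:

# Illustrative only: surface MDC values in each console log line.
log4j.rootLogger=INFO, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p [%X{collection} %X{shard} %X{replica} %X{core}] %c; %m%n
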
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1682031&r1=1682030&r2=1682031&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Wed May 27 13:58:30 2015
@@ -91,7 +91,7 @@ import org.apache.solr.handler.component
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
-import org.apache.solr.logging.MDCUtils;
+import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.stats.Snapshot;
@@ -998,68 +998,66 @@ public class OverseerCollectionProcessor
}
@SuppressWarnings("unchecked")
- private void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
- checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP,REPLICA_PROP);
+ private void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results)
+ throws KeeperException, InterruptedException {
+ checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
String collectionName = message.getStr(COLLECTION_PROP);
String shard = message.getStr(SHARD_ID_PROP);
String replicaName = message.getStr(REPLICA_PROP);
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setMDC(collectionName, shard, replicaName, null);
+
+ DocCollection coll = clusterState.getCollection(collectionName);
+ Slice slice = coll.getSlice(shard);
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ if (slice == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Invalid shard name : " + shard + " in collection : " + collectionName);
+ }
+ Replica replica = slice.getReplica(replicaName);
+ if (replica == null) {
+ ArrayList<String> l = new ArrayList<>();
+ for (Replica r : slice.getReplicas())
+ l.add(r.getName());
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
+ + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
+ }
+
+ // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
+ // on the command.
+ if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
+ + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
+ }
+
+ String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+ String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ // assume the core exists and try to unload it
+ Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE,
+ core, CoreAdminParams.DELETE_INSTANCE_DIR, "true", CoreAdminParams.DELETE_DATA_DIR, "true");
+
+ ShardRequest sreq = new ShardRequest();
+ sreq.purpose = 1;
+ sreq.shards = new String[] {baseUrl};
+ sreq.actualShards = sreq.shards;
+ sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
try {
- DocCollection coll = clusterState.getCollection(collectionName);
- Slice slice = coll.getSlice(shard);
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
- if (slice == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid shard name : " + shard + " in collection : " + collectionName);
- }
- Replica replica = slice.getReplica(replicaName);
- if (replica == null) {
- ArrayList<String> l = new ArrayList<>();
- for (Replica r : slice.getReplicas()) l.add(r.getName());
- throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
- + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
- }
-
- // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
- // on the command.
- if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Attempted to remove replica : " + collectionName + "/" +
- shard + "/" + replicaName +
- " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
- }
-
- String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
- String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
- // assume the core exists and try to unload it
- Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION,
- CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, core,
- CoreAdminParams.DELETE_INSTANCE_DIR, "true",
- CoreAdminParams.DELETE_DATA_DIR, "true");
-
- ShardRequest sreq = new ShardRequest();
- sreq.purpose = 1;
- sreq.shards = new String[]{baseUrl};
- sreq.actualShards = sreq.shards;
- sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
- try {
- shardHandler.submit(sreq, baseUrl, sreq.params);
- } catch (Exception e) {
- log.warn("Exception trying to unload core " + sreq, e);
- }
-
- collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results,
- false, null, shardHandler);
-
- if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000))
- return;//check if the core unload removed the corenode zk enry
- deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
- if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
-
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
+ shardHandler.submit(sreq, baseUrl, sreq.params);
+ } catch (Exception e) {
+ log.warn("Exception trying to unload core " + sreq, e);
}
+
+ collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results, false, null,
+ shardHandler);
+
+ if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;// check if the core unload removed the
+ // corenode zk enry
+ deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
+ if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
+
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+
}
private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
@@ -1155,42 +1153,33 @@ public class OverseerCollectionProcessor
private void createAlias(Aliases aliases, ZkNodeProps message) {
String aliasName = message.getStr(NAME);
String collections = message.getStr("collections");
-
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setCollection(aliasName);
-
+
+ Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
+ Map<String,String> newCollectionAliasesMap = new HashMap<>();
+ Map<String,String> prevColAliases = aliases.getCollectionAliasMap();
+ if (prevColAliases != null) {
+ newCollectionAliasesMap.putAll(prevColAliases);
+ }
+ newCollectionAliasesMap.put(aliasName, collections);
+ newAliasesMap.put("collection", newCollectionAliasesMap);
+ Aliases newAliases = new Aliases(newAliasesMap);
+ byte[] jsonBytes = null;
+ if (newAliases.collectionAliasSize() > 0) { // only sub map right now
+ jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap());
+ }
try {
- Map<String, Map<String, String>> newAliasesMap = new HashMap<>();
- Map<String, String> newCollectionAliasesMap = new HashMap<>();
- Map<String, String> prevColAliases = aliases.getCollectionAliasMap();
- if (prevColAliases != null) {
- newCollectionAliasesMap.putAll(prevColAliases);
- }
- newCollectionAliasesMap.put(aliasName, collections);
- newAliasesMap.put("collection", newCollectionAliasesMap);
- Aliases newAliases = new Aliases(newAliasesMap);
- byte[] jsonBytes = null;
- if (newAliases.collectionAliasSize() > 0) { // only sub map right now
- jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap());
- }
- try {
- zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
- jsonBytes, true);
-
- checkForAlias(aliasName, collections);
- // some fudge for other nodes
- Thread.sleep(100);
- } catch (KeeperException e) {
- log.error("", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- log.warn("", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
+ zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
+
+ checkForAlias(aliasName, collections);
+ // some fudge for other nodes
+ Thread.sleep(100);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ log.warn("", e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
-
}
private void checkForAlias(String name, String value) {
@@ -1233,8 +1222,6 @@ public class OverseerCollectionProcessor
private void deleteAlias(Aliases aliases, ZkNodeProps message) {
String aliasName = message.getStr(NAME);
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setCollection(aliasName);
Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
Map<String,String> newCollectionAliasesMap = new HashMap<>();
@@ -1258,282 +1245,393 @@ public class OverseerCollectionProcessor
} catch (InterruptedException e) {
log.warn("", e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
- }
+ }
}
private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws KeeperException, InterruptedException {
- Map previousMDCContext = MDC.getCopyOfContextMap();
String collectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
-
- MDCUtils.setMDC(collectionName, sliceName, null, null);
- try {
- log.info("Create shard invoked: {}", message);
- if (collectionName == null || sliceName == null)
- throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
- int numSlices = 1;
-
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
- DocCollection collection = clusterState.getCollection(collectionName);
- int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
- String createNodeSetStr = message.getStr(CREATE_NODE_SET);
- List<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, sliceName, repFactor,
- createNodeSetStr, overseer.getZkController().getCoreContainer());
-
- Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
- // wait for a while until we see the shard
- long waitUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
- boolean created = false;
- while (System.nanoTime() < waitUntil) {
- Thread.sleep(100);
- created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
- if (created) break;
- }
- if (!created)
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
-
-
- String configName = message.getStr(COLL_CONF);
- for (int j = 1; j <= repFactor; j++) {
- String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
- String shardName = collectionName + "_" + sliceName + "_replica" + j;
- log.info("Creating shard " + shardName + " as part of slice "
- + sliceName + " of collection " + collectionName + " on "
- + nodeName);
-
- // Need to create new params for each request
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-
- params.set(CoreAdminParams.NAME, shardName);
- params.set(COLL_CONF, configName);
- params.set(CoreAdminParams.COLLECTION, collectionName);
- params.set(CoreAdminParams.SHARD, sliceName);
- params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
- addPropertyParams(message, params);
-
- ShardRequest sreq = new ShardRequest();
- params.set("qt", adminPath);
- sreq.purpose = 1;
- String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
- sreq.shards = new String[]{replica};
- sreq.actualShards = sreq.shards;
- sreq.params = params;
-
- shardHandler.submit(sreq, replica, sreq.params);
-
- }
-
- processResponses(results, shardHandler);
-
- log.info("Finished create command on all shards for collection: "
- + collectionName);
-
- return true;
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
+
+ log.info("Create shard invoked: {}", message);
+ if (collectionName == null || sliceName == null)
+ throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
+ int numSlices = 1;
+
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ DocCollection collection = clusterState.getCollection(collectionName);
+ int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+ String createNodeSetStr = message.getStr(CREATE_NODE_SET);
+ List<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, sliceName, repFactor,
+ createNodeSetStr, overseer.getZkController().getCoreContainer());
+
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
+ // wait for a while until we see the shard
+ long waitUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+ boolean created = false;
+ while (System.nanoTime() < waitUntil) {
+ Thread.sleep(100);
+ created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
+ if (created) break;
+ }
+ if (!created)
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
+
+ String configName = message.getStr(COLL_CONF);
+ for (int j = 1; j <= repFactor; j++) {
+ String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
+ String shardName = collectionName + "_" + sliceName + "_replica" + j;
+ log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
+ + " on " + nodeName);
+
+ // Need to create new params for each request
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+ params.set(CoreAdminParams.NAME, shardName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collectionName);
+ params.set(CoreAdminParams.SHARD, sliceName);
+ params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+ addPropertyParams(message, params);
+
+ ShardRequest sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+ sreq.shards = new String[] {replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+
}
+
+ processResponses(results, shardHandler);
+
+ log.info("Finished create command on all shards for collection: " + collectionName);
+
+ return true;
}
private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
String collectionName = message.getStr("collection");
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setMDC(collectionName, slice, null, null);
- try {
- log.info("Split shard invoked");
- String splitKey = message.getStr("split.key");
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
- DocCollection collection = clusterState.getCollection(collectionName);
- DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
-
- Slice parentSlice = null;
-
- if (slice == null) {
- if (router instanceof CompositeIdRouter) {
- Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
- if (searchSlices.isEmpty()) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
- }
- if (searchSlices.size() > 1) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
- }
- parentSlice = searchSlices.iterator().next();
- slice = parentSlice.getName();
- log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
- } else {
+
+ log.info("Split shard invoked");
+ String splitKey = message.getStr("split.key");
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+ DocCollection collection = clusterState.getCollection(collectionName);
+ DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+
+ Slice parentSlice = null;
+
+ if (slice == null) {
+ if (router instanceof CompositeIdRouter) {
+ Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
+ if (searchSlices.isEmpty()) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
+ }
+ if (searchSlices.size() > 1) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName());
+ "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
}
+ parentSlice = searchSlices.iterator().next();
+ slice = parentSlice.getName();
+ log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
} else {
- parentSlice = clusterState.getSlice(collectionName, slice);
- }
-
- if (parentSlice == null) {
- if (clusterState.hasCollection(collectionName)) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
- } else {
- throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
- }
- }
-
- // find the leader for the shard
- Replica parentShardLeader = null;
- try {
- parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
+ + router.getClass().getName());
}
-
- DocRouter.Range range = parentSlice.getRange();
- if (range == null) {
- range = new PlainIdRouter().fullRange();
+ } else {
+ parentSlice = clusterState.getSlice(collectionName, slice);
+ }
+
+ if (parentSlice == null) {
+ if (clusterState.hasCollection(collectionName)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "No collection with the specified name exists: " + collectionName);
}
-
- List<DocRouter.Range> subRanges = null;
- String rangesStr = message.getStr(CoreAdminParams.RANGES);
- if (rangesStr != null) {
- String[] ranges = rangesStr.split(",");
- if (ranges.length == 0 || ranges.length == 1) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
- } else {
- subRanges = new ArrayList<>(ranges.length);
- for (int i = 0; i < ranges.length; i++) {
- String r = ranges[i];
- try {
- subRanges.add(DocRouter.DEFAULT.fromString(r));
- } catch (Exception e) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
- }
- if (!subRanges.get(i).isSubsetOf(range)) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
- }
+ }
+
+ // find the leader for the shard
+ Replica parentShardLeader = null;
+ try {
+ parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ DocRouter.Range range = parentSlice.getRange();
+ if (range == null) {
+ range = new PlainIdRouter().fullRange();
+ }
+
+ List<DocRouter.Range> subRanges = null;
+ String rangesStr = message.getStr(CoreAdminParams.RANGES);
+ if (rangesStr != null) {
+ String[] ranges = rangesStr.split(",");
+ if (ranges.length == 0 || ranges.length == 1) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
+ } else {
+ subRanges = new ArrayList<>(ranges.length);
+ for (int i = 0; i < ranges.length; i++) {
+ String r = ranges[i];
+ try {
+ subRanges.add(DocRouter.DEFAULT.fromString(r));
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
}
- List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
- Collections.sort(temp);
- if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
+ if (!subRanges.get(i).isSubsetOf(range)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
- }
- for (int i = 1; i < temp.size(); i++) {
- if (temp.get(i - 1).max + 1 != temp.get(i).min) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "Specified hash ranges: " + rangesStr + " either overlap with each other or " +
- "do not cover the entire range of parent shard: " + range);
- }
+ "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
}
}
- } else if (splitKey != null) {
- if (router instanceof CompositeIdRouter) {
- CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
- subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
- if (subRanges.size() == 1) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice);
- }
- for (DocRouter.Range subRange : subRanges) {
- if (subRange.min == subRange.max) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
- }
- }
- log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
- rangesStr = "";
- for (int i = 0; i < subRanges.size(); i++) {
- DocRouter.Range subRange = subRanges.get(i);
- rangesStr += subRange.toString();
- if (i < subRanges.size() - 1)
- rangesStr += ',';
+ List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
+ Collections.sort(temp);
+ if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
+ }
+ for (int i = 1; i < temp.size(); i++) {
+ if (temp.get(i - 1).max + 1 != temp.get(i).min) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
+ + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
}
}
- } else {
- // todo: fixed to two partitions?
- subRanges = router.partitionRange(2, range);
}
-
- try {
- List<String> subSlices = new ArrayList<>(subRanges.size());
- List<String> subShardNames = new ArrayList<>(subRanges.size());
- String nodeName = parentShardLeader.getNodeName();
+ } else if (splitKey != null) {
+ if (router instanceof CompositeIdRouter) {
+ CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
+ subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
+ if (subRanges.size() == 1) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
+ + " has a hash range that is exactly equal to hash range of shard: " + slice);
+ }
+ for (DocRouter.Range subRange : subRanges) {
+ if (subRange.min == subRange.max) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
+ }
+ }
+ log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
+ rangesStr = "";
for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = slice + "_" + i;
- subSlices.add(subSlice);
- String subShardName = collectionName + "_" + subSlice + "_replica1";
- subShardNames.add(subShardName);
-
- Slice oSlice = clusterState.getSlice(collectionName, subSlice);
- if (oSlice != null) {
- final Slice.State state = oSlice.getState();
- if (state == Slice.State.ACTIVE) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
- } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
- // delete the shards
- for (String sub : subSlices) {
- log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, sub);
- ZkNodeProps m = new ZkNodeProps(propMap);
- try {
- deleteShard(clusterState, m, new NamedList());
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e);
- }
+ DocRouter.Range subRange = subRanges.get(i);
+ rangesStr += subRange.toString();
+ if (i < subRanges.size() - 1) rangesStr += ',';
+ }
+ }
+ } else {
+ // todo: fixed to two partitions?
+ subRanges = router.partitionRange(2, range);
+ }
+
+ try {
+ List<String> subSlices = new ArrayList<>(subRanges.size());
+ List<String> subShardNames = new ArrayList<>(subRanges.size());
+ String nodeName = parentShardLeader.getNodeName();
+ for (int i = 0; i < subRanges.size(); i++) {
+ String subSlice = slice + "_" + i;
+ subSlices.add(subSlice);
+ String subShardName = collectionName + "_" + subSlice + "_replica1";
+ subShardNames.add(subShardName);
+
+ Slice oSlice = clusterState.getSlice(collectionName, subSlice);
+ if (oSlice != null) {
+ final Slice.State state = oSlice.getState();
+ if (state == Slice.State.ACTIVE) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
+ } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
+ // delete the shards
+ for (String sub : subSlices) {
+ log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
+ Map<String,Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
+ propMap.put(COLLECTION_PROP, collectionName);
+ propMap.put(SHARD_ID_PROP, sub);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ try {
+ deleteShard(clusterState, m, new NamedList());
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub,
+ e);
}
}
}
}
-
- // do not abort splitshard if the unloading fails
- // this can happen because the replicas created previously may be down
- // the only side effect of this is that the sub shard may end up having more replicas than we want
- collectShardResponses(results, false, null, shardHandler);
-
- String asyncId = message.getStr(ASYNC);
- HashMap<String, String> requestMap = new HashMap<String, String>();
-
- for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = subSlices.get(i);
- String subShardName = subShardNames.get(i);
- DocRouter.Range subRange = subRanges.get(i);
-
- log.info("Creating slice "
- + subSlice + " of collection " + collectionName + " on "
- + nodeName);
-
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
- propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
- propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
- propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
- DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
- inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap)));
-
- // wait until we are able to see the new shard in cluster state
- waitForNewShard(collectionName, subSlice);
-
- // refresh cluster state
- clusterState = zkStateReader.getClusterState();
-
- log.info("Adding replica " + subShardName + " as part of slice "
- + subSlice + " of collection " + collectionName + " on "
- + nodeName);
- propMap = new HashMap<>();
+ }
+
+ // do not abort splitshard if the unloading fails
+ // this can happen because the replicas created previously may be down
+ // the only side effect of this is that the sub shard may end up having more replicas than we want
+ collectShardResponses(results, false, null, shardHandler);
+
+ String asyncId = message.getStr(ASYNC);
+ HashMap<String,String> requestMap = new HashMap<String,String>();
+
+ for (int i = 0; i < subRanges.size(); i++) {
+ String subSlice = subSlices.get(i);
+ String subShardName = subShardNames.get(i);
+ DocRouter.Range subRange = subRanges.get(i);
+
+ log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
+
+ Map<String,Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
+ propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
+ propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+ propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
+ propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
+ propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
+ DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+ inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap)));
+
+ // wait until we are able to see the new shard in cluster state
+ waitForNewShard(collectionName, subSlice);
+
+ // refresh cluster state
+ clusterState = zkStateReader.getClusterState();
+
+ log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
+ + " on " + nodeName);
+ propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+ propMap.put(COLLECTION_PROP, collectionName);
+ propMap.put(SHARD_ID_PROP, subSlice);
+ propMap.put("node", nodeName);
+ propMap.put(CoreAdminParams.NAME, subShardName);
+ // copy over property params:
+ for (String key : message.keySet()) {
+ if (key.startsWith(COLL_PROP_PREFIX)) {
+ propMap.put(key, message.getStr(key));
+ }
+ }
+ // add async param
+ if (asyncId != null) {
+ propMap.put(ASYNC, asyncId);
+ }
+ addReplica(clusterState, new ZkNodeProps(propMap), results);
+ }
+
+ collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler);
+
+ completeAsyncRequest(asyncId, requestMap, results);
+
+ for (String subShardName : subShardNames) {
+ // wait for parent leader to acknowledge the sub-shard core
+ log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+ String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
+ CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+ cmd.setCoreName(subShardName);
+ cmd.setNodeName(nodeName);
+ cmd.setCoreNodeName(coreNodeName);
+ cmd.setState(Replica.State.ACTIVE);
+ cmd.setCheckLive(true);
+ cmd.setOnlyIfLeader(true);
+
+ ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+ sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
+ }
+
+ collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
+ shardHandler);
+
+ completeAsyncRequest(asyncId, requestMap, results);
+
+ log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
+ + " on: " + parentShardLeader);
+
+ log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
+ + collectionName + " on " + parentShardLeader);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
+ params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
+ for (int i = 0; i < subShardNames.size(); i++) {
+ String subShardName = subShardNames.get(i);
+ params.add(CoreAdminParams.TARGET_CORE, subShardName);
+ }
+ params.set(CoreAdminParams.RANGES, rangesStr);
+
+ sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+ collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler);
+ completeAsyncRequest(asyncId, requestMap, results);
+
+ log.info("Index on shard: " + nodeName + " split into two successfully");
+
+ // apply buffered updates on sub-shards
+ for (int i = 0; i < subShardNames.size(); i++) {
+ String subShardName = subShardNames.get(i);
+
+ log.info("Applying buffered updates on : " + subShardName);
+
+ params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+ params.set(CoreAdminParams.NAME, subShardName);
+
+ sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
+ }
+
+ collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
+ shardHandler);
+
+ completeAsyncRequest(asyncId, requestMap, results);
+
+ log.info("Successfully applied buffered updates on : " + subShardNames);
+
+ // Replica creation for the new Slices
+
+ // look at the replication factor and see if it matches reality
+ // if it does not, find best nodes to create more cores
+
+ // TODO: Have replication factor decided in some other way instead of numShards for the parent
+
+ int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();
+
+ // we need to look at every node and see how many cores it serves
+ // add our new cores to existing nodes serving the least number of cores
+ // but (for now) require that each core goes on a distinct node.
+
+ // TODO: add smarter options that look at the current number of cores per
+ // node?
+ // for now we just go random
+ Set<String> nodes = clusterState.getLiveNodes();
+ List<String> nodeList = new ArrayList<>(nodes.size());
+ nodeList.addAll(nodes);
+
+ Collections.shuffle(nodeList, RANDOM);
+
+ // TODO: Have maxShardsPerNode param for this operation?
+
+ // Remove the node that hosts the parent shard for replica creation.
+ nodeList.remove(nodeName);
+
+ // TODO: change this to handle sharding a slice into > 2 sub-shards.
+
+ for (int i = 1; i <= subSlices.size(); i++) {
+ Collections.shuffle(nodeList, RANDOM);
+ String sliceName = subSlices.get(i - 1);
+ for (int j = 2; j <= repFactor; j++) {
+ String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
+ String shardName = collectionName + "_" + sliceName + "_replica" + (j);
+
+ log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
+ + collectionName + " on " + subShardNodeName);
+
+ HashMap<String,Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, subSlice);
- propMap.put("node", nodeName);
- propMap.put(CoreAdminParams.NAME, subShardName);
+ propMap.put(SHARD_ID_PROP, sliceName);
+ propMap.put("node", subShardNodeName);
+ propMap.put(CoreAdminParams.NAME, shardName);
// copy over property params:
for (String key : message.keySet()) {
if (key.startsWith(COLL_PROP_PREFIX)) {
@@ -1541,203 +1639,69 @@ public class OverseerCollectionProcessor
}
}
// add async param
- if(asyncId != null) {
+ if (asyncId != null) {
propMap.put(ASYNC, asyncId);
}
addReplica(clusterState, new ZkNodeProps(propMap), results);
- }
-
- collectShardResponses(results, true,
- "SPLITSHARD failed to create subshard leaders", shardHandler);
-
- completeAsyncRequest(asyncId, requestMap, results);
-
- for (String subShardName : subShardNames) {
- // wait for parent leader to acknowledge the sub-shard core
- log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
- String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
+
+ String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
+ // wait for the replicas to be seen as active on sub shard leader
+ log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
- cmd.setCoreName(subShardName);
- cmd.setNodeName(nodeName);
+ cmd.setCoreName(subShardNames.get(i - 1));
+ cmd.setNodeName(subShardNodeName);
cmd.setCoreNodeName(coreNodeName);
- cmd.setState(Replica.State.ACTIVE);
+ cmd.setState(Replica.State.RECOVERING);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
-
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+
sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
+
}
-
- collectShardResponses(results, true,
- "SPLITSHARD timed out waiting for subshard leaders to come up", shardHandler);
-
- completeAsyncRequest(asyncId, requestMap, results);
-
- log.info("Successfully created all sub-shards for collection "
- + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
-
- log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice "
- + slice + " of collection " + collectionName + " on "
- + parentShardLeader);
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
- params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
- for (int i = 0; i < subShardNames.size(); i++) {
- String subShardName = subShardNames.get(i);
- params.add(CoreAdminParams.TARGET_CORE, subShardName);
- }
- params.set(CoreAdminParams.RANGES, rangesStr);
-
- sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
- collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command",
- shardHandler);
- completeAsyncRequest(asyncId, requestMap, results);
-
- log.info("Index on shard: " + nodeName + " split into two successfully");
-
- // apply buffered updates on sub-shards
- for (int i = 0; i < subShardNames.size(); i++) {
- String subShardName = subShardNames.get(i);
-
- log.info("Applying buffered updates on : " + subShardName);
-
- params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
- params.set(CoreAdminParams.NAME, subShardName);
-
- sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
- }
-
- collectShardResponses(results, true,
- "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
- shardHandler);
-
- completeAsyncRequest(asyncId, requestMap, results);
-
- log.info("Successfully applied buffered updates on : " + subShardNames);
-
- // Replica creation for the new Slices
-
- // look at the replication factor and see if it matches reality
- // if it does not, find best nodes to create more cores
-
- // TODO: Have replication factor decided in some other way instead of numShards for the parent
-
- int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();
-
- // we need to look at every node and see how many cores it serves
- // add our new cores to existing nodes serving the least number of cores
- // but (for now) require that each core goes on a distinct node.
-
- // TODO: add smarter options that look at the current number of cores per
- // node?
- // for now we just go random
- Set<String> nodes = clusterState.getLiveNodes();
- List<String> nodeList = new ArrayList<>(nodes.size());
- nodeList.addAll(nodes);
-
- Collections.shuffle(nodeList, RANDOM);
-
- // TODO: Have maxShardsPerNode param for this operation?
-
- // Remove the node that hosts the parent shard for replica creation.
- nodeList.remove(nodeName);
-
- // TODO: change this to handle sharding a slice into > 2 sub-shards.
-
- for (int i = 1; i <= subSlices.size(); i++) {
- Collections.shuffle(nodeList, RANDOM);
- String sliceName = subSlices.get(i - 1);
- for (int j = 2; j <= repFactor; j++) {
- String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
- String shardName = collectionName + "_" + sliceName + "_replica" + (j);
-
- log.info("Creating replica shard " + shardName + " as part of slice "
- + sliceName + " of collection " + collectionName + " on "
- + subShardNodeName);
-
- HashMap<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, sliceName);
- propMap.put("node", subShardNodeName);
- propMap.put(CoreAdminParams.NAME, shardName);
- // copy over property params:
- for (String key : message.keySet()) {
- if (key.startsWith(COLL_PROP_PREFIX)) {
- propMap.put(key, message.getStr(key));
- }
- }
- // add async param
- if (asyncId != null) {
- propMap.put(ASYNC, asyncId);
- }
- addReplica(clusterState, new ZkNodeProps(propMap), results);
-
- String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
- // wait for the replicas to be seen as active on sub shard leader
- log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
- CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
- cmd.setCoreName(subShardNames.get(i - 1));
- cmd.setNodeName(subShardNodeName);
- cmd.setCoreNodeName(coreNodeName);
- cmd.setState(Replica.State.RECOVERING);
- cmd.setCheckLive(true);
- cmd.setOnlyIfLeader(true);
- ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
-
- sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
-
- }
- }
-
- collectShardResponses(results, true,
- "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up",
- shardHandler);
-
- completeAsyncRequest(asyncId, requestMap, results);
-
- log.info("Successfully created all replica shards for all sub-slices " + subSlices);
-
- commit(results, slice, parentShardLeader);
-
- if (repFactor == 1) {
- // switch sub shard states to 'active'
- log.info("Replication factor is 1 so switching shard states");
- DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- propMap.put(slice, Slice.State.INACTIVE.toString());
- for (String subSlice : subSlices) {
- propMap.put(subSlice, Slice.State.ACTIVE.toString());
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(ZkStateReader.toJSON(m));
- } else {
- log.info("Requesting shard state be set to 'recovery'");
- DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- for (String subSlice : subSlices) {
- propMap.put(subSlice, Slice.State.RECOVERY.toString());
- }
- propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(ZkStateReader.toJSON(m));
- }
-
- return true;
- } catch (SolrException e) {
- throw e;
- } catch (Exception e) {
- log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
- throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
}
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
+
+ collectShardResponses(results, true,
+ "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up", shardHandler);
+
+ completeAsyncRequest(asyncId, requestMap, results);
+
+ log.info("Successfully created all replica shards for all sub-slices " + subSlices);
+
+ commit(results, slice, parentShardLeader);
+
+ if (repFactor == 1) {
+ // switch sub shard states to 'active'
+ log.info("Replication factor is 1 so switching shard states");
+ DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+ Map<String,Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+ propMap.put(slice, Slice.State.INACTIVE.toString());
+ for (String subSlice : subSlices) {
+ propMap.put(subSlice, Slice.State.ACTIVE.toString());
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(ZkStateReader.toJSON(m));
+ } else {
+ log.info("Requesting shard state be set to 'recovery'");
+ DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+ Map<String,Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+ for (String subSlice : subSlices) {
+ propMap.put(subSlice, Slice.State.RECOVERY.toString());
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(ZkStateReader.toJSON(m));
+ }
+
+ return true;
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
}
}
@@ -1846,71 +1810,64 @@ public class OverseerCollectionProcessor
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setMDC(collection, sliceId, null, null);
- try {
- log.info("Delete shard invoked");
- Slice slice = clusterState.getSlice(collection, sliceId);
-
- if (slice == null) {
- if (clusterState.hasCollection(collection)) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "No shard with name " + sliceId + " exists for collection " + collection);
- } else {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "No collection with the specified name exists: " + collection);
- }
- }
- // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
- // TODO: Add check for range gaps on Slice deletion
- final Slice.State state = slice.getState();
- if (!(slice.getRange() == null || state == Slice.State.INACTIVE
- || state == Slice.State.RECOVERY || state == Slice.State.CONSTRUCTION)) {
+
+ log.info("Delete shard invoked");
+ Slice slice = clusterState.getSlice(collection, sliceId);
+
+ if (slice == null) {
+ if (clusterState.hasCollection(collection)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- "The slice: " + slice.getName() + " is currently "
- + state + ". Only non-active (or custom-hashed) slices can be deleted.");
+ "No shard with name " + sliceId + " exists for collection " + collection);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
}
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
- try {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
- params.set(CoreAdminParams.DELETE_INDEX, "true");
- sliceCmd(clusterState, params, null, slice, shardHandler);
-
- processResponses(results, shardHandler);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, collection,
- ZkStateReader.SHARD_ID_PROP, sliceId);
- Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
-
- // wait for a while until we don't see the shard
- long now = System.nanoTime();
- long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
- boolean removed = false;
- while (System.nanoTime() < timeout) {
- Thread.sleep(100);
- removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
- if (removed) {
- Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
- break;
- }
- }
- if (!removed) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not fully remove collection: " + collection + " shard: " + sliceId);
+ }
+ // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+ // TODO: Add check for range gaps on Slice deletion
+ final Slice.State state = slice.getState();
+ if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
+ || state == Slice.State.CONSTRUCTION)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
+ + ". Only non-active (or custom-hashed) slices can be deleted.");
+ }
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.DELETE_INDEX, "true");
+ sliceCmd(clusterState, params, null, slice, shardHandler);
+
+ processResponses(results, shardHandler);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
+ collection, ZkStateReader.SHARD_ID_PROP, sliceId);
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
+
+ // wait for a while until we don't see the shard
+ long now = System.nanoTime();
+ long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+ boolean removed = false;
+ while (System.nanoTime() < timeout) {
+ Thread.sleep(100);
+ removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
+ if (removed) {
+ Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+ break;
}
-
- log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId);
-
- } catch (SolrException e) {
- throw e;
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
}
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
+ if (!removed) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not fully remove collection: " + collection + " shard: " + sliceId);
+ }
+
+ log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId);
+
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
}
}
@@ -2505,110 +2462,101 @@ public class OverseerCollectionProcessor
}
}
- private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+ private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results)
+ throws KeeperException, InterruptedException {
String collection = message.getStr(COLLECTION_PROP);
String node = message.getStr("node");
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setMDC(collection, shard, null, coreName);
- try {
- String asyncId = message.getStr("async");
-
- DocCollection coll = clusterState.getCollection(collection);
- if (coll == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
- }
- if (coll.getSlice(shard) == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "Collection: " + collection + " shard: " + shard + " does not exist");
- }
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
- if (node == null) {
-
- node = getNodesForNewShard(clusterState, collection, shard, 1,
- null, overseer.getZkController().getCoreContainer()).get(0).nodeName;
- log.info("Node not provided, Identified {} for creating new replica", node);
- }
-
-
- if (!clusterState.liveNodesContain(node)) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
- }
- if (coreName == null) {
- // assign a name to this core
- Slice slice = coll.getSlice(shard);
- int replicaNum = slice.getReplicas().size();
- for (; ; ) {
- String replicaName = collection + "_" + shard + "_replica" + replicaNum;
- boolean exists = false;
- for (Replica replica : slice.getReplicas()) {
- if (replicaName.equals(replica.getStr("core"))) {
- exists = true;
- break;
- }
+
+ String asyncId = message.getStr("async");
+
+ DocCollection coll = clusterState.getCollection(collection);
+ if (coll == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+ }
+ if (coll.getSlice(shard) == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " shard: " + shard + " does not exist");
+ }
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+ if (node == null) {
+
+ node = getNodesForNewShard(clusterState, collection, shard, 1, null,
+ overseer.getZkController().getCoreContainer()).get(0).nodeName;
+ log.info("Node not provided, Identified {} for creating new replica", node);
+ }
+
+ if (!clusterState.liveNodesContain(node)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+ }
+ if (coreName == null) {
+ // assign a name to this core
+ Slice slice = coll.getSlice(shard);
+ int replicaNum = slice.getReplicas().size();
+ for (;;) {
+ String replicaName = collection + "_" + shard + "_replica" + replicaNum;
+ boolean exists = false;
+ for (Replica replica : slice.getReplicas()) {
+ if (replicaName.equals(replica.getStr("core"))) {
+ exists = true;
+ break;
}
- if (exists) replicaNum++;
- else break;
- }
- coreName = collection + "_" + shard + "_replica" + replicaNum;
- }
- ModifiableSolrParams params = new ModifiableSolrParams();
-
- if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
- ZkNodeProps props = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
- ZkStateReader.COLLECTION_PROP, collection,
- ZkStateReader.SHARD_ID_PROP, shard,
- ZkStateReader.CORE_NAME_PROP, coreName,
- ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
- ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node));
- Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
- params.set(CoreAdminParams.CORE_NODE_NAME, waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
- }
-
-
- String configName = zkStateReader.readConfigName(collection);
- String routeKey = message.getStr(ShardParams._ROUTE_);
- String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
- String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
-
- params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
- params.set(CoreAdminParams.NAME, coreName);
- params.set(COLL_CONF, configName);
- params.set(CoreAdminParams.COLLECTION, collection);
- if (shard != null) {
- params.set(CoreAdminParams.SHARD, shard);
- } else if (routeKey != null) {
- Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
- if (slices.isEmpty()) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
- } else {
- params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
}
- } else {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
- }
- if (dataDir != null) {
- params.set(CoreAdminParams.DATA_DIR, dataDir);
+ if (exists) replicaNum++;
+ else break;
}
- if (instanceDir != null) {
- params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
+ coreName = collection + "_" + shard + "_replica" + replicaNum;
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
+ ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
+ collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
+ zkStateReader.getBaseUrlForNodeName(node));
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+ params.set(CoreAdminParams.CORE_NODE_NAME,
+ waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
+ }
+
+ String configName = zkStateReader.readConfigName(collection);
+ String routeKey = message.getStr(ShardParams._ROUTE_);
+ String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
+ String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
+
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+ params.set(CoreAdminParams.NAME, coreName);
+ params.set(COLL_CONF, configName);
+ params.set(CoreAdminParams.COLLECTION, collection);
+ if (shard != null) {
+ params.set(CoreAdminParams.SHARD, shard);
+ } else if (routeKey != null) {
+ Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
+ if (slices.isEmpty()) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
+ } else {
+ params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
}
- addPropertyParams(message, params);
-
- // For tracking async calls.
- HashMap<String, String> requestMap = new HashMap<>();
- sendShardRequest(node, params, shardHandler, asyncId, requestMap);
-
- collectShardResponses(results, true,
- "ADDREPLICA failed to create replica", shardHandler);
-
- completeAsyncRequest(asyncId, requestMap, results);
- } finally {
- MDCUtils.cleanupMDC(previousMDCContext);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
+ }
+ if (dataDir != null) {
+ params.set(CoreAdminParams.DATA_DIR, dataDir);
+ }
+ if (instanceDir != null) {
+ params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
}
+ addPropertyParams(message, params);
+
+ // For tracking async calls.
+ HashMap<String,String> requestMap = new HashMap<>();
+ sendShardRequest(node, params, shardHandler, asyncId, requestMap);
+
+ collectShardResponses(results, true, "ADDREPLICA failed to create replica", shardHandler);
+
+ completeAsyncRequest(asyncId, requestMap, results);
}
private void processResponses(NamedList results, ShardHandler shardHandler) {
@@ -2867,8 +2815,7 @@ public class OverseerCollectionProcessor
String asyncId = message.getStr(ASYNC);
String collectionName = message.containsKey(COLLECTION_PROP) ?
message.getStr(COLLECTION_PROP) : message.getStr(NAME);
- Map previousMDCContext = MDC.getCopyOfContextMap();
- MDCUtils.setCollection(collectionName);
+
try {
try {
log.debug("Runner processing {}", head.getId());
@@ -2913,7 +2860,6 @@ public class OverseerCollectionProcessor
synchronized (waitLock){
waitLock.notifyAll();
}
- MDCUtils.cleanupMDC(previousMDCContext);
}
}
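The hunks above drop the per-method MDC bookkeeping (copying the MDC map, MDCUtils.setMDC, and MDCUtils.cleanupMDC in a finally block) from the Overseer code paths; the replacement is the new MDCLoggingContext helper, set once where a SolrCore is in hand and cleared in a single finally, as in the ElectionContext hunks quoted in the reply below. A minimal caller-side sketch, assuming only the setCore and clear calls visible in this commit:

import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.logging.MDCLoggingContext;

class MdcContextPatternSketch {
  // Sketch only, not code from this commit: push the core's identifying values
  // into the MDC before doing work, and clear the context afterwards so later
  // log lines on this thread are not mislabeled.
  void doWorkWithCoreContext(CoreContainer cc, String coreName) {
    try (SolrCore core = cc.getCore(coreName)) {  // core reference is released when the block exits
      if (core == null) {
        return;
      }
      MDCLoggingContext.setCore(core);            // same call added in ShardLeaderElectionContext
      // ... work whose log output should carry this core's context ...
    } finally {
      MDCLoggingContext.clear();                  // same call used in the new finally blocks
    }
  }
}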
Re: svn commit: r1682031 [1/2] - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/logging/ solr/...
Posted by Steve Rowe <sa...@gmail.com>.
Java7 compilation borked?:
[javac] Compiling 807 source files to /var/lib/jenkins/jobs/Solr-core-tests-5x-Java7/workspace/solr/build/solr-core/classes/java
[javac] /var/lib/jenkins/jobs/Solr-core-tests-5x-Java7/workspace/solr/core/src/java/org/apache/solr/logging/MDCLoggingContext.java:26: error: package java.util.function does not exist
[javac] import java.util.function.Supplier;
[javac] ^
[javac] /var/lib/jenkins/jobs/Solr-core-tests-5x-Java7/workspace/solr/core/src/java/org/apache/solr/logging/MDCLoggingContext.java:42: error: cannot find symbol
[javac] private static ThreadLocal<Integer> CALL_DEPTH = ThreadLocal.withInitial(new Supplier<Integer>() {
[javac] ^
[javac] symbol: class Supplier
[javac] location: class MDCLoggingContext
[javac] Note: Some input files use or override a deprecated API.
[javac] Note: Recompile with -Xlint:deprecation for details.
[javac] Note: Some input files use unchecked or unsafe operations.
[javac] Note: Recompile with -Xlint:unchecked for details.
[javac] 2 errors
Steve
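Both errors trace to the Java 8-only ThreadLocal.withInitial(Supplier) call shown above. A Java 7-compatible form of that initialization, sketched here only as an illustration and not as the committed fix, overrides initialValue() on an anonymous ThreadLocal subclass (the starting value of 0 is an assumption, since the original Supplier body is not shown):

class MDCLoggingContextJava7Sketch {
  // Java 7 replacement for the Java 8-only ThreadLocal.withInitial(Supplier):
  // override initialValue() on an anonymous subclass instead.
  private static final ThreadLocal<Integer> CALL_DEPTH = new ThreadLocal<Integer>() {
    @Override
    protected Integer initialValue() {
      return 0; // assumed initial call depth; the Supplier body is not visible in the build output
    }
  };
}

With that change the now-unused java.util.function.Supplier import would also need to be removed, which clears the first error.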
> On May 27, 2015, at 9:58 AM, markrmiller@apache.org wrote:
>
> Author: markrmiller
> Date: Wed May 27 13:58:30 2015
> New Revision: 1682031
>
> URL: http://svn.apache.org/r1682031
> Log:
> SOLR-7590: Finish and improve MDC context logging support.
>
> Added:
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/logging/MDCLoggingContext.java
> - copied unchanged from r1682020, lucene/dev/trunk/solr/core/src/java/org/apache/solr/logging/MDCLoggingContext.java
> Removed:
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/logging/MDCUtils.java
> Modified:
> lucene/dev/branches/branch_5x/ (props changed)
> lucene/dev/branches/branch_5x/solr/ (props changed)
> lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
> lucene/dev/branches/branch_5x/solr/core/ (props changed)
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/SolrCore.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/SolrCores.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/ZkContainer.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
> lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/PeerSync.java
> lucene/dev/branches/branch_5x/solr/core/src/test-files/log4j.properties
> lucene/dev/branches/branch_5x/solr/server/ (props changed)
> lucene/dev/branches/branch_5x/solr/server/resources/log4j.properties
> lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
> lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
> lucene/dev/branches/branch_5x/solr/solrj/src/test-files/log4j.properties
> lucene/dev/branches/branch_5x/solr/test-framework/ (props changed)
> lucene/dev/branches/branch_5x/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
> lucene/dev/branches/branch_5x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
>
> Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1682031&r1=1682030&r2=1682031&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
> +++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Wed May 27 13:58:30 2015
> @@ -45,6 +45,8 @@ Other Changes
> * SOLR-7146: MiniSolrCloudCluster based tests can fail with ZooKeeperException NoNode for /live_nodes.
> (Vamsee Yarlagadda via shalin)
>
> +* SOLR-7590: Finish and improve MDC context logging support. (Mark Miller)
> +
> ================== 5.2.0 ==================
>
> Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release
>
> Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1682031&r1=1682030&r2=1682031&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
> +++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed May 27 13:58:30 2015
> @@ -1,5 +1,11 @@
> package org.apache.solr.cloud;
>
> +import java.io.Closeable;
> +import java.io.IOException;
> +import java.util.List;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.TimeUnit;
> +
> import org.apache.lucene.search.MatchAllDocsQuery;
> import org.apache.solr.cloud.overseer.OverseerAction;
> import org.apache.solr.common.SolrException;
> @@ -15,7 +21,7 @@ import org.apache.solr.common.util.Retry
> import org.apache.solr.common.util.RetryUtil.RetryCmd;
> import org.apache.solr.core.CoreContainer;
> import org.apache.solr.core.SolrCore;
> -import org.apache.solr.logging.MDCUtils;
> +import org.apache.solr.logging.MDCLoggingContext;
> import org.apache.solr.search.SolrIndexSearcher;
> import org.apache.solr.update.UpdateLog;
> import org.apache.solr.util.RefCounted;
> @@ -25,14 +31,6 @@ import org.apache.zookeeper.KeeperExcept
> import org.apache.zookeeper.KeeperException.NodeExistsException;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> -import org.slf4j.MDC;
> -
> -import java.io.Closeable;
> -import java.io.IOException;
> -import java.util.List;
> -import java.util.Map;
> -import java.util.concurrent.ExecutorService;
> -import java.util.concurrent.TimeUnit;
>
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> @@ -117,8 +115,6 @@ class ShardLeaderElectionContextBase ext
> this.shardId = shardId;
> this.collection = collection;
>
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setMDC(collection, shardId, null, null);
> try {
> new ZkCmdExecutor(zkStateReader.getZkClient().getZkClientTimeout())
> .ensureExists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection,
> @@ -128,8 +124,6 @@ class ShardLeaderElectionContextBase ext
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> throw new SolrException(ErrorCode.SERVER_ERROR, e);
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> }
> }
>
> @@ -203,158 +197,152 @@ final class ShardLeaderElectionContext e
> */
> @Override
> void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
> - InterruptedException, IOException {
> - log.info("Running the leader process for shard " + shardId);
> -
> + InterruptedException, IOException {
> String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
> ActionThrottle lt;
> try (SolrCore core = cc.getCore(coreName)) {
> -
> if (core == null) {
> cancelElection();
> - throw new SolrException(ErrorCode.SERVER_ERROR,
> - "SolrCore not found:" + coreName + " in "
> - + cc.getCoreNames());
> + throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
> }
> -
> + MDCLoggingContext.setCore(core);
> lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
> }
> -
> - lt.minimumWaitBetweenActions();
> - lt.markAttemptingAction();
> -
> - // clear the leader in clusterstate
> - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
> - ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
> - collection);
> - Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
> -
> - int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
> - if (!weAreReplacement) {
> - waitForReplicasToComeUp(leaderVoteWait);
> - }
>
> - try (SolrCore core = cc.getCore(coreName)) {
> -
> - if (core == null) {
> - cancelElection();
> - throw new SolrException(ErrorCode.SERVER_ERROR,
> - "SolrCore not found:" + coreName + " in "
> - + cc.getCoreNames());
> - }
> + try {
> + lt.minimumWaitBetweenActions();
> + lt.markAttemptingAction();
>
> - // should I be leader?
> - if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
> - rejoinLeaderElection(core);
> - return;
> + log.info("Running the leader process for shard " + shardId);
> + // clear the leader in clusterstate
> + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
> + ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
> + Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
> +
> + int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
> + if (!weAreReplacement) {
> + waitForReplicasToComeUp(leaderVoteWait);
> }
>
> - log.info("I may be the new leader - try and sync");
> -
> -
> - // we are going to attempt to be the leader
> - // first cancel any current recovery
> - core.getUpdateHandler().getSolrCoreState().cancelRecovery();
> -
> - if (weAreReplacement) {
> - // wait a moment for any floating updates to finish
> - try {
> - Thread.sleep(2500);
> - } catch (InterruptedException e) {
> - Thread.currentThread().interrupt();
> - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
> + try (SolrCore core = cc.getCore(coreName)) {
> +
> + if (core == null) {
> + cancelElection();
> + throw new SolrException(ErrorCode.SERVER_ERROR,
> + "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
> }
> - }
> -
> - boolean success = false;
> - try {
> - success = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
> - } catch (Exception e) {
> - SolrException.log(log, "Exception while trying to sync", e);
> - success = false;
> - }
> -
> - UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
> -
> - if (!success) {
> - boolean hasRecentUpdates = false;
> - if (ulog != null) {
> - // TODO: we could optimize this if necessary
> - UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
> +
> + // should I be leader?
> + if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
> + rejoinLeaderElection(core);
> + return;
> + }
> +
> + log.info("I may be the new leader - try and sync");
> +
> + // we are going to attempt to be the leader
> + // first cancel any current recovery
> + core.getUpdateHandler().getSolrCoreState().cancelRecovery();
> +
> + if (weAreReplacement) {
> + // wait a moment for any floating updates to finish
> try {
> - hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
> - } finally {
> - recentUpdates.close();
> + Thread.sleep(2500);
> + } catch (InterruptedException e) {
> + Thread.currentThread().interrupt();
> + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
> }
> }
> -
> - if (!hasRecentUpdates) {
> - // we failed sync, but we have no versions - we can't sync in that case
> - // - we were active
> - // before, so become leader anyway
> - log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
> - success = true;
> - }
> - }
> -
> - // solrcloud_debug
> - if (log.isDebugEnabled()) {
> +
> + boolean success = false;
> try {
> - RefCounted<SolrIndexSearcher> searchHolder = core
> - .getNewestSearcher(false);
> - SolrIndexSearcher searcher = searchHolder.get();
> + success = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
> + } catch (Exception e) {
> + SolrException.log(log, "Exception while trying to sync", e);
> + success = false;
> + }
> +
> + UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
> +
> + if (!success) {
> + boolean hasRecentUpdates = false;
> + if (ulog != null) {
> + // TODO: we could optimize this if necessary
> + UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
> + try {
> + hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
> + } finally {
> + recentUpdates.close();
> + }
> + }
> +
> + if (!hasRecentUpdates) {
> + // we failed sync, but we have no versions - we can't sync in that case
> + // - we were active
> + // before, so become leader anyway
> + log.info(
> + "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
> + success = true;
> + }
> + }
> +
> + // solrcloud_debug
> + if (log.isDebugEnabled()) {
> try {
> - log.debug(core.getCoreDescriptor().getCoreContainer()
> - .getZkController().getNodeName()
> - + " synched "
> - + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
> - } finally {
> - searchHolder.decref();
> + RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
> + SolrIndexSearcher searcher = searchHolder.get();
> + try {
> + log.debug(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " synched "
> + + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
> + } finally {
> + searchHolder.decref();
> + }
> + } catch (Exception e) {
> + log.error("Error in solrcloud_debug block", e);
> }
> - } catch (Exception e) {
> - log.error("Error in solrcloud_debug block", e);
> }
> - }
> - if (!success) {
> - rejoinLeaderElection(core);
> - return;
> - }
> -
> - log.info("I am the new leader: "
> - + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
> - core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
> - }
> -
> - boolean isLeader = true;
> - try {
> - super.runLeaderProcess(weAreReplacement, 0);
> - } catch (Exception e) {
> - isLeader = false;
> - SolrException.log(log, "There was a problem trying to register as the leader", e);
> -
> - try (SolrCore core = cc.getCore(coreName)) {
> -
> - if (core == null) {
> - log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
> + if (!success) {
> + rejoinLeaderElection(core);
> return;
> }
>
> - core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
> -
> - // we could not publish ourselves as leader - try and rejoin election
> - rejoinLeaderElection(core);
> + log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
> + core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
> }
> - }
> -
> - if (isLeader) {
> - // check for any replicas in my shard that were set to down by the previous leader
> +
> + boolean isLeader = true;
> try {
> - startLeaderInitiatedRecoveryOnReplicas(coreName);
> - } catch (Exception exc) {
> - // don't want leader election to fail because of
> - // an error trying to tell others to recover
> + super.runLeaderProcess(weAreReplacement, 0);
> + } catch (Exception e) {
> + isLeader = false;
> + SolrException.log(log, "There was a problem trying to register as the leader", e);
> +
> + try (SolrCore core = cc.getCore(coreName)) {
> +
> + if (core == null) {
> + log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
> + return;
> + }
> +
> + core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
> +
> + // we could not publish ourselves as leader - try and rejoin election
> + rejoinLeaderElection(core);
> + }
> }
> - }
> +
> + if (isLeader) {
> + // check for any replicas in my shard that were set to down by the previous leader
> + try {
> + startLeaderInitiatedRecoveryOnReplicas(coreName);
> + } catch (Exception exc) {
> + // don't want leader election to fail because of
> + // an error trying to tell others to recover
> + }
> + }
> + } finally {
> + MDCLoggingContext.clear();
> + }
> }
>
> private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
> @@ -493,8 +481,7 @@ final class ShardLeaderElectionContext e
> }
>
> private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
> - log.info("Checking if I (core={},coreNodeName={}) should try and be the leader.", core.getName(),
> - core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
> + log.info("Checking if I should try and be the leader.");
>
> if (isClosed) {
> log.info("Bailing on leader process because we have been closed");
> @@ -534,7 +521,7 @@ final class ShardLeaderElectionContext e
> }
>
> final class OverseerElectionContext extends ElectionContext {
> -
> + private static Logger log = LoggerFactory.getLogger(OverseerElectionContext.class);
> private final SolrZkClient zkClient;
> private Overseer overseer;
> public static final String PATH = "/overseer_elect";
>
> Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1682031&r1=1682030&r2=1682031&view=diff
> ==============================================================================
> --- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
> +++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Wed May 27 13:58:30 2015
> @@ -91,7 +91,7 @@ import org.apache.solr.handler.component
> import org.apache.solr.handler.component.ShardHandlerFactory;
> import org.apache.solr.handler.component.ShardRequest;
> import org.apache.solr.handler.component.ShardResponse;
> -import org.apache.solr.logging.MDCUtils;
> +import org.apache.solr.logging.MDCLoggingContext;
> import org.apache.solr.update.SolrIndexSplitter;
> import org.apache.solr.util.DefaultSolrThreadFactory;
> import org.apache.solr.util.stats.Snapshot;
> @@ -998,68 +998,66 @@ public class OverseerCollectionProcessor
> }
>
> @SuppressWarnings("unchecked")
> - private void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
> - checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP,REPLICA_PROP);
> + private void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results)
> + throws KeeperException, InterruptedException {
> + checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
> String collectionName = message.getStr(COLLECTION_PROP);
> String shard = message.getStr(SHARD_ID_PROP);
> String replicaName = message.getStr(REPLICA_PROP);
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setMDC(collectionName, shard, replicaName, null);
> +
> + DocCollection coll = clusterState.getCollection(collectionName);
> + Slice slice = coll.getSlice(shard);
> + ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> + if (slice == null) {
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "Invalid shard name : " + shard + " in collection : " + collectionName);
> + }
> + Replica replica = slice.getReplica(replicaName);
> + if (replica == null) {
> + ArrayList<String> l = new ArrayList<>();
> + for (Replica r : slice.getReplicas())
> + l.add(r.getName());
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
> + + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
> + }
> +
> + // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
> + // on the command.
> + if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
> + + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
> + }
> +
> + String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
> + String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
> +
> + // assume the core exists and try to unload it
> + Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE,
> + core, CoreAdminParams.DELETE_INSTANCE_DIR, "true", CoreAdminParams.DELETE_DATA_DIR, "true");
> +
> + ShardRequest sreq = new ShardRequest();
> + sreq.purpose = 1;
> + sreq.shards = new String[] {baseUrl};
> + sreq.actualShards = sreq.shards;
> + sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
> try {
> - DocCollection coll = clusterState.getCollection(collectionName);
> - Slice slice = coll.getSlice(shard);
> - ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> - if (slice == null) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid shard name : " + shard + " in collection : " + collectionName);
> - }
> - Replica replica = slice.getReplica(replicaName);
> - if (replica == null) {
> - ArrayList<String> l = new ArrayList<>();
> - for (Replica r : slice.getReplicas()) l.add(r.getName());
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
> - + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
> - }
> -
> - // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
> - // on the command.
> - if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Attempted to remove replica : " + collectionName + "/" +
> - shard + "/" + replicaName +
> - " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
> - }
> -
> - String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
> - String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
> -
> - // assume the core exists and try to unload it
> - Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION,
> - CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, core,
> - CoreAdminParams.DELETE_INSTANCE_DIR, "true",
> - CoreAdminParams.DELETE_DATA_DIR, "true");
> -
> - ShardRequest sreq = new ShardRequest();
> - sreq.purpose = 1;
> - sreq.shards = new String[]{baseUrl};
> - sreq.actualShards = sreq.shards;
> - sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
> - try {
> - shardHandler.submit(sreq, baseUrl, sreq.params);
> - } catch (Exception e) {
> - log.warn("Exception trying to unload core " + sreq, e);
> - }
> -
> - collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results,
> - false, null, shardHandler);
> -
> - if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000))
> - return;//check if the core unload removed the corenode zk enry
> - deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
> - if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
> -
> - throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> + shardHandler.submit(sreq, baseUrl, sreq.params);
> + } catch (Exception e) {
> + log.warn("Exception trying to unload core " + sreq, e);
> }
> +
> + collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results, false, null,
> + shardHandler);
> +
> + if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;// check if the core unload removed the
> + // corenode zk entry
> + deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
> + if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
> +
> + throw new SolrException(ErrorCode.SERVER_ERROR,
> + "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
> +
> }
>
> private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
> @@ -1155,42 +1153,33 @@ public class OverseerCollectionProcessor
> private void createAlias(Aliases aliases, ZkNodeProps message) {
> String aliasName = message.getStr(NAME);
> String collections = message.getStr("collections");
> -
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setCollection(aliasName);
> -
> +
> + Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
> + Map<String,String> newCollectionAliasesMap = new HashMap<>();
> + Map<String,String> prevColAliases = aliases.getCollectionAliasMap();
> + if (prevColAliases != null) {
> + newCollectionAliasesMap.putAll(prevColAliases);
> + }
> + newCollectionAliasesMap.put(aliasName, collections);
> + newAliasesMap.put("collection", newCollectionAliasesMap);
> + Aliases newAliases = new Aliases(newAliasesMap);
> + byte[] jsonBytes = null;
> + if (newAliases.collectionAliasSize() > 0) { // only sub map right now
> + jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap());
> + }
> try {
> - Map<String, Map<String, String>> newAliasesMap = new HashMap<>();
> - Map<String, String> newCollectionAliasesMap = new HashMap<>();
> - Map<String, String> prevColAliases = aliases.getCollectionAliasMap();
> - if (prevColAliases != null) {
> - newCollectionAliasesMap.putAll(prevColAliases);
> - }
> - newCollectionAliasesMap.put(aliasName, collections);
> - newAliasesMap.put("collection", newCollectionAliasesMap);
> - Aliases newAliases = new Aliases(newAliasesMap);
> - byte[] jsonBytes = null;
> - if (newAliases.collectionAliasSize() > 0) { // only sub map right now
> - jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap());
> - }
> - try {
> - zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
> - jsonBytes, true);
> -
> - checkForAlias(aliasName, collections);
> - // some fudge for other nodes
> - Thread.sleep(100);
> - } catch (KeeperException e) {
> - log.error("", e);
> - throw new SolrException(ErrorCode.SERVER_ERROR, e);
> - } catch (InterruptedException e) {
> - log.warn("", e);
> - throw new SolrException(ErrorCode.SERVER_ERROR, e);
> - }
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> + zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
> +
> + checkForAlias(aliasName, collections);
> + // some fudge for other nodes
> + Thread.sleep(100);
> + } catch (KeeperException e) {
> + log.error("", e);
> + throw new SolrException(ErrorCode.SERVER_ERROR, e);
> + } catch (InterruptedException e) {
> + log.warn("", e);
> + throw new SolrException(ErrorCode.SERVER_ERROR, e);
> }
> -
> }
>
> private void checkForAlias(String name, String value) {
> @@ -1233,8 +1222,6 @@ public class OverseerCollectionProcessor
>
> private void deleteAlias(Aliases aliases, ZkNodeProps message) {
> String aliasName = message.getStr(NAME);
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setCollection(aliasName);
>
> Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
> Map<String,String> newCollectionAliasesMap = new HashMap<>();
> @@ -1258,282 +1245,393 @@ public class OverseerCollectionProcessor
> } catch (InterruptedException e) {
> log.warn("", e);
> throw new SolrException(ErrorCode.SERVER_ERROR, e);
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> - }
> + }
>
> }
>
> private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results)
> throws KeeperException, InterruptedException {
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> String collectionName = message.getStr(COLLECTION_PROP);
> String sliceName = message.getStr(SHARD_ID_PROP);
> -
> - MDCUtils.setMDC(collectionName, sliceName, null, null);
> - try {
> - log.info("Create shard invoked: {}", message);
> - if (collectionName == null || sliceName == null)
> - throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
> - int numSlices = 1;
> -
> - ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> - DocCollection collection = clusterState.getCollection(collectionName);
> - int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
> - String createNodeSetStr = message.getStr(CREATE_NODE_SET);
> - List<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, sliceName, repFactor,
> - createNodeSetStr, overseer.getZkController().getCoreContainer());
> -
> - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
> - // wait for a while until we see the shard
> - long waitUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
> - boolean created = false;
> - while (System.nanoTime() < waitUntil) {
> - Thread.sleep(100);
> - created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
> - if (created) break;
> - }
> - if (!created)
> - throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
> -
> -
> - String configName = message.getStr(COLL_CONF);
> - for (int j = 1; j <= repFactor; j++) {
> - String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
> - String shardName = collectionName + "_" + sliceName + "_replica" + j;
> - log.info("Creating shard " + shardName + " as part of slice "
> - + sliceName + " of collection " + collectionName + " on "
> - + nodeName);
> -
> - // Need to create new params for each request
> - ModifiableSolrParams params = new ModifiableSolrParams();
> - params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
> -
> - params.set(CoreAdminParams.NAME, shardName);
> - params.set(COLL_CONF, configName);
> - params.set(CoreAdminParams.COLLECTION, collectionName);
> - params.set(CoreAdminParams.SHARD, sliceName);
> - params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
> - addPropertyParams(message, params);
> -
> - ShardRequest sreq = new ShardRequest();
> - params.set("qt", adminPath);
> - sreq.purpose = 1;
> - String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
> - sreq.shards = new String[]{replica};
> - sreq.actualShards = sreq.shards;
> - sreq.params = params;
> -
> - shardHandler.submit(sreq, replica, sreq.params);
> -
> - }
> -
> - processResponses(results, shardHandler);
> -
> - log.info("Finished create command on all shards for collection: "
> - + collectionName);
> -
> - return true;
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> +
> + log.info("Create shard invoked: {}", message);
> + if (collectionName == null || sliceName == null)
> + throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
> + int numSlices = 1;
> +
> + ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> + DocCollection collection = clusterState.getCollection(collectionName);
> + int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
> + String createNodeSetStr = message.getStr(CREATE_NODE_SET);
> + List<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, sliceName, repFactor,
> + createNodeSetStr, overseer.getZkController().getCoreContainer());
> +
> + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
> + // wait for a while until we see the shard
> + long waitUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
> + boolean created = false;
> + while (System.nanoTime() < waitUntil) {
> + Thread.sleep(100);
> + created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
> + if (created) break;
> + }
> + if (!created)
> + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
> +
> + String configName = message.getStr(COLL_CONF);
> + for (int j = 1; j <= repFactor; j++) {
> + String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
> + String shardName = collectionName + "_" + sliceName + "_replica" + j;
> + log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
> + + " on " + nodeName);
> +
> + // Need to create new params for each request
> + ModifiableSolrParams params = new ModifiableSolrParams();
> + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
> +
> + params.set(CoreAdminParams.NAME, shardName);
> + params.set(COLL_CONF, configName);
> + params.set(CoreAdminParams.COLLECTION, collectionName);
> + params.set(CoreAdminParams.SHARD, sliceName);
> + params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
> + addPropertyParams(message, params);
> +
> + ShardRequest sreq = new ShardRequest();
> + params.set("qt", adminPath);
> + sreq.purpose = 1;
> + String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
> + sreq.shards = new String[] {replica};
> + sreq.actualShards = sreq.shards;
> + sreq.params = params;
> +
> + shardHandler.submit(sreq, replica, sreq.params);
> +
> }
> +
> + processResponses(results, shardHandler);
> +
> + log.info("Finished create command on all shards for collection: " + collectionName);
> +
> + return true;
> }
>
>
> private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
> String collectionName = message.getStr("collection");
> String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setMDC(collectionName, slice, null, null);
> - try {
> - log.info("Split shard invoked");
> - String splitKey = message.getStr("split.key");
> - ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> -
> - DocCollection collection = clusterState.getCollection(collectionName);
> - DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
> -
> - Slice parentSlice = null;
> -
> - if (slice == null) {
> - if (router instanceof CompositeIdRouter) {
> - Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
> - if (searchSlices.isEmpty()) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
> - }
> - if (searchSlices.size() > 1) {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
> - }
> - parentSlice = searchSlices.iterator().next();
> - slice = parentSlice.getName();
> - log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
> - } else {
> +
> + log.info("Split shard invoked");
> + String splitKey = message.getStr("split.key");
> + ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> +
> + DocCollection collection = clusterState.getCollection(collectionName);
> + DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
> +
> + Slice parentSlice = null;
> +
> + if (slice == null) {
> + if (router instanceof CompositeIdRouter) {
> + Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
> + if (searchSlices.isEmpty()) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
> + }
> + if (searchSlices.size() > 1) {
> throw new SolrException(ErrorCode.BAD_REQUEST,
> - "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName());
> + "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
> }
> + parentSlice = searchSlices.iterator().next();
> + slice = parentSlice.getName();
> + log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
> } else {
> - parentSlice = clusterState.getSlice(collectionName, slice);
> - }
> -
> - if (parentSlice == null) {
> - if (clusterState.hasCollection(collectionName)) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
> - } else {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
> - }
> - }
> -
> - // find the leader for the shard
> - Replica parentShardLeader = null;
> - try {
> - parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
> - } catch (InterruptedException e) {
> - Thread.currentThread().interrupt();
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
> + + router.getClass().getName());
> }
> -
> - DocRouter.Range range = parentSlice.getRange();
> - if (range == null) {
> - range = new PlainIdRouter().fullRange();
> + } else {
> + parentSlice = clusterState.getSlice(collectionName, slice);
> + }
> +
> + if (parentSlice == null) {
> + if (clusterState.hasCollection(collectionName)) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
> + } else {
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "No collection with the specified name exists: " + collectionName);
> }
> -
> - List<DocRouter.Range> subRanges = null;
> - String rangesStr = message.getStr(CoreAdminParams.RANGES);
> - if (rangesStr != null) {
> - String[] ranges = rangesStr.split(",");
> - if (ranges.length == 0 || ranges.length == 1) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
> - } else {
> - subRanges = new ArrayList<>(ranges.length);
> - for (int i = 0; i < ranges.length; i++) {
> - String r = ranges[i];
> - try {
> - subRanges.add(DocRouter.DEFAULT.fromString(r));
> - } catch (Exception e) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
> - }
> - if (!subRanges.get(i).isSubsetOf(range)) {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
> - }
> + }
> +
> + // find the leader for the shard
> + Replica parentShardLeader = null;
> + try {
> + parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
> + } catch (InterruptedException e) {
> + Thread.currentThread().interrupt();
> + }
> +
> + DocRouter.Range range = parentSlice.getRange();
> + if (range == null) {
> + range = new PlainIdRouter().fullRange();
> + }
> +
> + List<DocRouter.Range> subRanges = null;
> + String rangesStr = message.getStr(CoreAdminParams.RANGES);
> + if (rangesStr != null) {
> + String[] ranges = rangesStr.split(",");
> + if (ranges.length == 0 || ranges.length == 1) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
> + } else {
> + subRanges = new ArrayList<>(ranges.length);
> + for (int i = 0; i < ranges.length; i++) {
> + String r = ranges[i];
> + try {
> + subRanges.add(DocRouter.DEFAULT.fromString(r));
> + } catch (Exception e) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
> }
> - List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
> - Collections.sort(temp);
> - if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
> + if (!subRanges.get(i).isSubsetOf(range)) {
> throw new SolrException(ErrorCode.BAD_REQUEST,
> - "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
> - }
> - for (int i = 1; i < temp.size(); i++) {
> - if (temp.get(i - 1).max + 1 != temp.get(i).min) {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "Specified hash ranges: " + rangesStr + " either overlap with each other or " +
> - "do not cover the entire range of parent shard: " + range);
> - }
> + "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
> }
> }
> - } else if (splitKey != null) {
> - if (router instanceof CompositeIdRouter) {
> - CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
> - subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
> - if (subRanges.size() == 1) {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice);
> - }
> - for (DocRouter.Range subRange : subRanges) {
> - if (subRange.min == subRange.max) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
> - }
> - }
> - log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
> - rangesStr = "";
> - for (int i = 0; i < subRanges.size(); i++) {
> - DocRouter.Range subRange = subRanges.get(i);
> - rangesStr += subRange.toString();
> - if (i < subRanges.size() - 1)
> - rangesStr += ',';
> + List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
> + Collections.sort(temp);
> + if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
> + }
> + for (int i = 1; i < temp.size(); i++) {
> + if (temp.get(i - 1).max + 1 != temp.get(i).min) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
> + + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
> }
> }
> - } else {
> - // todo: fixed to two partitions?
> - subRanges = router.partitionRange(2, range);
> }
> -
> - try {
> - List<String> subSlices = new ArrayList<>(subRanges.size());
> - List<String> subShardNames = new ArrayList<>(subRanges.size());
> - String nodeName = parentShardLeader.getNodeName();
> + } else if (splitKey != null) {
> + if (router instanceof CompositeIdRouter) {
> + CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
> + subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
> + if (subRanges.size() == 1) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
> + + " has a hash range that is exactly equal to hash range of shard: " + slice);
> + }
> + for (DocRouter.Range subRange : subRanges) {
> + if (subRange.min == subRange.max) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
> + }
> + }
> + log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
> + rangesStr = "";
> for (int i = 0; i < subRanges.size(); i++) {
> - String subSlice = slice + "_" + i;
> - subSlices.add(subSlice);
> - String subShardName = collectionName + "_" + subSlice + "_replica1";
> - subShardNames.add(subShardName);
> -
> - Slice oSlice = clusterState.getSlice(collectionName, subSlice);
> - if (oSlice != null) {
> - final Slice.State state = oSlice.getState();
> - if (state == Slice.State.ACTIVE) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
> - } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
> - // delete the shards
> - for (String sub : subSlices) {
> - log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
> - Map<String, Object> propMap = new HashMap<>();
> - propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
> - propMap.put(COLLECTION_PROP, collectionName);
> - propMap.put(SHARD_ID_PROP, sub);
> - ZkNodeProps m = new ZkNodeProps(propMap);
> - try {
> - deleteShard(clusterState, m, new NamedList());
> - } catch (Exception e) {
> - throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e);
> - }
> + DocRouter.Range subRange = subRanges.get(i);
> + rangesStr += subRange.toString();
> + if (i < subRanges.size() - 1) rangesStr += ',';
> + }
> + }
> + } else {
> + // todo: fixed to two partitions?
> + subRanges = router.partitionRange(2, range);
> + }
> +
> + try {
> + List<String> subSlices = new ArrayList<>(subRanges.size());
> + List<String> subShardNames = new ArrayList<>(subRanges.size());
> + String nodeName = parentShardLeader.getNodeName();
> + for (int i = 0; i < subRanges.size(); i++) {
> + String subSlice = slice + "_" + i;
> + subSlices.add(subSlice);
> + String subShardName = collectionName + "_" + subSlice + "_replica1";
> + subShardNames.add(subShardName);
> +
> + Slice oSlice = clusterState.getSlice(collectionName, subSlice);
> + if (oSlice != null) {
> + final Slice.State state = oSlice.getState();
> + if (state == Slice.State.ACTIVE) {
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
> + } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
> + // delete the shards
> + for (String sub : subSlices) {
> + log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
> + Map<String,Object> propMap = new HashMap<>();
> + propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
> + propMap.put(COLLECTION_PROP, collectionName);
> + propMap.put(SHARD_ID_PROP, sub);
> + ZkNodeProps m = new ZkNodeProps(propMap);
> + try {
> + deleteShard(clusterState, m, new NamedList());
> + } catch (Exception e) {
> + throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub,
> + e);
> }
> }
> }
> }
> -
> - // do not abort splitshard if the unloading fails
> - // this can happen because the replicas created previously may be down
> - // the only side effect of this is that the sub shard may end up having more replicas than we want
> - collectShardResponses(results, false, null, shardHandler);
> -
> - String asyncId = message.getStr(ASYNC);
> - HashMap<String, String> requestMap = new HashMap<String, String>();
> -
> - for (int i = 0; i < subRanges.size(); i++) {
> - String subSlice = subSlices.get(i);
> - String subShardName = subShardNames.get(i);
> - DocRouter.Range subRange = subRanges.get(i);
> -
> - log.info("Creating slice "
> - + subSlice + " of collection " + collectionName + " on "
> - + nodeName);
> -
> - Map<String, Object> propMap = new HashMap<>();
> - propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
> - propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
> - propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
> - propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
> - propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
> - propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
> - DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
> - inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap)));
> -
> - // wait until we are able to see the new shard in cluster state
> - waitForNewShard(collectionName, subSlice);
> -
> - // refresh cluster state
> - clusterState = zkStateReader.getClusterState();
> -
> - log.info("Adding replica " + subShardName + " as part of slice "
> - + subSlice + " of collection " + collectionName + " on "
> - + nodeName);
> - propMap = new HashMap<>();
> + }
> +
> + // do not abort splitshard if the unloading fails
> + // this can happen because the replicas created previously may be down
> + // the only side effect of this is that the sub shard may end up having more replicas than we want
> + collectShardResponses(results, false, null, shardHandler);
> +
> + String asyncId = message.getStr(ASYNC);
> + HashMap<String,String> requestMap = new HashMap<String,String>();
> +
> + for (int i = 0; i < subRanges.size(); i++) {
> + String subSlice = subSlices.get(i);
> + String subShardName = subShardNames.get(i);
> + DocRouter.Range subRange = subRanges.get(i);
> +
> + log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
> +
> + Map<String,Object> propMap = new HashMap<>();
> + propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
> + propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
> + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
> + propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
> + propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
> + propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
> + DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
> + inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap)));
> +
> + // wait until we are able to see the new shard in cluster state
> + waitForNewShard(collectionName, subSlice);
> +
> + // refresh cluster state
> + clusterState = zkStateReader.getClusterState();
> +
> + log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
> + + " on " + nodeName);
> + propMap = new HashMap<>();
> + propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
> + propMap.put(COLLECTION_PROP, collectionName);
> + propMap.put(SHARD_ID_PROP, subSlice);
> + propMap.put("node", nodeName);
> + propMap.put(CoreAdminParams.NAME, subShardName);
> + // copy over property params:
> + for (String key : message.keySet()) {
> + if (key.startsWith(COLL_PROP_PREFIX)) {
> + propMap.put(key, message.getStr(key));
> + }
> + }
> + // add async param
> + if (asyncId != null) {
> + propMap.put(ASYNC, asyncId);
> + }
> + addReplica(clusterState, new ZkNodeProps(propMap), results);
> + }
> +
> + collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler);
> +
> + completeAsyncRequest(asyncId, requestMap, results);
> +
> + for (String subShardName : subShardNames) {
> + // wait for parent leader to acknowledge the sub-shard core
> + log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
> + String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
> + CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
> + cmd.setCoreName(subShardName);
> + cmd.setNodeName(nodeName);
> + cmd.setCoreNodeName(coreNodeName);
> + cmd.setState(Replica.State.ACTIVE);
> + cmd.setCheckLive(true);
> + cmd.setOnlyIfLeader(true);
> +
> + ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
> + sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
> + }
> +
> + collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
> + shardHandler);
> +
> + completeAsyncRequest(asyncId, requestMap, results);
> +
> + log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
> + + " on: " + parentShardLeader);
> +
> + log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
> + + collectionName + " on " + parentShardLeader);
> +
> + ModifiableSolrParams params = new ModifiableSolrParams();
> + params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
> + params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
> + for (int i = 0; i < subShardNames.size(); i++) {
> + String subShardName = subShardNames.get(i);
> + params.add(CoreAdminParams.TARGET_CORE, subShardName);
> + }
> + params.set(CoreAdminParams.RANGES, rangesStr);
> +
> + sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
> +
> + collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler);
> + completeAsyncRequest(asyncId, requestMap, results);
> +
> + log.info("Index on shard: " + nodeName + " split into two successfully");
> +
> + // apply buffered updates on sub-shards
> + for (int i = 0; i < subShardNames.size(); i++) {
> + String subShardName = subShardNames.get(i);
> +
> + log.info("Applying buffered updates on : " + subShardName);
> +
> + params = new ModifiableSolrParams();
> + params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
> + params.set(CoreAdminParams.NAME, subShardName);
> +
> + sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
> + }
> +
> + collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
> + shardHandler);
> +
> + completeAsyncRequest(asyncId, requestMap, results);
> +
> + log.info("Successfully applied buffered updates on : " + subShardNames);
> +
> + // Replica creation for the new Slices
> +
> + // look at the replication factor and see if it matches reality
> + // if it does not, find best nodes to create more cores
> +
> + // TODO: Have replication factor decided in some other way instead of numShards for the parent
> +
> + int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();
> +
> + // we need to look at every node and see how many cores it serves
> + // add our new cores to existing nodes serving the least number of cores
> + // but (for now) require that each core goes on a distinct node.
> +
> + // TODO: add smarter options that look at the current number of cores per
> + // node?
> + // for now we just go random
> + Set<String> nodes = clusterState.getLiveNodes();
> + List<String> nodeList = new ArrayList<>(nodes.size());
> + nodeList.addAll(nodes);
> +
> + Collections.shuffle(nodeList, RANDOM);
> +
> + // TODO: Have maxShardsPerNode param for this operation?
> +
> + // Remove the node that hosts the parent shard for replica creation.
> + nodeList.remove(nodeName);
> +
> + // TODO: change this to handle sharding a slice into > 2 sub-shards.
> +
> + for (int i = 1; i <= subSlices.size(); i++) {
> + Collections.shuffle(nodeList, RANDOM);
> + String sliceName = subSlices.get(i - 1);
> + for (int j = 2; j <= repFactor; j++) {
> + String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
> + String shardName = collectionName + "_" + sliceName + "_replica" + (j);
> +
> + log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
> + + collectionName + " on " + subShardNodeName);
> +
> + HashMap<String,Object> propMap = new HashMap<>();
> propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
> propMap.put(COLLECTION_PROP, collectionName);
> - propMap.put(SHARD_ID_PROP, subSlice);
> - propMap.put("node", nodeName);
> - propMap.put(CoreAdminParams.NAME, subShardName);
> + propMap.put(SHARD_ID_PROP, sliceName);
> + propMap.put("node", subShardNodeName);
> + propMap.put(CoreAdminParams.NAME, shardName);
> // copy over property params:
> for (String key : message.keySet()) {
> if (key.startsWith(COLL_PROP_PREFIX)) {
> @@ -1541,203 +1639,69 @@ public class OverseerCollectionProcessor
> }
> }
> // add async param
> - if(asyncId != null) {
> + if (asyncId != null) {
> propMap.put(ASYNC, asyncId);
> }
> addReplica(clusterState, new ZkNodeProps(propMap), results);
> - }
> -
> - collectShardResponses(results, true,
> - "SPLITSHARD failed to create subshard leaders", shardHandler);
> -
> - completeAsyncRequest(asyncId, requestMap, results);
> -
> - for (String subShardName : subShardNames) {
> - // wait for parent leader to acknowledge the sub-shard core
> - log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
> - String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
> +
> + String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
> + // wait for the replicas to be seen as active on sub shard leader
> + log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
> CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
> - cmd.setCoreName(subShardName);
> - cmd.setNodeName(nodeName);
> + cmd.setCoreName(subShardNames.get(i - 1));
> + cmd.setNodeName(subShardNodeName);
> cmd.setCoreNodeName(coreNodeName);
> - cmd.setState(Replica.State.ACTIVE);
> + cmd.setState(Replica.State.RECOVERING);
> cmd.setCheckLive(true);
> cmd.setOnlyIfLeader(true);
> -
> ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
> +
> sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
> +
> }
> -
> - collectShardResponses(results, true,
> - "SPLITSHARD timed out waiting for subshard leaders to come up", shardHandler);
> -
> - completeAsyncRequest(asyncId, requestMap, results);
> -
> - log.info("Successfully created all sub-shards for collection "
> - + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
> -
> - log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice "
> - + slice + " of collection " + collectionName + " on "
> - + parentShardLeader);
> -
> - ModifiableSolrParams params = new ModifiableSolrParams();
> - params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
> - params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
> - for (int i = 0; i < subShardNames.size(); i++) {
> - String subShardName = subShardNames.get(i);
> - params.add(CoreAdminParams.TARGET_CORE, subShardName);
> - }
> - params.set(CoreAdminParams.RANGES, rangesStr);
> -
> - sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
> -
> - collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command",
> - shardHandler);
> - completeAsyncRequest(asyncId, requestMap, results);
> -
> - log.info("Index on shard: " + nodeName + " split into two successfully");
> -
> - // apply buffered updates on sub-shards
> - for (int i = 0; i < subShardNames.size(); i++) {
> - String subShardName = subShardNames.get(i);
> -
> - log.info("Applying buffered updates on : " + subShardName);
> -
> - params = new ModifiableSolrParams();
> - params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
> - params.set(CoreAdminParams.NAME, subShardName);
> -
> - sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
> - }
> -
> - collectShardResponses(results, true,
> - "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
> - shardHandler);
> -
> - completeAsyncRequest(asyncId, requestMap, results);
> -
> - log.info("Successfully applied buffered updates on : " + subShardNames);
> -
> - // Replica creation for the new Slices
> -
> - // look at the replication factor and see if it matches reality
> - // if it does not, find best nodes to create more cores
> -
> - // TODO: Have replication factor decided in some other way instead of numShards for the parent
> -
> - int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();
> -
> - // we need to look at every node and see how many cores it serves
> - // add our new cores to existing nodes serving the least number of cores
> - // but (for now) require that each core goes on a distinct node.
> -
> - // TODO: add smarter options that look at the current number of cores per
> - // node?
> - // for now we just go random
> - Set<String> nodes = clusterState.getLiveNodes();
> - List<String> nodeList = new ArrayList<>(nodes.size());
> - nodeList.addAll(nodes);
> -
> - Collections.shuffle(nodeList, RANDOM);
> -
> - // TODO: Have maxShardsPerNode param for this operation?
> -
> - // Remove the node that hosts the parent shard for replica creation.
> - nodeList.remove(nodeName);
> -
> - // TODO: change this to handle sharding a slice into > 2 sub-shards.
> -
> - for (int i = 1; i <= subSlices.size(); i++) {
> - Collections.shuffle(nodeList, RANDOM);
> - String sliceName = subSlices.get(i - 1);
> - for (int j = 2; j <= repFactor; j++) {
> - String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
> - String shardName = collectionName + "_" + sliceName + "_replica" + (j);
> -
> - log.info("Creating replica shard " + shardName + " as part of slice "
> - + sliceName + " of collection " + collectionName + " on "
> - + subShardNodeName);
> -
> - HashMap<String, Object> propMap = new HashMap<>();
> - propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
> - propMap.put(COLLECTION_PROP, collectionName);
> - propMap.put(SHARD_ID_PROP, sliceName);
> - propMap.put("node", subShardNodeName);
> - propMap.put(CoreAdminParams.NAME, shardName);
> - // copy over property params:
> - for (String key : message.keySet()) {
> - if (key.startsWith(COLL_PROP_PREFIX)) {
> - propMap.put(key, message.getStr(key));
> - }
> - }
> - // add async param
> - if (asyncId != null) {
> - propMap.put(ASYNC, asyncId);
> - }
> - addReplica(clusterState, new ZkNodeProps(propMap), results);
> -
> - String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
> - // wait for the replicas to be seen as active on sub shard leader
> - log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
> - CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
> - cmd.setCoreName(subShardNames.get(i - 1));
> - cmd.setNodeName(subShardNodeName);
> - cmd.setCoreNodeName(coreNodeName);
> - cmd.setState(Replica.State.RECOVERING);
> - cmd.setCheckLive(true);
> - cmd.setOnlyIfLeader(true);
> - ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
> -
> - sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
> -
> - }
> - }
> -
> - collectShardResponses(results, true,
> - "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up",
> - shardHandler);
> -
> - completeAsyncRequest(asyncId, requestMap, results);
> -
> - log.info("Successfully created all replica shards for all sub-slices " + subSlices);
> -
> - commit(results, slice, parentShardLeader);
> -
> - if (repFactor == 1) {
> - // switch sub shard states to 'active'
> - log.info("Replication factor is 1 so switching shard states");
> - DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
> - Map<String, Object> propMap = new HashMap<>();
> - propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
> - propMap.put(slice, Slice.State.INACTIVE.toString());
> - for (String subSlice : subSlices) {
> - propMap.put(subSlice, Slice.State.ACTIVE.toString());
> - }
> - propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
> - ZkNodeProps m = new ZkNodeProps(propMap);
> - inQueue.offer(ZkStateReader.toJSON(m));
> - } else {
> - log.info("Requesting shard state be set to 'recovery'");
> - DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
> - Map<String, Object> propMap = new HashMap<>();
> - propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
> - for (String subSlice : subSlices) {
> - propMap.put(subSlice, Slice.State.RECOVERY.toString());
> - }
> - propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
> - ZkNodeProps m = new ZkNodeProps(propMap);
> - inQueue.offer(ZkStateReader.toJSON(m));
> - }
> -
> - return true;
> - } catch (SolrException e) {
> - throw e;
> - } catch (Exception e) {
> - log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
> - throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
> }
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> +
> + collectShardResponses(results, true,
> + "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up", shardHandler);
> +
> + completeAsyncRequest(asyncId, requestMap, results);
> +
> + log.info("Successfully created all replica shards for all sub-slices " + subSlices);
> +
> + commit(results, slice, parentShardLeader);
> +
> + if (repFactor == 1) {
> + // switch sub shard states to 'active'
> + log.info("Replication factor is 1 so switching shard states");
> + DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
> + Map<String,Object> propMap = new HashMap<>();
> + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
> + propMap.put(slice, Slice.State.INACTIVE.toString());
> + for (String subSlice : subSlices) {
> + propMap.put(subSlice, Slice.State.ACTIVE.toString());
> + }
> + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
> + ZkNodeProps m = new ZkNodeProps(propMap);
> + inQueue.offer(ZkStateReader.toJSON(m));
> + } else {
> + log.info("Requesting shard state be set to 'recovery'");
> + DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
> + Map<String,Object> propMap = new HashMap<>();
> + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
> + for (String subSlice : subSlices) {
> + propMap.put(subSlice, Slice.State.RECOVERY.toString());
> + }
> + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
> + ZkNodeProps m = new ZkNodeProps(propMap);
> + inQueue.offer(ZkStateReader.toJSON(m));
> + }
> +
> + return true;
> + } catch (SolrException e) {
> + throw e;
> + } catch (Exception e) {
> + log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
> + throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
> }
> }
>
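The net effect of the hunk above is that splitShard no longer snapshots and restores the logging context by hand: the MDCUtils.setMDC(...) / MDCUtils.cleanupMDC(previousMDCContext) try/finally wrapper visible in the removed lines is gone, which is why the surviving body drops one level of indentation. For readers unfamiliar with that idiom, here is a minimal sketch of the pattern being removed, written against the stock org.slf4j.MDC API; the key names and the runWithContext signature are illustrative, not the real MDCUtils implementation.

    import java.util.Map;
    import org.slf4j.MDC;

    // Minimal sketch of the removed per-operation wrapper (names are hypothetical).
    static void runWithContext(String collectionName, String slice, Runnable operation) {
      Map<String, String> previousContext = MDC.getCopyOfContextMap(); // snapshot the caller's MDC
      try {
        MDC.put("collection", collectionName);   // tag every log line emitted by the operation
        if (slice != null) {
          MDC.put("shard", slice);
        }
        operation.run();
      } finally {
        if (previousContext != null) {
          MDC.setContextMap(previousContext);    // restore whatever the caller had
        } else {
          MDC.clear();                           // the caller had no context at all
        }
      }
    }
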
> @@ -1846,71 +1810,64 @@ public class OverseerCollectionProcessor
> private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
> String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
> String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setMDC(collection, sliceId, null, null);
> - try {
> - log.info("Delete shard invoked");
> - Slice slice = clusterState.getSlice(collection, sliceId);
> -
> - if (slice == null) {
> - if (clusterState.hasCollection(collection)) {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "No shard with name " + sliceId + " exists for collection " + collection);
> - } else {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "No collection with the specified name exists: " + collection);
> - }
> - }
> - // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
> - // TODO: Add check for range gaps on Slice deletion
> - final Slice.State state = slice.getState();
> - if (!(slice.getRange() == null || state == Slice.State.INACTIVE
> - || state == Slice.State.RECOVERY || state == Slice.State.CONSTRUCTION)) {
> +
> + log.info("Delete shard invoked");
> + Slice slice = clusterState.getSlice(collection, sliceId);
> +
> + if (slice == null) {
> + if (clusterState.hasCollection(collection)) {
> throw new SolrException(ErrorCode.BAD_REQUEST,
> - "The slice: " + slice.getName() + " is currently "
> - + state + ". Only non-active (or custom-hashed) slices can be deleted.");
> + "No shard with name " + sliceId + " exists for collection " + collection);
> + } else {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
> }
> - ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> -
> - try {
> - ModifiableSolrParams params = new ModifiableSolrParams();
> - params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
> - params.set(CoreAdminParams.DELETE_INDEX, "true");
> - sliceCmd(clusterState, params, null, slice, shardHandler);
> -
> - processResponses(results, shardHandler);
> -
> - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
> - DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, collection,
> - ZkStateReader.SHARD_ID_PROP, sliceId);
> - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
> -
> - // wait for a while until we don't see the shard
> - long now = System.nanoTime();
> - long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
> - boolean removed = false;
> - while (System.nanoTime() < timeout) {
> - Thread.sleep(100);
> - removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
> - if (removed) {
> - Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
> - break;
> - }
> - }
> - if (!removed) {
> - throw new SolrException(ErrorCode.SERVER_ERROR,
> - "Could not fully remove collection: " + collection + " shard: " + sliceId);
> + }
> + // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
> + // TODO: Add check for range gaps on Slice deletion
> + final Slice.State state = slice.getState();
> + if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
> + || state == Slice.State.CONSTRUCTION)) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
> + + ". Only non-active (or custom-hashed) slices can be deleted.");
> + }
> + ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> +
> + try {
> + ModifiableSolrParams params = new ModifiableSolrParams();
> + params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
> + params.set(CoreAdminParams.DELETE_INDEX, "true");
> + sliceCmd(clusterState, params, null, slice, shardHandler);
> +
> + processResponses(results, shardHandler);
> +
> + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
> + collection, ZkStateReader.SHARD_ID_PROP, sliceId);
> + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
> +
> + // wait for a while until we don't see the shard
> + long now = System.nanoTime();
> + long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
> + boolean removed = false;
> + while (System.nanoTime() < timeout) {
> + Thread.sleep(100);
> + removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
> + if (removed) {
> + Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
> + break;
> }
> -
> - log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId);
> -
> - } catch (SolrException e) {
> - throw e;
> - } catch (Exception e) {
> - throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
> }
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> + if (!removed) {
> + throw new SolrException(ErrorCode.SERVER_ERROR,
> + "Could not fully remove collection: " + collection + " shard: " + sliceId);
> + }
> +
> + log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId);
> +
> + } catch (SolrException e) {
> + throw e;
> + } catch (Exception e) {
> + throw new SolrException(ErrorCode.SERVER_ERROR,
> + "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
> }
> }
>
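The deleteShard hunk above keeps its wait-for-removal logic, only re-indented: it polls cluster state against a monotonic System.nanoTime() deadline rather than wall-clock time, and sleeps once more after the shard disappears so concurrent readers are likely to observe the removal as well. The same idiom, restated as a self-contained helper with a hypothetical predicate in place of the zkStateReader check:

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    // Sketch of the poll-until-gone loop above; 'gone' stands in for the cluster-state check.
    static boolean waitForRemoval(BooleanSupplier gone) throws InterruptedException {
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
      while (System.nanoTime() < deadline) {
        Thread.sleep(100);            // back off between polls
        if (gone.getAsBoolean()) {
          Thread.sleep(100);          // brief grace period so other readers see the change too
          return true;
        }
      }
      return false;                   // the caller turns this into a SolrException
    }
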
> @@ -2505,110 +2462,101 @@ public class OverseerCollectionProcessor
> }
> }
>
> - private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
> + private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results)
> + throws KeeperException, InterruptedException {
> String collection = message.getStr(COLLECTION_PROP);
> String node = message.getStr("node");
> String shard = message.getStr(SHARD_ID_PROP);
> String coreName = message.getStr(CoreAdminParams.NAME);
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setMDC(collection, shard, null, coreName);
> - try {
> - String asyncId = message.getStr("async");
> -
> - DocCollection coll = clusterState.getCollection(collection);
> - if (coll == null) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
> - }
> - if (coll.getSlice(shard) == null) {
> - throw new SolrException(ErrorCode.BAD_REQUEST,
> - "Collection: " + collection + " shard: " + shard + " does not exist");
> - }
> - ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> -
> - if (node == null) {
> -
> - node = getNodesForNewShard(clusterState, collection, shard, 1,
> - null, overseer.getZkController().getCoreContainer()).get(0).nodeName;
> - log.info("Node not provided, Identified {} for creating new replica", node);
> - }
> -
> -
> - if (!clusterState.liveNodesContain(node)) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
> - }
> - if (coreName == null) {
> - // assign a name to this core
> - Slice slice = coll.getSlice(shard);
> - int replicaNum = slice.getReplicas().size();
> - for (; ; ) {
> - String replicaName = collection + "_" + shard + "_replica" + replicaNum;
> - boolean exists = false;
> - for (Replica replica : slice.getReplicas()) {
> - if (replicaName.equals(replica.getStr("core"))) {
> - exists = true;
> - break;
> - }
> +
> + String asyncId = message.getStr("async");
> +
> + DocCollection coll = clusterState.getCollection(collection);
> + if (coll == null) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
> + }
> + if (coll.getSlice(shard) == null) {
> + throw new SolrException(ErrorCode.BAD_REQUEST,
> + "Collection: " + collection + " shard: " + shard + " does not exist");
> + }
> + ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
> +
> + if (node == null) {
> +
> + node = getNodesForNewShard(clusterState, collection, shard, 1, null,
> + overseer.getZkController().getCoreContainer()).get(0).nodeName;
> + log.info("Node not provided, Identified {} for creating new replica", node);
> + }
> +
> + if (!clusterState.liveNodesContain(node)) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
> + }
> + if (coreName == null) {
> + // assign a name to this core
> + Slice slice = coll.getSlice(shard);
> + int replicaNum = slice.getReplicas().size();
> + for (;;) {
> + String replicaName = collection + "_" + shard + "_replica" + replicaNum;
> + boolean exists = false;
> + for (Replica replica : slice.getReplicas()) {
> + if (replicaName.equals(replica.getStr("core"))) {
> + exists = true;
> + break;
> }
> - if (exists) replicaNum++;
> - else break;
> - }
> - coreName = collection + "_" + shard + "_replica" + replicaNum;
> - }
> - ModifiableSolrParams params = new ModifiableSolrParams();
> -
> - if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
> - ZkNodeProps props = new ZkNodeProps(
> - Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
> - ZkStateReader.COLLECTION_PROP, collection,
> - ZkStateReader.SHARD_ID_PROP, shard,
> - ZkStateReader.CORE_NAME_PROP, coreName,
> - ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
> - ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node));
> - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
> - params.set(CoreAdminParams.CORE_NODE_NAME, waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
> - }
> -
> -
> - String configName = zkStateReader.readConfigName(collection);
> - String routeKey = message.getStr(ShardParams._ROUTE_);
> - String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
> - String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
> -
> - params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
> - params.set(CoreAdminParams.NAME, coreName);
> - params.set(COLL_CONF, configName);
> - params.set(CoreAdminParams.COLLECTION, collection);
> - if (shard != null) {
> - params.set(CoreAdminParams.SHARD, shard);
> - } else if (routeKey != null) {
> - Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
> - if (slices.isEmpty()) {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
> - } else {
> - params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
> }
> - } else {
> - throw new SolrException(ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
> - }
> - if (dataDir != null) {
> - params.set(CoreAdminParams.DATA_DIR, dataDir);
> + if (exists) replicaNum++;
> + else break;
> }
> - if (instanceDir != null) {
> - params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
> + coreName = collection + "_" + shard + "_replica" + replicaNum;
> + }
> + ModifiableSolrParams params = new ModifiableSolrParams();
> +
> + if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
> + ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
> + collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
> + ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
> + zkStateReader.getBaseUrlForNodeName(node));
> + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
> + params.set(CoreAdminParams.CORE_NODE_NAME,
> + waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
> + }
> +
> + String configName = zkStateReader.readConfigName(collection);
> + String routeKey = message.getStr(ShardParams._ROUTE_);
> + String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
> + String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
> +
> + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
> + params.set(CoreAdminParams.NAME, coreName);
> + params.set(COLL_CONF, configName);
> + params.set(CoreAdminParams.COLLECTION, collection);
> + if (shard != null) {
> + params.set(CoreAdminParams.SHARD, shard);
> + } else if (routeKey != null) {
> + Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
> + if (slices.isEmpty()) {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
> + } else {
> + params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
> }
> - addPropertyParams(message, params);
> -
> - // For tracking async calls.
> - HashMap<String, String> requestMap = new HashMap<>();
> - sendShardRequest(node, params, shardHandler, asyncId, requestMap);
> -
> - collectShardResponses(results, true,
> - "ADDREPLICA failed to create replica", shardHandler);
> -
> - completeAsyncRequest(asyncId, requestMap, results);
> - } finally {
> - MDCUtils.cleanupMDC(previousMDCContext);
> + } else {
> + throw new SolrException(ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
> + }
> + if (dataDir != null) {
> + params.set(CoreAdminParams.DATA_DIR, dataDir);
> + }
> + if (instanceDir != null) {
> + params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
> }
> + addPropertyParams(message, params);
> +
> + // For tracking async calls.
> + HashMap<String,String> requestMap = new HashMap<>();
> + sendShardRequest(node, params, shardHandler, asyncId, requestMap);
> +
> + collectShardResponses(results, true, "ADDREPLICA failed to create replica", shardHandler);
> +
> + completeAsyncRequest(asyncId, requestMap, results);
> }
>
> private void processResponses(NamedList results, ShardHandler shardHandler) {
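Aside from dropping the MDC bookkeeping, addReplica is unchanged; the reformatted for (;;) loop above simply probes "<collection>_<shard>_replicaN" names until it finds one that no existing replica core is using. Restated as a self-contained helper, where the Set of existing core names is a stand-in for iterating slice.getReplicas():

    import java.util.Set;

    // Sketch of the replica-name selection loop; existingCoreNames is hypothetical,
    // standing in for the core names read off slice.getReplicas().
    static String pickReplicaName(String collection, String shard, Set<String> existingCoreNames) {
      int replicaNum = existingCoreNames.size();   // start at the current replica count
      while (true) {
        String candidate = collection + "_" + shard + "_replica" + replicaNum;
        if (!existingCoreNames.contains(candidate)) {
          return candidate;                        // first unused name wins
        }
        replicaNum++;
      }
    }
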
> @@ -2867,8 +2815,7 @@ public class OverseerCollectionProcessor
> String asyncId = message.getStr(ASYNC);
> String collectionName = message.containsKey(COLLECTION_PROP) ?
> message.getStr(COLLECTION_PROP) : message.getStr(NAME);
> - Map previousMDCContext = MDC.getCopyOfContextMap();
> - MDCUtils.setCollection(collectionName);
> +
> try {
> try {
> log.debug("Runner processing {}", head.getId());
> @@ -2913,7 +2860,6 @@ public class OverseerCollectionProcessor
> synchronized (waitLock){
> waitLock.notifyAll();
> }
> - MDCUtils.cleanupMDC(previousMDCContext);
> }
> }
>
>
>
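With the hand-rolled MDCUtils snapshot/restore removed from these methods, per-collection logging context has to be established and propagated some other way, in particular when work is handed off to another thread. One common way to carry SLF4J MDC state across an executor boundary is to capture it on the submitting thread and install/restore it around the task; the sketch below shows that general technique only and is not the code added by this commit.

    import java.util.Map;
    import org.slf4j.MDC;

    // General-purpose MDC-propagating wrapper: capture the submitter's context and
    // install it on the worker thread for the duration of the task. Illustrative only.
    static Runnable mdcAware(Runnable task) {
      final Map<String, String> submitterContext = MDC.getCopyOfContextMap();
      return () -> {
        Map<String, String> workerContext = MDC.getCopyOfContextMap();
        if (submitterContext != null) {
          MDC.setContextMap(submitterContext);   // worker logs carry the submitter's tags
        } else {
          MDC.clear();
        }
        try {
          task.run();
        } finally {
          if (workerContext != null) {
            MDC.setContextMap(workerContext);    // put the worker thread back the way it was
          } else {
            MDC.clear();
          }
        }
      };
    }

Wrapping a task this way, for example executor.execute(mdcAware(job)), keeps collection and shard tags visible in log lines produced by pooled threads without any per-method try/finally.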
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org