You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rj...@apache.org on 2015/03/31 07:22:50 UTC
svn commit: r1670257 [21/39] - in /lucene/dev/branches/lucene6271: ./
dev-tools/ dev-tools/idea/.idea/libraries/ dev-tools/scripts/ lucene/
lucene/analysis/ lucene/analysis/common/
lucene/analysis/common/src/java/org/apache/lucene/analysis/miscellaneou...
Modified: lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1670257&r1=1670256&r2=1670257&view=diff
==============================================================================
--- lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Mar 31 05:22:40 2015
@@ -45,12 +45,15 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.logging.MDCUtils;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.zookeeper.CreateMode;
@@ -58,11 +61,13 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -98,29 +103,20 @@ import static org.apache.solr.common.clo
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-
/**
* Handle ZooKeeper interactions.
- *
+ * <p>
* notes: loads everything on init, creates what's not there - further updates
* are prompted with Watches.
- *
+ * <p>
* TODO: exceptions during close on attempts to update cloud state
- *
*/
public final class ZkController {
private static Logger log = LoggerFactory.getLogger(ZkController.class);
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
-
+
private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue;
@@ -128,19 +124,19 @@ public final class ZkController {
private final DistributedMap overseerCompletedMap;
private final DistributedMap overseerFailureMap;
- public final static String COLLECTION_PARAM_PREFIX="collection.";
- public final static String CONFIGNAME_PROP="configName";
+ public final static String COLLECTION_PARAM_PREFIX = "collection.";
+ public final static String CONFIGNAME_PROP = "configName";
static class ContextKey {
private String collection;
private String coreNodeName;
-
+
public ContextKey(String collection, String coreNodeName) {
this.collection = collection;
this.coreNodeName = coreNodeName;
}
-
+
@Override
public int hashCode() {
final int prime = 31;
@@ -167,25 +163,26 @@ public final class ZkController {
return true;
}
}
- private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<ContextKey, ElectionContext>());
-
+
+ private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
+
private final SolrZkClient zkClient;
private final ZkCmdExecutor cmdExecutor;
private final ZkStateReader zkStateReader;
private final LeaderElector leaderElector;
-
+
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
- private final String localHostPort; // example: 54065
- private final String localHostContext; // example: solr
+ private final int localHostPort; // example: 54065
private final String hostName; // example: 127.0.0.1
private final String nodeName; // example: 127.0.0.1:54065_solr
private final String baseURL; // example: http://127.0.0.1:54065/solr
+ private final CloudConfig cloudConfig;
private LeaderElector overseerElector;
-
+
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -195,16 +192,16 @@ public final class ZkController {
private int leaderVoteWait;
private int leaderConflictResolveWait;
-
+
private boolean genericCoreNodeNames;
private int clientTimeout;
private volatile boolean isClosed;
-
+
// keeps track of replicas that have been asked to recover by leaders running on this node
- private final Map<String,String> replicasInLeaderInitiatedRecovery = new HashMap<String,String>();
-
+ private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
+
// This is an expert and unsupported development mode that does not create
// an Overseer or register a /live node. This lets you monitor the cluster
// and interact with zookeeper via the Solr admin UI on a node outside the cluster,
@@ -214,33 +211,33 @@ public final class ZkController {
// keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
- public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
- String localHostContext, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect)
+ public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
throws InterruptedException, TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
this.cc = cc;
- this.genericCoreNodeNames = genericCoreNodeNames;
+
+ this.cloudConfig = cloudConfig;
+
+ this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames();
+
// be forgiving and strip this off leading/trailing slashes
// this allows us to support users specifying hostContext="/" in
// solr.xml to indicate the root context, instead of hostContext=""
// which means the default of "solr"
- localHostContext = trimLeadingAndTrailingSlashes(localHostContext);
+ String localHostContext = trimLeadingAndTrailingSlashes(cloudConfig.getSolrHostContext());
this.zkServerAddress = zkServerAddress;
- this.localHostPort = locaHostPort;
- this.localHostContext = localHostContext;
- this.hostName = normalizeHostName(localHost);
- this.nodeName = generateNodeName(this.hostName,
- this.localHostPort,
- this.localHostContext);
+ this.localHostPort = cloudConfig.getSolrHostPort();
+ this.hostName = normalizeHostName(cloudConfig.getHost());
+ this.nodeName = generateNodeName(this.hostName, Integer.toString(this.localHostPort), localHostContext);
- this.leaderVoteWait = leaderVoteWait;
- this.leaderConflictResolveWait = leaderConflictResolveWait;
+ this.leaderVoteWait = cloudConfig.getLeaderVoteWait();
+ this.leaderConflictResolveWait = cloudConfig.getLeaderConflictResolveWait();
- this.clientTimeout = zkClientTimeout;
+ this.clientTimeout = cloudConfig.getZkClientTimeout();
DefaultConnectionStrategy strat = new DefaultConnectionStrategy();
- String zkACLProviderClass = cc.getConfig().getZkACLProviderClass();
+ String zkACLProviderClass = cloudConfig.getZkACLProviderClass();
ZkACLProvider zkACLProvider = null;
if (zkACLProviderClass != null && zkACLProviderClass.trim().length() > 0) {
zkACLProvider = cc.getResourceLoader().newInstance(zkACLProviderClass, ZkACLProvider.class);
@@ -248,7 +245,7 @@ public final class ZkController {
zkACLProvider = new DefaultZkACLProvider();
}
- String zkCredentialsProviderClass = cc.getConfig().getZkCredentialsProviderClass();
+ String zkCredentialsProviderClass = cloudConfig.getZkCredentialsProviderClass();
if (zkCredentialsProviderClass != null && zkCredentialsProviderClass.trim().length() > 0) {
strat.setZkCredentialsToAddAutomatically(cc.getResourceLoader().newInstance(zkCredentialsProviderClass, ZkCredentialsProvider.class));
} else {
@@ -256,8 +253,7 @@ public final class ZkController {
}
addOnReconnectListener(getConfigDirListener());
- zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
- zkClientConnectTimeout, strat,
+ zkClient = new SolrZkClient(zkServerAddress, clientTimeout, zkClientConnectTimeout, strat,
// on reconnect, reload cloud info
new OnReconnect() {
@@ -362,7 +358,7 @@ public final class ZkController {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
- cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
+ cmdExecutor = new ZkCmdExecutor(clientTimeout);
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
@@ -374,7 +370,7 @@ public final class ZkController {
public int getLeaderVoteWait() {
return leaderVoteWait;
}
-
+
public int getLeaderConflictResolveWait() {
return leaderConflictResolveWait;
}
@@ -408,7 +404,7 @@ public final class ZkController {
}
}
}
-
+
for (CoreDescriptor descriptor : descriptors) {
// if it looks like we are going to be the leader, we don't
// want to wait for the following stuff
@@ -416,7 +412,7 @@ public final class ZkController {
String collection = cloudDesc.getCollectionName();
String slice = cloudDesc.getShardId();
try {
-
+
int children = zkStateReader
.getZkClient()
.getChildren(
@@ -439,7 +435,7 @@ public final class ZkController {
final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
try {
- log.debug("calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", new Object[] {coreZkNodeName, collection, slice});
+ log.debug("calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", new Object[]{coreZkNodeName, collection, slice});
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
SolrException.log(log, "", e);
@@ -455,7 +451,7 @@ public final class ZkController {
}
}
}
-
+
private void markAllAsNotLeader(
final CurrentCoreDescriptorProvider registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect
@@ -503,7 +499,7 @@ public final class ZkController {
}
}
}
-
+
}
/**
@@ -534,7 +530,7 @@ public final class ZkController {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"Config file contains no data:" + zkPath);
}
-
+
return bytes;
}
@@ -572,19 +568,19 @@ public final class ZkController {
}
host = hostaddress;
} else {
- if(URLUtil.hasScheme(host)) {
+ if (URLUtil.hasScheme(host)) {
host = URLUtil.removeScheme(host);
}
}
return host;
}
-
+
public String getHostName() {
return hostName;
}
-
- public String getHostPort() {
+
+ public int getHostPort() {
return localHostPort;
}
@@ -599,42 +595,55 @@ public final class ZkController {
return zkServerAddress;
}
+ /**
+ * Create the zknodes necessary for a cluster to operate
+ *
+ * @param zkClient a SolrZkClient
+ * @throws KeeperException if there is a Zookeeper error
+ * @throws InterruptedException on interrupt
+ */
+ public static void createClusterZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+ ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
+ cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
+ }
+
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
try {
boolean createdWatchesAndUpdated = false;
Stat stat = zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, null, true);
- if (stat!= null && stat.getNumChildren()>0) {
+ if (stat != null && stat.getNumChildren() > 0) {
zkStateReader.createClusterStateWatchersAndUpdate();
createdWatchesAndUpdated = true;
publishAndWaitForDownStates();
}
-
- // makes nodes zkNode
- cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
-
+
+ createClusterZkNodes(zkClient);
+
createEphemeralLiveNode();
- cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
ShardHandler shardHandler;
UpdateShardHandler updateShardHandler;
shardHandler = cc.getShardHandlerFactory().getShardHandler();
updateShardHandler = cc.getUpdateShardHandler();
-
+
if (!zkRunOnly) {
overseerElector = new LeaderElector(zkClient);
this.overseer = new Overseer(shardHandler, updateShardHandler,
- CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cc.getConfig());
+ CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
ElectionContext context = new OverseerElectionContext(zkClient,
overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context, false);
}
-
+
if (!createdWatchesAndUpdated) {
zkStateReader.createClusterStateWatchersAndUpdate();
}
-
+
} catch (IOException e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -655,7 +664,7 @@ public final class ZkController {
public void publishAndWaitForDownStates() throws KeeperException,
InterruptedException {
-
+
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> collections = clusterState.getCollections();
List<String> updatedNodes = new ArrayList<>();
@@ -667,7 +676,7 @@ public final class ZkController {
for (Replica replica : replicas) {
if (getNodeName().equals(replica.getNodeName())
&& !(replica.getStr(ZkStateReader.STATE_PROP)
- .equals(ZkStateReader.DOWN))) {
+ .equals(ZkStateReader.DOWN))) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
@@ -686,7 +695,7 @@ public final class ZkController {
}
}
}
-
+
// now wait till the updates are in our state
long now = System.nanoTime();
long timeout = now + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
@@ -703,12 +712,12 @@ public final class ZkController {
if (replica.getStr(ZkStateReader.STATE_PROP).equals(
ZkStateReader.DOWN)) {
updatedNodes.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
+
}
}
}
}
-
+
if (updatedNodes.size() == 0) {
foundStates = true;
Thread.sleep(1000);
@@ -719,16 +728,16 @@ public final class ZkController {
if (!foundStates) {
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
}
-
+
}
-
+
/**
* Validates if the chroot exists in zk (or if it is successfully created).
* Optionally, if create is set to true this method will create the path in
* case it doesn't exist
- *
+ *
* @return true if the path exists or is created false if the path doesn't
- * exist and 'create' = false
+ * exist and 'create' = false
*/
public static boolean checkChrootPath(String zkHost, boolean create)
throws KeeperException, InterruptedException {
@@ -761,7 +770,7 @@ public final class ZkController {
String nodeName = getNodeName();
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
log.info("Register node as live in ZooKeeper:" + nodePath);
-
+
try {
boolean nodeDeleted = true;
try {
@@ -787,9 +796,9 @@ public final class ZkController {
if (e.code() != KeeperException.Code.NODEEXISTS) {
throw e;
}
- }
+ }
}
-
+
public String getNodeName() {
return nodeName;
}
@@ -805,114 +814,120 @@ public final class ZkController {
/**
* Register shard with ZooKeeper.
- *
+ *
* @return the shardId for the SolrCore
*/
- public String register(String coreName, final CoreDescriptor desc) throws Exception {
+ public String register(String coreName, final CoreDescriptor desc) throws Exception {
return register(coreName, desc, false, false);
}
-
+
/**
* Register shard with ZooKeeper.
- *
+ *
* @return the shardId for the SolrCore
*/
- public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {
+ public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {
// pre register has published our down state
-
final String baseUrl = getBaseUrl();
-
+
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
+ Map previousMDCContext = MDC.getCopyOfContextMap();
+ MDCUtils.setCollection(collection);
+
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
-
- String shardId = cloudDesc.getShardId();
- Map<String,Object> props = new HashMap<>();
- // we only put a subset of props into the leader node
+ String shardId = cloudDesc.getShardId();
+ MDCUtils.setShard(shardId);
+ Map<String, Object> props = new HashMap<>();
+ // we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
props.put(ZkStateReader.CORE_NAME_PROP, coreName);
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
if (log.isInfoEnabled()) {
- log.info("Register replica - core:" + coreName + " address:"
- + baseUrl + " collection:" + cloudDesc.getCollectionName() + " shard:" + shardId);
+ log.info("Register replica - core:" + coreName + " address:"
+ + baseUrl + " collection:" + cloudDesc.getCollectionName() + " shard:" + shardId);
}
ZkNodeProps leaderProps = new ZkNodeProps(props);
-
+
try {
- // If we're a preferred leader, insert ourselves at the head of the queue
- boolean joinAtHead = false;
- Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
- if (replica != null) {
- joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+ try {
+ // If we're a preferred leader, insert ourselves at the head of the queue
+ boolean joinAtHead = false;
+ Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
+ if (replica != null) {
+ joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+ }
+ joinElection(desc, afterExpiration, joinAtHead);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (KeeperException | IOException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- joinElection(desc, afterExpiration, joinAtHead);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (KeeperException | IOException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- // in this case, we want to wait for the leader as long as the leader might
- // wait for a vote, at least - but also long enough that a large cluster has
- // time to get its act together
- String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);
-
- String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
- log.info("We are " + ourUrl + " and leader is " + leaderUrl);
- boolean isLeader = leaderUrl.equals(ourUrl);
-
- try (SolrCore core = cc.getCore(desc.getName())) {
-
- // recover from local transaction log and wait for it to complete before
- // going active
- // TODO: should this be moved to another thread? To recoveryStrat?
- // TODO: should this actually be done earlier, before (or as part of)
- // leader election perhaps?
-
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (!core.isReloaded() && ulog != null) {
- // disable recovery in case shard is in construction state (for shard splits)
- Slice slice = getClusterState().getSlice(collection, shardId);
- if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) {
- Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
- .getUpdateLog().recoverFromLog();
- if (recoveryFuture != null) {
- log.info("Replaying tlog for "+ourUrl+" during startup... NOTE: This can take a while.");
- recoveryFuture.get(); // NOTE: this could potentially block for
- // minutes or more!
- // TODO: public as recovering in the mean time?
- // TODO: in the future we could do peersync in parallel with recoverFromLog
- } else {
- log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
+ // in this case, we want to wait for the leader as long as the leader might
+ // wait for a vote, at least - but also long enough that a large cluster has
+ // time to get its act together
+ String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);
+
+ String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+ log.info("We are " + ourUrl + " and leader is " + leaderUrl);
+ boolean isLeader = leaderUrl.equals(ourUrl);
+
+ try (SolrCore core = cc.getCore(desc.getName())) {
+
+ // recover from local transaction log and wait for it to complete before
+ // going active
+ // TODO: should this be moved to another thread? To recoveryStrat?
+ // TODO: should this actually be done earlier, before (or as part of)
+ // leader election perhaps?
+
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (!core.isReloaded() && ulog != null) {
+ // disable recovery in case shard is in construction state (for shard splits)
+ Slice slice = getClusterState().getSlice(collection, shardId);
+ if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) {
+ Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+ .getUpdateLog().recoverFromLog();
+ if (recoveryFuture != null) {
+ log.info("Replaying tlog for " + ourUrl + " during startup... NOTE: This can take a while.");
+ recoveryFuture.get(); // NOTE: this could potentially block for
+ // minutes or more!
+ // TODO: public as recovering in the mean time?
+ // TODO: in the future we could do peersync in parallel with recoverFromLog
+ } else {
+ log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
+ }
+ }
+ boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+ collection, coreZkNodeName, shardId, leaderProps, core, cc);
+ if (!didRecovery) {
+ publish(desc, ZkStateReader.ACTIVE);
}
- }
- boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
- collection, coreZkNodeName, shardId, leaderProps, core, cc);
- if (!didRecovery) {
- publish(desc, ZkStateReader.ACTIVE);
}
}
+
+ // make sure we have an update cluster state right away
+ zkStateReader.updateClusterState(true);
+ return shardId;
+ } finally {
+ MDCUtils.cleanupMDC(previousMDCContext);
}
-
- // make sure we have an update cluster state right away
- zkStateReader.updateClusterState(true);
- return shardId;
}
// timeoutms is the timeout for the first call to get the leader - there is then
// a longer wait to make sure that leader matches our local state
private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
-
+
String collection = cloudDesc.getCollectionName();
String shardId = cloudDesc.getShardId();
// rather than look in the cluster state file, we go straight to the zknodes
@@ -922,14 +937,14 @@ public final class ZkController {
try {
leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
.getCoreUrl();
-
+
// now wait until our currently cloud state contains the latest leader
String clusterStateLeaderUrl = zkStateReader.getLeaderUrl(collection,
shardId, timeoutms * 2); // since we found it in zk, we are willing to
- // wait a while to find it in state
+ // wait a while to find it in state
int tries = 0;
final long msInSec = 1000L;
- int maxTries = (int)Math.floor(leaderConflictResolveWait/msInSec);
+ int maxTries = (int) Math.floor(leaderConflictResolveWait / msInSec);
while (!leaderUrl.equals(clusterStateLeaderUrl)) {
if (tries > maxTries) {
throw new SolrException(ErrorCode.SERVER_ERROR,
@@ -940,7 +955,7 @@ public final class ZkController {
tries++;
if (tries % 30 == 0) {
String warnMsg = String.format(Locale.ENGLISH, "Still seeing conflicting information about the leader "
- + "of shard %s for collection %s after %d seconds; our state says %s, but ZooKeeper says %s",
+ + "of shard %s for collection %s after %d seconds; our state says %s, but ZooKeeper says %s",
cloudDesc.getShardId(), collection, tries, clusterStateLeaderUrl, leaderUrl);
log.warn(warnMsg);
}
@@ -950,30 +965,30 @@ public final class ZkController {
leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
.getCoreUrl();
}
-
+
} catch (Exception e) {
log.error("Error getting leader from zk", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Error getting leader from zk for shard " + shardId, e);
- }
+ }
return leaderUrl;
}
-
+
/**
* Get leader props directly from zk nodes.
*/
public ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice, int timeoutms) throws InterruptedException {
+ final String slice, int timeoutms) throws InterruptedException {
return getLeaderProps(collection, slice, timeoutms, false);
}
-
+
/**
* Get leader props directly from zk nodes.
- *
+ *
* @return leader props
*/
public ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
+ final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
int iterCount = timeoutms / 1000;
Exception exp = null;
while (iterCount-- > 0) {
@@ -992,7 +1007,7 @@ public final class ZkController {
}
exp = e;
Thread.sleep(1000);
- } catch (Exception e) {
+ } catch (Exception e) {
exp = e;
Thread.sleep(1000);
}
@@ -1009,24 +1024,24 @@ public final class ZkController {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
-
+
ContextKey contextKey = new ContextKey(collection, coreNodeName);
-
+
ElectionContext prevContext = electionContexts.get(contextKey);
-
+
if (prevContext != null) {
prevContext.cancelElection();
}
-
+
String shardId = cd.getCloudDescriptor().getShardId();
-
- Map<String,Object> props = new HashMap<>();
+
+ Map<String, Object> props = new HashMap<>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-
-
+
+
ZkNodeProps ourProps = new ZkNodeProps(props);
@@ -1043,39 +1058,39 @@ public final class ZkController {
* Returns whether or not a recovery was started
*/
private boolean checkRecovery(String coreName, final CoreDescriptor desc,
- boolean recoverReloadedCores, final boolean isLeader,
- final CloudDescriptor cloudDesc, final String collection,
- final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
- SolrCore core, CoreContainer cc) {
+ boolean recoverReloadedCores, final boolean isLeader,
+ final CloudDescriptor cloudDesc, final String collection,
+ final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
+ SolrCore core, CoreContainer cc) {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
return false;
}
boolean doRecovery = true;
if (!isLeader) {
-
+
if (core.isReloaded() && !recoverReloadedCores) {
doRecovery = false;
}
-
+
if (doRecovery) {
log.info("Core needs to recover:" + core.getName());
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
-
+
// see if the leader told us to recover
String lirState = getLeaderInitiatedRecoveryState(collection, shardId,
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if (ZkStateReader.DOWN.equals(lirState)) {
- log.info("Leader marked core "+core.getName()+" down; starting recovery process");
+ log.info("Leader marked core " + core.getName() + " down; starting recovery process");
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
- return true;
+ return true;
}
} else {
log.info("I am the leader, no recovery necessary");
}
-
+
return false;
}
@@ -1087,11 +1102,11 @@ public final class ZkController {
public void publish(final CoreDescriptor cd, final String state) throws KeeperException, InterruptedException {
publish(cd, state, true);
}
-
+
public void publish(final CoreDescriptor cd, final String state, boolean updateLastState) throws KeeperException, InterruptedException {
publish(cd, state, updateLastState, false);
}
-
+
/**
* Publish core state to overseer.
*/
@@ -1104,81 +1119,92 @@ public final class ZkController {
}
}
String collection = cd.getCloudDescriptor().getCollectionName();
- log.info("publishing core={} state={} collection={}", cd.getName(), state, collection);
- //System.out.println(Thread.currentThread().getStackTrace()[3]);
- Integer numShards = cd.getCloudDescriptor().getNumShards();
- if (numShards == null) { //XXX sys prop hack
- log.info("numShards not found on descriptor - reading it from system property");
- numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
- }
-
- assert collection != null && collection.length() > 0;
-
- String shardId = cd.getCloudDescriptor().getShardId();
- String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
- // If the leader initiated recovery, then verify that this replica has performed
- // recovery as requested before becoming active; don't even look at lirState if going down
- if (!ZkStateReader.DOWN.equals(state)) {
- String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
- if (lirState != null) {
- if ("active".equals(state)) {
- // trying to become active, so leader-initiated state must be recovering
- if (ZkStateReader.RECOVERING.equals(lirState)) {
- updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE);
- } else if (ZkStateReader.DOWN.equals(lirState)) {
- throw new SolrException(ErrorCode.INVALID_STATE,
- "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
- }
- } else if (ZkStateReader.RECOVERING.equals(state)) {
- // if it is currently DOWN, then trying to enter into recovering state is good
- if (ZkStateReader.DOWN.equals(lirState)) {
- updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING);
+
+ Map previousMDCContext = MDC.getCopyOfContextMap();
+ MDCUtils.setCollection(collection);
+
+ try {
+ if (cd != null && cd.getName() != null)
+ MDCUtils.setCore(cd.getName());
+ log.info("publishing core={} state={} collection={}", cd.getName(), state, collection);
+ //System.out.println(Thread.currentThread().getStackTrace()[3]);
+ Integer numShards = cd.getCloudDescriptor().getNumShards();
+ if (numShards == null) { //XXX sys prop hack
+ log.info("numShards not found on descriptor - reading it from system property");
+ numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
+ }
+
+ assert collection != null && collection.length() > 0;
+
+ String shardId = cd.getCloudDescriptor().getShardId();
+ MDCUtils.setShard(shardId);
+ String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+ // If the leader initiated recovery, then verify that this replica has performed
+ // recovery as requested before becoming active; don't even look at lirState if going down
+ if (!ZkStateReader.DOWN.equals(state)) {
+ String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
+ if (lirState != null) {
+ if (ZkStateReader.ACTIVE.equals(state)) {
+ // trying to become active, so leader-initiated state must be recovering
+ if (ZkStateReader.RECOVERING.equals(lirState)) {
+ updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true);
+ } else if (ZkStateReader.DOWN.equals(lirState)) {
+ throw new SolrException(ErrorCode.INVALID_STATE,
+ "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
+ }
+ } else if (ZkStateReader.RECOVERING.equals(state)) {
+ // if it is currently DOWN, then trying to enter into recovering state is good
+ if (ZkStateReader.DOWN.equals(lirState)) {
+ updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true);
+ }
}
}
}
- }
-
- Map<String, Object> props = new HashMap<String, Object>();
- props.put(Overseer.QUEUE_OPERATION, "state");
- props.put(ZkStateReader.STATE_PROP, state);
- props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
- props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
- props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
- props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
- props.put(ZkStateReader.COLLECTION_PROP, collection);
- if (numShards != null) {
- props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
- }
- if (coreNodeName != null) {
- props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
- }
-
- if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) {
- try (SolrCore core = cc.getCore(cd.getName())) {
- if (core != null && core.getDirectoryFactory().isSharedStorage()) {
- props.put("dataDir", core.getDataDir());
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog != null) {
- props.put("ulogDir", ulog.getLogDir());
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(Overseer.QUEUE_OPERATION, "state");
+ props.put(ZkStateReader.STATE_PROP, state);
+ props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
+ props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
+ props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
+ props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
+ props.put(ZkStateReader.COLLECTION_PROP, collection);
+ if (numShards != null) {
+ props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
+ }
+ if (coreNodeName != null) {
+ props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+ }
+
+ if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) {
+ try (SolrCore core = cc.getCore(cd.getName())) {
+ if (core != null && core.getDirectoryFactory().isSharedStorage()) {
+ props.put("dataDir", core.getDataDir());
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog != null) {
+ props.put("ulogDir", ulog.getLogDir());
+ }
}
}
}
+
+ ZkNodeProps m = new ZkNodeProps(props);
+
+ if (updateLastState) {
+ cd.getCloudDescriptor().lastPublished = state;
+ }
+ overseerJobQueue.offer(ZkStateReader.toJSON(m));
+ } finally {
+ MDCUtils.cleanupMDC(previousMDCContext);
}
-
- ZkNodeProps m = new ZkNodeProps(props);
-
- if (updateLastState) {
- cd.getCloudDescriptor().lastPublished = state;
- }
- overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
-
+
private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
- final ClusterState state, final String coreNodeName) {
+ final ClusterState state, final String coreNodeName) {
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
-
+
final String shardId = state.getShardId(getNodeName(), desc.getName());
if (shardId != null) {
@@ -1193,14 +1219,14 @@ public final class ZkController {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
assert collection != null;
-
+
if (collection == null || collection.trim().length() == 0) {
log.error("No collection was specified.");
return;
}
-
+
ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
-
+
if (context != null) {
context.cancelElection();
}
@@ -1209,7 +1235,7 @@ public final class ZkController {
boolean removeWatch = true;
// if there is no SolrCore which is a member of this collection, remove the watch
for (SolrCore solrCore : cc.getCores()) {
- if (((ZkSolrResourceLoader)solrCore.getResourceLoader()).getConfigSetZkPath().equals(configLocation))
+ if (((ZkSolrResourceLoader) solrCore.getResourceLoader()).getConfigSetZkPath().equals(configLocation))
configLocation = null; //if a core uses this config dir , then set it to null
@@ -1217,7 +1243,7 @@ public final class ZkController {
.getCloudDescriptor();
if (cloudDesc != null
&& cloudDescriptor.getCollectionName().equals(
- cloudDesc.getCollectionName())) {
+ cloudDesc.getCollectionName())) {
removeWatch = false;
break;
}
@@ -1230,14 +1256,14 @@ public final class ZkController {
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
overseerJobQueue.offer(ZkStateReader.toJSON(m));
- if(configLocation != null) {
+ if (configLocation != null) {
synchronized (confDirectoryListeners) {
- log.info("This conf directory is no more watched {0}",configLocation);
+ log.info("This conf directory is no more watched {0}", configLocation);
confDirectoryListeners.remove(configLocation);
}
}
}
-
+
public void createCollection(String collection) throws KeeperException,
InterruptedException {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
@@ -1253,20 +1279,20 @@ public final class ZkController {
public void createCollectionZkNode(CloudDescriptor cd) {
String collection = cd.getCollectionName();
-
+
log.info("Check for collection zkNode:" + collection);
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
-
+
try {
- if(!zkClient.exists(collectionPath, true)) {
+ if (!zkClient.exists(collectionPath, true)) {
log.info("Creating collection in ZooKeeper:" + collection);
- SolrParams params = cd.getParams();
+ SolrParams params = cd.getParams();
try {
- Map<String,Object> collectionProps = new HashMap<>();
+ Map<String, Object> collectionProps = new HashMap<>();
// TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
- String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, collection);
+ String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX + CONFIGNAME_PROP, collection);
// params passed in - currently only done via core admin (create core command).
if (params != null) {
@@ -1283,8 +1309,8 @@ public final class ZkController {
// TODO: getting the configName from the collectionPath should fail since we already know it doesn't exist?
getConfName(collection, collectionPath, collectionProps);
}
-
- } else if(System.getProperty("bootstrap_confdir") != null) {
+
+ } else if (System.getProperty("bootstrap_confdir") != null) {
// if we are bootstrapping a collection, default the config for
// a new collection to the collection we are bootstrapping
log.info("Setting config for collection:" + collection + " to " + defaultConfigName);
@@ -1292,17 +1318,17 @@ public final class ZkController {
Properties sysProps = System.getProperties();
for (String sprop : System.getProperties().stringPropertyNames()) {
if (sprop.startsWith(COLLECTION_PARAM_PREFIX)) {
- collectionProps.put(sprop.substring(COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));
+ collectionProps.put(sprop.substring(COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));
}
}
-
+
// if the config name wasn't passed in, use the default
if (!collectionProps.containsKey(CONFIGNAME_PROP))
- collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
+ collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
} else if (Boolean.getBoolean("bootstrap_conf")) {
// the conf name should be the collection name of this core
- collectionProps.put(CONFIGNAME_PROP, cd.getCollectionName());
+ collectionProps.put(CONFIGNAME_PROP, cd.getCollectionName());
} else {
getConfName(collection, collectionPath, collectionProps);
}
@@ -1321,24 +1347,23 @@ public final class ZkController {
} else {
log.info("Collection zkNode exists");
}
-
+
} catch (KeeperException e) {
// it's okay if another beats us creating the node
if (e.code() == KeeperException.Code.NODEEXISTS) {
return;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
}
-
+
}
private void getConfName(String collection, String collectionPath,
- Map<String,Object> collectionProps) throws KeeperException,
+ Map<String, Object> collectionProps) throws KeeperException,
InterruptedException {
// check for configName
log.info("Looking for collection configName");
@@ -1352,7 +1377,7 @@ public final class ZkController {
break;
}
}
-
+
// if there is only one conf, use that
try {
configNames = zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null,
@@ -1363,16 +1388,16 @@ public final class ZkController {
if (configNames != null && configNames.size() == 1) {
// no config set named, but there is only 1 - use it
log.info("Only one config set found in zk - using it:" + configNames.get(0));
- collectionProps.put(CONFIGNAME_PROP, configNames.get(0));
+ collectionProps.put(CONFIGNAME_PROP, configNames.get(0));
break;
}
-
+
if (configNames != null && configNames.contains(collection)) {
log.info("Could not find explicit collection configName, but found config name matching collection name - using that set.");
- collectionProps.put(CONFIGNAME_PROP, collection);
+ collectionProps.put(CONFIGNAME_PROP, collection);
break;
}
-
+
log.info("Could not find collection configName - pausing for 3 seconds and trying again - try: " + retry);
Thread.sleep(3000);
}
@@ -1383,7 +1408,7 @@ public final class ZkController {
"Could not find configName for collection " + collection + " found:" + configNames);
}
}
-
+
public ZkStateReader getZkStateReader() {
return zkStateReader;
}
@@ -1404,17 +1429,17 @@ public final class ZkController {
int retryCount = 320;
log.info("look for our core node name");
while (retryCount-- > 0) {
- Map<String,Slice> slicesMap = zkStateReader.getClusterState()
+ Map<String, Slice> slicesMap = zkStateReader.getClusterState()
.getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
if (slicesMap != null) {
-
+
for (Slice slice : slicesMap.values()) {
for (Replica replica : slice.getReplicas()) {
// TODO: for really large clusters, we could 'index' on this
-
+
String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
+
String msgNodeName = getNodeName();
String msgCore = descriptor.getName();
@@ -1449,23 +1474,23 @@ public final class ZkController {
Thread.currentThread().interrupt();
}
}
-
+
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not get shard id for core: " + cd.getName());
}
- public String getCoreNodeName(CoreDescriptor descriptor){
+ public String getCoreNodeName(CoreDescriptor descriptor) {
String coreNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
if (coreNodeName == null && !genericCoreNodeNames) {
// it's the default
return getNodeName() + "_" + descriptor.getName();
}
-
+
return coreNodeName;
}
- public void preRegister(CoreDescriptor cd ) {
+ public void preRegister(CoreDescriptor cd) {
String coreNodeName = getCoreNodeName(cd);
@@ -1483,8 +1508,8 @@ public final class ZkController {
publish(cd, ZkStateReader.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
- if(collection !=null && collection.getStateFormat()>1 ){
- log.info("Registering watch for external collection {}",cd.getCloudDescriptor().getCollectionName());
+ if (collection != null && collection.getStateFormat() > 1) {
+ log.info("Registering watch for external collection {}", cd.getCloudDescriptor().getCollectionName());
zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
}
} catch (KeeperException e) {
@@ -1495,7 +1520,7 @@ public final class ZkController {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
-
+
if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), coreNodeName)) {
doGetShardIdAndNodeNameProcess(cd);
} else {
@@ -1510,21 +1535,33 @@ public final class ZkController {
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
String coreNodeName = cloudDesc.getCoreNodeName();
assert coreNodeName != null;
- if (cloudDesc.getShardId() == null) throw new SolrException(ErrorCode.SERVER_ERROR ,"No shard id for :" + cd);
+ if (cloudDesc.getShardId() == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for :" + cd);
+ }
long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);
- String errMessage= null;
- for (; System.nanoTime()<endTime; ) {
- Thread.sleep(100);
- errMessage = null;
+ String errMessage = null;
+ while (System.nanoTime() < endTime) {
Slice slice = zkStateReader.getClusterState().getSlice(cd.getCollectionName(), cloudDesc.getShardId());
if (slice == null) {
errMessage = "Invalid slice : " + cloudDesc.getShardId();
continue;
}
- if (slice.getReplica(coreNodeName) != null) return;
+ if (slice.getReplica(coreNodeName) != null) {
+ Replica replica = slice.getReplica(coreNodeName);
+ String baseUrl = replica.getStr(BASE_URL_PROP);
+ String coreName = replica.getStr(CORE_NAME_PROP);
+ if (baseUrl.equals(this.baseURL) && coreName.equals(cd.getName())) {
+ return;
+ } else {
+ errMessage = "replica with coreNodeName " + coreNodeName + " exists but with a different name or base_url";
+ }
+ }
+ Thread.sleep(100);
}
- if(errMessage == null) errMessage = " no_such_replica in clusterstate ,replicaName : " + coreNodeName;
- throw new SolrException(ErrorCode.SERVER_ERROR,errMessage + "state : "+ zkStateReader.getClusterState().getCollection(cd.getCollectionName()));
+ if (errMessage == null) {
+ errMessage = "replica " + coreNodeName + " is not present in cluster state";
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, errMessage + ". state : " + zkStateReader.getClusterState().getCollection(cd.getCollectionName()));
}
}
@@ -1534,7 +1571,7 @@ public final class ZkController {
String collection = cloudDesc.getCollectionName();
String shard = cloudDesc.getShardId();
ZkCoreNodeProps leaderProps = null;
-
+
int retries = 6;
for (int i = 0; i < retries; i++) {
try {
@@ -1542,7 +1579,7 @@ public final class ZkController {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"We have been closed");
}
-
+
// go straight to zk, not the cloud state - we must have current info
leaderProps = getLeaderProps(collection, shard, 30000);
break;
@@ -1565,28 +1602,28 @@ public final class ZkController {
String myCoreNodeName = cloudDesc.getCoreNodeName();
String myCoreName = descriptor.getName();
String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), myCoreName);
-
+
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) {
-
+
// detect if this core is in leader-initiated recovery and if so,
// then we don't need the leader to wait on seeing the down state
String lirState = null;
try {
lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreNodeName);
} catch (Exception exc) {
- log.error("Failed to determine if replica "+myCoreNodeName+
- " is in leader-initiated recovery due to: "+exc, exc);
+ log.error("Failed to determine if replica " + myCoreNodeName +
+ " is in leader-initiated recovery due to: " + exc, exc);
}
-
+
if (lirState != null) {
- log.info("Replica "+myCoreNodeName+
+ log.info("Replica " + myCoreNodeName +
" is already in leader-initiated recovery, so not waiting for leader to see down state.");
} else {
-
- log.info("Replica "+myCoreNodeName+
+
+ log.info("Replica " + myCoreNodeName +
" NOT in leader-initiated recovery, need to wait for leader to see down state.");
-
+
try (HttpSolrClient client = new HttpSolrClient(leaderBaseUrl)) {
client.setConnectionTimeout(15000);
client.setSoTimeout(120000);
@@ -1595,7 +1632,7 @@ public final class ZkController {
prepCmd.setNodeName(getNodeName());
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.DOWN);
-
+
// let's retry a couple times - perhaps the leader just went down,
// or perhaps he is just not quite ready for us yet
retries = 6;
@@ -1620,8 +1657,8 @@ public final class ZkController {
// if there was a communication error talking to the leader, see if the leader is even alive
if (!zkStateReader.getClusterState().liveNodesContain(leaderProps.getNodeName())) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Node "+leaderProps.getNodeName()+" hosting leader for "+
- shard+" in "+collection+" is not live!");
+ "Node " + leaderProps.getNodeName() + " hosting leader for " +
+ shard + " in " + collection + " is not live!");
}
}
@@ -1645,7 +1682,7 @@ public final class ZkController {
}
return leaderProps;
}
-
+
public static void linkConfSet(SolrZkClient zkClient, String collection, String confSetName) throws KeeperException, InterruptedException {
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
if (log.isInfoEnabled()) {
@@ -1675,21 +1712,21 @@ public final class ZkController {
}
// we found existing data, let's update it
ZkNodeProps props = null;
- if(data != null) {
+ if (data != null) {
props = ZkNodeProps.load(data);
- Map<String,Object> newProps = new HashMap<>();
+ Map<String, Object> newProps = new HashMap<>();
newProps.putAll(props.getProperties());
newProps.put(CONFIGNAME_PROP, confSetName);
props = new ZkNodeProps(newProps);
} else {
props = new ZkNodeProps(CONFIGNAME_PROP, confSetName);
}
-
+
// TODO: we should consider using version
zkClient.setData(path, ZkStateReader.toJSON(props), true);
}
-
+
/**
* If in SolrCloud mode, upload config sets for each SolrCore in solr.xml.
*/
@@ -1699,7 +1736,7 @@ public final class ZkController {
//List<String> allCoreNames = cfg.getAllCoreNames();
List<CoreDescriptor> cds = cc.getCoresLocator().discover(cc);
-
+
log.info("bootstrapping config for " + cds.size() + " cores into ZooKeeper using solr.xml from " + solrHome);
for (CoreDescriptor cd : cds) {
@@ -1733,7 +1770,7 @@ public final class ZkController {
public DistributedMap getOverseerFailureMap() {
return overseerFailureMap;
}
-
+
public int getClientTimeout() {
return clientTimeout;
}
@@ -1745,12 +1782,12 @@ public final class ZkController {
public LeaderElector getOverseerElector() {
return overseerElector;
}
-
+
/**
* Returns the nodeName that should be used based on the specified properties.
*
- * @param hostName - must not be null or the empty string
- * @param hostPort - must consist only of digits, must not be null or the empty string
+ * @param hostName - must not be null or the empty string
+ * @param hostPort - must consist only of digits, must not be null or the empty string
* @param hostContext - should not begin or end with a slash (leading/trailing slashes will be ignored), must not be null, may be the empty string to denote the root context
* @lucene.experimental
* @see ZkStateReader#getBaseUrlForNodeName
@@ -1759,56 +1796,56 @@ public final class ZkController {
final String hostPort,
final String hostContext) {
try {
- return hostName + ':' + hostPort + '_' +
- URLEncoder.encode(trimLeadingAndTrailingSlashes(hostContext), "UTF-8");
+ return hostName + ':' + hostPort + '_' +
+ URLEncoder.encode(trimLeadingAndTrailingSlashes(hostContext), "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new Error("JVM Does not seem to support UTF-8", e);
}
}
-
+
/**
- * Utility method for trimming and leading and/or trailing slashes from
- * its input. May return the empty string. May return null if and only
+   * Utility method for trimming any leading and/or trailing slashes from
+ * its input. May return the empty string. May return null if and only
* if the input is null.
*/
public static String trimLeadingAndTrailingSlashes(final String in) {
if (null == in) return in;
-
+
String out = in;
if (out.startsWith("/")) {
out = out.substring(1);
}
if (out.endsWith("/")) {
- out = out.substring(0,out.length()-1);
+ out = out.substring(0, out.length() - 1);
}
return out;
}
public void rejoinOverseerElection(String electionNode, boolean joinAtHead) {
try {
- if(electionNode !=null){
+ if (electionNode != null) {
//this call is from inside the JVM . not from CoreAdminHandler
- if(overseerElector.getContext() == null || overseerElector.getContext().leaderSeqPath == null){
+ if (overseerElector.getContext() == null || overseerElector.getContext().leaderSeqPath == null) {
overseerElector.retryElection(new OverseerElectionContext(zkClient,
overseer, getNodeName()), joinAtHead);
return;
}
- if(!overseerElector.getContext().leaderSeqPath.endsWith(electionNode)){
- log.warn("Asked to rejoin with wrong election node : {}, current node is {}",electionNode, overseerElector.getContext().leaderSeqPath);
+ if (!overseerElector.getContext().leaderSeqPath.endsWith(electionNode)) {
+ log.warn("Asked to rejoin with wrong election node : {}, current node is {}", electionNode, overseerElector.getContext().leaderSeqPath);
//however delete it . This is possible when the last attempt at deleting the election node failed.
- if(electionNode.startsWith(getNodeName())){
+ if (electionNode.startsWith(getNodeName())) {
try {
- zkClient.delete(OverseerElectionContext.PATH+LeaderElector.ELECTION_NODE+"/"+electionNode,-1,true);
+ zkClient.delete(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE + "/" + electionNode, -1, true);
} catch (NoNodeException e) {
//no problem
- } catch (InterruptedException e){
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
- } catch(Exception e) {
- log.warn("Old election node exists , could not be removed ",e);
+ } catch (Exception e) {
+ log.warn("Old election node exists , could not be removed ", e);
}
}
}
- }else {
+ } else {
overseerElector.retryElection(overseerElector.getContext(), joinAtHead);
}
} catch (Exception e) {
@@ -1845,50 +1882,52 @@ public final class ZkController {
public void checkOverseerDesignate() {
try {
byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);
- if(data ==null) return;
+ if (data == null) return;
Map roles = (Map) ZkStateReader.fromJSON(data);
- if(roles ==null) return;
- List nodeList= (List) roles.get("overseer");
- if(nodeList == null) return;
- if(nodeList.contains(getNodeName())){
+ if (roles == null) return;
+ List nodeList = (List) roles.get("overseer");
+ if (nodeList == null) return;
+ if (nodeList.contains(getNodeName())) {
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDROLE.toString().toLowerCase(Locale.ROOT),
"node", getNodeName(),
"role", "overseer");
- log.info("Going to add role {} ",props);
+ log.info("Going to add role {} ", props);
getOverseerCollectionQueue().offer(ZkStateReader.toJSON(props));
}
- } catch (NoNodeException nne){
+ } catch (NoNodeException nne) {
return;
} catch (Exception e) {
- log.warn("could not readd the overseer designate ",e);
+ log.warn("could not readd the overseer designate ", e);
}
}
- CoreContainer getCoreContainer(){
+ CoreContainer getCoreContainer() {
return cc;
}
-
+
/**
* When a leader receives a communication error when trying to send a request to a replica,
* it calls this method to ensure the replica enters recovery when connectivity is restored.
- *
+ * <p>
* returns true if the node hosting the replica is still considered "live" by ZooKeeper;
* false means the node is not live either, so no point in trying to send recovery commands
* to it.
*/
- public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection,
- final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState)
- throws KeeperException, InterruptedException
- {
+ public boolean ensureReplicaInLeaderInitiatedRecovery(
+ final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
+ String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ final String replicaUrl = replicaCoreProps.getCoreUrl();
+
if (collection == null)
- throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
+ throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
if (shardId == null)
- throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
-
+ throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
+
if (replicaUrl == null)
throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");
-
+
// First, determine if this replica is already in recovery handling
// which is needed because there can be many concurrent errors flooding in
// about the same replica having trouble and we only need to send the "needs"
@@ -1896,10 +1935,10 @@ public final class ZkController {
boolean nodeIsLive = true;
boolean publishDownState = false;
String replicaNodeName = replicaCoreProps.getNodeName();
- String replicaCoreNodeName = ((Replica)replicaCoreProps.getNodeProps()).getName();
- assert replicaCoreNodeName != null : "No core name for replica "+replicaNodeName;
+ String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
+ assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
synchronized (replicasInLeaderInitiatedRecovery) {
- if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
+ if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
if (!forcePublishState) {
log.debug("Replica {} already in leader-initiated recovery handling.", replicaUrl);
return false; // already in this recovery process
@@ -1910,12 +1949,12 @@ public final class ZkController {
// we only really need to try to send the recovery command if the node itself is "live"
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
// create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
- updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN);
+ updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName, retryOnConnLoss);
replicasInLeaderInitiatedRecovery.put(replicaUrl,
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
- log.info("Put replica core={} coreNodeName={} on "+
- replicaNodeName+" into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
- publishDownState = true;
+ log.info("Put replica core={} coreNodeName={} on " +
+ replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
+ publishDownState = true;
} else {
nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
log.info("Node " + replicaNodeName +
@@ -1923,23 +1962,23 @@ public final class ZkController {
replicaCoreProps.getCoreName(), replicaCoreNodeName);
// publishDownState will be false to avoid publishing the "down" state too many times
// as many errors can occur together and will each call into this method (SOLR-6189)
- }
- }
-
+ }
+ }
+
if (publishDownState || forcePublishState) {
- String replicaCoreName = replicaCoreProps.getCoreName();
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
- ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
- ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
+ String replicaCoreName = replicaCoreProps.getCoreName();
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
+ ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection);
- log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? "+forcePublishState,
+ log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? " + forcePublishState,
replicaCoreName, replicaCoreNodeName, ZkStateReader.DOWN, replicaUrl);
- overseerJobQueue.offer(ZkStateReader.toJSON(m));
+ overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
-
+
return nodeIsLive;
}
@@ -1950,23 +1989,23 @@ public final class ZkController {
}
return exists;
}
-
+
public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
- synchronized(replicasInLeaderInitiatedRecovery) {
- replicasInLeaderInitiatedRecovery.remove(replicaUrl);
+ synchronized (replicasInLeaderInitiatedRecovery) {
+ replicasInLeaderInitiatedRecovery.remove(replicaUrl);
}
- }
-
+ }
+
public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
- Map<String,Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
- return (stateObj != null) ? (String)stateObj.get("state") : null;
+ Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+ return (stateObj != null) ? (String) stateObj.get("state") : null;
}
- public Map<String,Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
+ public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
if (collection == null || shardId == null || coreNodeName == null)
return null; // if we don't have complete data about a core in cloud mode, return null
-
+
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
byte[] stateData = null;
try {
@@ -1976,26 +2015,26 @@ public final class ZkController {
} catch (ConnectionLossException | SessionExpiredException cle) {
// sort of safe to ignore ??? Usually these are seen when the core is going down
// or there are bigger issues to deal with than reading this znode
- log.warn("Unable to read "+znodePath+" due to: "+cle);
+ log.warn("Unable to read " + znodePath + " due to: " + cle);
} catch (Exception exc) {
- log.error("Failed to read data from znode "+znodePath+" due to: "+exc);
+ log.error("Failed to read data from znode " + znodePath + " due to: " + exc);
if (exc instanceof SolrException) {
- throw (SolrException)exc;
+ throw (SolrException) exc;
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Failed to read data from znodePath: "+znodePath, exc);
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Failed to read data from znodePath: " + znodePath, exc);
}
}
- Map<String,Object> stateObj = null;
+ Map<String, Object> stateObj = null;
if (stateData != null && stateData.length > 0) {
// TODO: Remove later ... this is for upgrading from 4.8.x to 4.10.3 (see: SOLR-6732)
- if (stateData[0] == (byte)'{') {
+ if (stateData[0] == (byte) '{') {
Object parsedJson = ZkStateReader.fromJSON(stateData);
if (parsedJson instanceof Map) {
- stateObj = (Map<String,Object>)parsedJson;
+ stateObj = (Map<String, Object>) parsedJson;
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Leader-initiated recovery state data is invalid! "+parsedJson);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Leader-initiated recovery state data is invalid! " + parsedJson);
}
} else {
// old format still in ZK
@@ -2005,27 +2044,28 @@ public final class ZkController {
return stateObj;
}
-
- private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
+
+ private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
+ String leaderCoreNodeName, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
- log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
- "; shardId="+shardId+"; coreNodeName="+coreNodeName);
+ log.warn("Cannot set leader-initiated recovery state znode to " + state + " using: collection=" + collection +
+ "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
return; // if we don't have complete data about a core in cloud mode, do nothing
}
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
-
+
if (ZkStateReader.ACTIVE.equals(state)) {
// since we're marking it active, we don't need this znode anymore, so delete instead of update
try {
zkClient.delete(znodePath, -1, false);
} catch (Exception justLogIt) {
- log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
+ log.warn("Failed to delete znode " + znodePath, justLogIt);
}
return;
}
- Map<String,Object> stateObj = null;
+ Map<String, Object> stateObj = null;
try {
stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
} catch (Exception exc) {
@@ -2040,44 +2080,82 @@ public final class ZkController {
stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
byte[] znodeData = ZkStateReader.toJSON(stateObj);
- boolean retryOnConnLoss = true; // be a little more robust when trying to write data
+
try {
- if (zkClient.exists(znodePath, retryOnConnLoss)) {
- zkClient.setData(znodePath, znodeData, retryOnConnLoss);
+ if (ZkStateReader.DOWN.equals(state)) {
+ markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
} else {
- zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+ if (zkClient.exists(znodePath, true)) {
+ zkClient.setData(znodePath, znodeData, true);
+ } else {
+ zkClient.makePath(znodePath, znodeData, true);
+ }
}
- log.info("Wrote "+state+" to "+znodePath);
+ log.info("Wrote " + state + " to " + znodePath);
} catch (Exception exc) {
if (exc instanceof SolrException) {
- throw (SolrException)exc;
+ throw (SolrException) exc;
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Failed to update data to "+state+" for znode: "+znodePath, exc);
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Failed to update data to " + state + " for znode: " + znodePath, exc);
}
}
}
-
+
+ /**
+ * we use ZK's multi-transactional semantics to ensure that we are able to
+ * publish a replica as 'down' only if our leader election node still exists
+ * in ZK. This ensures that a long running network partition caused by GC etc
+ * doesn't let us mark a node as down *after* we've already lost our session
+ */
+ private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
+ String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+ String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
+ if (leaderSeqPath == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Failed to update data to 'down' for znode: " + znodePath +
+ " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
+ }
+ if (zkClient.exists(znodePath, true)) {
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+ ops.add(Op.setData(znodePath, znodeData, -1));
+ zkClient.multi(ops, true);
+ } else {
+ String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
+ try {
+ zkClient.makePath(parentZNodePath, true);
+ } catch (KeeperException.NodeExistsException nee) {
+ // if it exists, that's great!
+ }
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+ ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
+ CreateMode.PERSISTENT));
+ zkClient.multi(ops, true);
+ }
+ }
+
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
- return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
- }
-
+ return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
+ }
+
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
- return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreNodeName;
+ return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
}
-
+
public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
- ClusterState clusterState = getZkStateReader().getClusterState();
- if (clusterState != null) {
- DocCollection collection = clusterState.getCollectionOrNull(desc
- .getCloudDescriptor().getCollectionName());
- if (collection != null) {
- boolean autoAddReplicas = ClusterStateUtil.isAutoAddReplicas( getZkStateReader(), collection.getName());
- if (autoAddReplicas) {
- CloudUtil.checkSharedFSFailoverReplaced(cc, desc);
- }
+ ClusterState clusterState = getZkStateReader().getClusterState();
+ if (clusterState != null) {
+ DocCollection collection = clusterState.getCollectionOrNull(desc
+ .getCloudDescriptor().getCollectionName());
+ if (collection != null) {
+ boolean autoAddReplicas = ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection.getName());
+ if (autoAddReplicas) {
+ CloudUtil.checkSharedFSFailoverReplaced(cc, desc);
}
}
+ }
}
/**
@@ -2122,8 +2200,8 @@ public final class ZkController {
log.warn("could not get stat");
}
- log.info(MessageFormat.format(errMsg, resourceLocation, znodeVersion));
- throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg, resourceLocation, znodeVersion) + ", retry.");
+ log.info(StrUtils.formatString(errMsg, resourceLocation, znodeVersion));
+ throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry.");
}
}
}
@@ -2137,8 +2215,8 @@ public final class ZkController {
log.error(e.getMessage());
}
- log.info(MessageFormat.format(errMsg + " zkVersion= " + v, resourceLocation, znodeVersion));
- throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg, resourceLocation, znodeVersion) + ", retry.");
+ log.info(StrUtils.formatString(errMsg + " zkVersion= " + v, resourceLocation, znodeVersion));
+ throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry.");
} catch (ResourceModifiedInZkException e) {
throw e;
} catch (Exception e) {
@@ -2167,18 +2245,18 @@ public final class ZkController {
}
}
- public static class ResourceModifiedInZkException extends SolrException {
+ public static class ResourceModifiedInZkException extends SolrException {
public ResourceModifiedInZkException(ErrorCode code, String msg) {
super(code, msg);
}
}
public void unRegisterConfListener(Runnable listener) {
- if(listener == null) return;
- synchronized (confDirectoryListeners){
+ if (listener == null) return;
+ synchronized (confDirectoryListeners) {
for (Set<Runnable> listeners : confDirectoryListeners.values()) {
- if(listeners != null) {
- if(listeners.remove(listener)) {
+ if (listeners != null) {
+ if (listeners.remove(listener)) {
log.info(" a listener was removed because of core close");
}
}
@@ -2187,15 +2265,16 @@ public final class ZkController {
}
- /**This will give a callback to the listener whenever a child is modified in the
+ /**
+ * This will give a callback to the listener whenever a child is modified in the
* conf directory. It is the responsibility of the listener to check if the individual
* item of interest has been modified. When the last core which was interested in
* this conf directory is gone the listeners will be removed automatically.
*/
- public void registerConfListenerForCore(String confDir,SolrCore core, final Runnable listener){
- if(listener==null) throw new NullPointerException("listener cannot be null");
- synchronized (confDirectoryListeners){
- if(confDirectoryListeners.containsKey(confDir)){
+ public void registerConfListenerForCore(String confDir, SolrCore core, final Runnable listener) {
+ if (listener == null) throw new NullPointerException("listener cannot be null");
+ synchronized (confDirectoryListeners) {
+ if (confDirectoryListeners.containsKey(confDir)) {
confDirectoryListeners.get(confDir).add(listener);
core.addCloseHook(new CloseHook() {
@Override
@@ -2204,76 +2283,95 @@ public final class ZkController {
}
@Override
- public void postClose(SolrCore core) { }
+ public void postClose(SolrCore core) {
+ }
});
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR,"This conf directory is not valid");
+ throw new SolrException(ErrorCode.SERVER_ERROR, "This conf directory is not valid");
}
}
}
- private final Map<String , Set<Runnable>> confDirectoryListeners = new HashMap<>();
+ private final Map<String, Set<Runnable>> confDirectoryListeners = new HashMap<>();
void watchZKConfDir(final String zkDir) {
- log.info("watch zkdir " + zkDir);
+ log.info("watch zkdir {}" , zkDir);
if (!confDirectoryListeners.containsKey(zkDir)) {
- confDirectoryListeners.put(zkDir, new HashSet<Runnable>());
- setConfWatcher(zkDir, new WatcherImpl(zkDir));
-
+ confDirectoryListeners.put(zkDir, new HashSet<>());
+ setConfWatcher(zkDir, new WatcherImpl(zkDir), null);
}
-
-
}
- private class WatcherImpl implements Watcher{
- private final String zkDir ;
+
+ private class WatcherImpl implements Watcher {
+ private final String zkDir;
private WatcherImpl(String dir) {
this.zkDir = dir;
}
@Override
- public void process(WatchedEvent event) {
- try {
+ public void process(WatchedEvent event) {
+ Stat stat = null;
+ try {
+ stat = zkClient.exists(zkDir, null, true);
+ } catch (KeeperException e) {
+ // ignore, it is not a big deal
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
- synchronized (confDirectoryListeners) {
- // if this is not among directories to be watched then don't set the watcher anymore
- if( !confDirectoryListeners.containsKey(zkDir)) {
- log.info("Watcher on {} is removed ", zkDir);
- return;
- }
- Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
- if (listeners != null && !listeners.isEmpty()) {
- final Set<Runnable> listenersCopy = new HashSet<>(listeners);
- new Thread() {
- //run these in a separate thread because this can be long running
- public void run() {
- for (final Runnable listener : listenersCopy) {
- try {
- listener.run();
- } catch (Exception e) {
- log.warn("listener throws error", e);
- }
- }
- }
- }.start();
- }
+ boolean resetWatcher = false;
+ try {
+ resetWatcher = fireEventListeners(zkDir);
+ } finally {
+ if (Event.EventType.None.equals(event.getType())) {
+ log.info("A node got unwatched for {}", zkDir);
+ } else {
+ if (resetWatcher) setConfWatcher(zkDir, this, stat);
+ else log.info("A node got unwatched for {}", zkDir);
+ }
+ }
+ }
- }
+ }
- } finally {
- if (Event.EventType.None.equals(event.getType())) {
- log.info("A node got unwatched for {}", zkDir);
- return;
- } else {
- setConfWatcher(zkDir,this);
+ private boolean fireEventListeners(String zkDir) {
+ synchronized (confDirectoryListeners) {
+ // if this is not among directories to be watched then don't set the watcher anymore
+ if (!confDirectoryListeners.containsKey(zkDir)) {
+ log.info("Watcher on {} is removed ", zkDir);
+ return false;
+ }
+ Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
+ if (listeners != null && !listeners.isEmpty()) {
+ final Set<Runnable> listenersCopy = new HashSet<>(listeners);
+ new Thread() {
+ //run these in a separate thread because this can be long running
+ public void run() {
+ log.info("Running listeners for {}", zkDir);
+ for (final Runnable listener : listenersCopy) {
+ try {
+ listener.run();
+ } catch (Exception e) {
+ log.warn("listener throws error", e);
+ }
+ }
}
- }
+ }.start();
}
+
}
+ return true;
+ }
- private void setConfWatcher(String zkDir, Watcher watcher) {
+ private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) {
try {
- zkClient.exists(zkDir, watcher, true);
+ Stat newStat = zkClient.exists(zkDir, watcher, true);
+ if (stat != null && newStat.getVersion() > stat.getVersion()) {
+ //a race condition where we missed an event being fired
+ //so fire the event listeners
+ fireEventListeners(zkDir);
+ }
} catch (KeeperException e) {
log.error("failed to set watcher for conf dir {} ", zkDir);
} catch (InterruptedException e) {
@@ -2286,13 +2384,19 @@ public final class ZkController {
return new OnReconnect() {
@Override
public void command() {
- synchronized (confDirectoryListeners){
+ synchronized (confDirectoryListeners) {
for (String s : confDirectoryListeners.keySet()) {
watchZKConfDir(s);
+ fireEventListeners(s);
}
}
}
};
}
+ public String getLeaderSeqPath(String collection, String coreNodeName) {
+ ContextKey key = new ContextKey(collection, coreNodeName);
+ ElectionContext context = electionContexts.get(key);
+ return context != null ? context.leaderSeqPath : null;
+ }
}