You are viewing a plain-text version of this content. The canonical link for it is on the original archive page (the hyperlink is not preserved in this plain-text copy).
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:06:09 UTC
[48/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support for Solr
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
deleted file mode 100644
index d4f84f9..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ /dev/null
@@ -1,764 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.RetryUtil;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.PeerSync;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.util.RefCounted;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.OpResult.SetDataResult;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CommonParams.ID;
-
-public abstract class ElectionContext implements Closeable {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- final String electionPath;
- final ZkNodeProps leaderProps;
- final String id;
- final String leaderPath;
- volatile String leaderSeqPath;
- private SolrZkClient zkClient;
-
- public ElectionContext(final String coreNodeName,
- final String electionPath, final String leaderPath, final ZkNodeProps leaderProps, final SolrZkClient zkClient) {
- this.id = coreNodeName;
- this.electionPath = electionPath;
- this.leaderPath = leaderPath;
- this.leaderProps = leaderProps;
- this.zkClient = zkClient;
- }
-
- public void close() {
-
- }
-
- public void cancelElection() throws InterruptedException, KeeperException {
- if (leaderSeqPath != null) {
- try {
- log.debug("Canceling election {}", leaderSeqPath);
- zkClient.delete(leaderSeqPath, -1, true);
- } catch (NoNodeException e) {
- // fine
- log.debug("cancelElection did not find election node to remove {}", leaderSeqPath);
- }
- } else {
- log.debug("cancelElection skipped as this context has not been initialized");
- }
- }
-
- abstract void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException, InterruptedException, IOException;
-
- public void checkIfIamLeaderFired() {}
-
- public void joinedElectionFired() {}
-
- public ElectionContext copy(){
- throw new UnsupportedOperationException("copy");
- }
-}
-
-class ShardLeaderElectionContextBase extends ElectionContext {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected final SolrZkClient zkClient;
- protected String shardId;
- protected String collection;
- protected LeaderElector leaderElector;
- protected ZkStateReader zkStateReader;
- private Integer leaderZkNodeParentVersion;
-
- // Prevents a race between cancelling and becoming leader.
- private final Object lock = new Object();
-
- public ShardLeaderElectionContextBase(LeaderElector leaderElector,
- final String shardId, final String collection, final String coreNodeName,
- ZkNodeProps props, ZkStateReader zkStateReader) {
- super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
- + "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath(
- collection, shardId), props, zkStateReader.getZkClient());
- this.leaderElector = leaderElector;
- this.zkClient = zkStateReader.getZkClient();
- this.zkStateReader = zkStateReader;
- this.shardId = shardId;
- this.collection = collection;
- }
-
- @Override
- public void cancelElection() throws InterruptedException, KeeperException {
- super.cancelElection();
- synchronized (lock) {
- if (leaderZkNodeParentVersion != null) {
- try {
- // We need to be careful and make sure we *only* delete our own leader registration node.
- // We do this by using a multi and ensuring the parent znode of the leader registration node
- // matches the version we expect - there is a setData call that increments the parent's znode
- // version whenever a leader registers.
- log.debug("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
- List<Op> ops = new ArrayList<>(2);
- ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
- ops.add(Op.delete(leaderPath, -1));
- zkClient.multi(ops, true);
- } catch (KeeperException.NoNodeException nne) {
- // no problem
- log.debug("No leader registration node found to remove: {}", leaderPath);
- } catch (KeeperException.BadVersionException bve) {
- log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath);
- // no problem
- } catch (InterruptedException e) {
- throw e;
- } catch (Exception e) {
- SolrException.log(log, e);
- }
- leaderZkNodeParentVersion = null;
- } else {
- log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
- }
- }
- }
-
- @Override
- void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
- throws KeeperException, InterruptedException, IOException {
- // register as leader - if an ephemeral is already there, wait to see if it goes away
-
- if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
- log.info("Will not register as leader because collection appears to be gone.");
- return;
- }
-
- String parent = new Path(leaderPath).getParent().toString();
- ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
- // only if /collections/{collection} exists already do we succeed in creating this path
- zcmd.ensureExists(parent, (byte[])null, CreateMode.PERSISTENT, zkClient, 2);
-
- try {
- RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, () -> {
- synchronized (lock) {
- log.debug("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath);
- List<Op> ops = new ArrayList<>(2);
-
- // We use a multi operation to get the parent nodes version, which will
- // be used to make sure we only remove our own leader registration node.
- // The setData call used to get the parent version is also the trigger to
- // increment the version. We also do a sanity check that our leaderSeqPath exists.
-
- ops.add(Op.check(leaderSeqPath, -1));
- ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
- ops.add(Op.setData(parent, null, -1));
- List<OpResult> results;
-
- results = zkClient.multi(ops, true);
- for (OpResult result : results) {
- if (result.getType() == ZooDefs.OpCode.setData) {
- SetDataResult dresult = (SetDataResult) result;
- Stat stat = dresult.getStat();
- leaderZkNodeParentVersion = stat.getVersion();
- return;
- }
- }
- assert leaderZkNodeParentVersion != null;
- }
- });
- } catch (Throwable t) {
- if (t instanceof OutOfMemoryError) {
- throw (OutOfMemoryError) t;
- }
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
- }
-
- assert shardId != null;
- boolean isAlreadyLeader = false;
- if (zkStateReader.getClusterState() != null &&
- zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) {
- Replica leader = zkStateReader.getLeader(collection, shardId);
- if (leader != null
- && leader.getBaseUrl().equals(leaderProps.get(ZkStateReader.BASE_URL_PROP))
- && leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) {
- isAlreadyLeader = true;
- }
- }
- if (!isAlreadyLeader) {
- ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
- ZkStateReader.SHARD_ID_PROP, shardId,
- ZkStateReader.COLLECTION_PROP, collection,
- ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
- ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
- ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
- Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
- }
- }
-
- public LeaderElector getLeaderElector() {
- return leaderElector;
- }
-
- Integer getLeaderZkNodeParentVersion() {
- synchronized (lock) {
- return leaderZkNodeParentVersion;
- }
- }
-}
-
-// add core container and stop passing core around...
-final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final ZkController zkController;
- private final CoreContainer cc;
- private final SyncStrategy syncStrategy;
-
- private volatile boolean isClosed = false;
-
- public ShardLeaderElectionContext(LeaderElector leaderElector,
- final String shardId, final String collection,
- final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
- super(leaderElector, shardId, collection, coreNodeName, props,
- zkController.getZkStateReader());
- this.zkController = zkController;
- this.cc = cc;
- syncStrategy = new SyncStrategy(cc);
- }
-
- @Override
- public void close() {
- super.close();
- this.isClosed = true;
- syncStrategy.close();
- }
-
- @Override
- public void cancelElection() throws InterruptedException, KeeperException {
- String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
- try (SolrCore core = cc.getCore(coreName)) {
- if (core != null) {
- core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
- }
- }
-
- super.cancelElection();
- }
-
- @Override
- public ElectionContext copy() {
- return new ShardLeaderElectionContext(leaderElector, shardId, collection, id, leaderProps, zkController, cc);
- }
-
- /*
- * weAreReplacement: has someone else been the leader already?
- */
- @Override
- void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
- InterruptedException, IOException {
- String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
- ActionThrottle lt;
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null ) {
- if (cc.isShutDown()) {
- return;
- } else {
- throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getLoadedCoreNames());
- }
- }
- MDCLoggingContext.setCore(core);
- lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
- }
-
- try {
- lt.minimumWaitBetweenActions();
- lt.markAttemptingAction();
-
-
- int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
-
- log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
- if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
- // Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
- ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
- Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
- }
-
- boolean allReplicasInLine = false;
- if (!weAreReplacement) {
- allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
- } else {
- allReplicasInLine = areAllReplicasParticipating();
- }
-
- if (isClosed) {
- // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the later,
- // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will
- // re-register the cores and handle a new leadership election.
- return;
- }
-
- Replica.Type replicaType;
- String coreNodeName;
- boolean setTermToMax = false;
- try (SolrCore core = cc.getCore(coreName)) {
-
- if (core == null) {
- if (!zkController.getCoreContainer().isShutDown()) {
- cancelElection();
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "SolrCore not found:" + coreName + " in " + cc.getLoadedCoreNames());
- } else {
- return;
- }
- }
-
- replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
- coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
- // should I be leader?
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
- if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
- if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) {
- rejoinLeaderElection(core);
- return;
- } else {
- // only log an error if this replica win the election
- setTermToMax = true;
- }
- }
-
- if (isClosed) {
- return;
- }
-
- log.info("I may be the new leader - try and sync");
-
- // we are going to attempt to be the leader
- // first cancel any current recovery
- core.getUpdateHandler().getSolrCoreState().cancelRecovery();
-
- if (weAreReplacement) {
- // wait a moment for any floating updates to finish
- try {
- Thread.sleep(2500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
- }
- }
-
- PeerSync.PeerSyncResult result = null;
- boolean success = false;
- try {
- result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
- success = result.isSuccess();
- } catch (Exception e) {
- SolrException.log(log, "Exception while trying to sync", e);
- result = PeerSync.PeerSyncResult.failure();
- }
-
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-
- if (!success) {
- boolean hasRecentUpdates = false;
- if (ulog != null) {
- // TODO: we could optimize this if necessary
- try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
- hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
- }
- }
-
- if (!hasRecentUpdates) {
- // we failed sync, but we have no versions - we can't sync in that case
- // - we were active
- // before, so become leader anyway if no one else has any versions either
- if (result.getOtherHasVersions().orElse(false)) {
- log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
- success = false;
- } else {
- log.info(
- "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
- success = true;
- }
- }
- }
-
- // solrcloud_debug
- if (log.isDebugEnabled()) {
- try {
- RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
- SolrIndexSearcher searcher = searchHolder.get();
- try {
- log.debug(core.getCoreContainer().getZkController().getNodeName() + " synched "
- + searcher.count(new MatchAllDocsQuery()));
- } finally {
- searchHolder.decref();
- }
- } catch (Exception e) {
- log.error("Error in solrcloud_debug block", e);
- }
- }
- if (!success) {
- rejoinLeaderElection(core);
- return;
- }
-
- }
-
- boolean isLeader = true;
- if (!isClosed) {
- try {
- if (replicaType == Replica.Type.TLOG) {
- // stop replicate from old leader
- zkController.stopReplicationFromLeader(coreName);
- if (weAreReplacement) {
- try (SolrCore core = cc.getCore(coreName)) {
- Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
- if (future != null) {
- log.info("Replaying tlog before become new leader");
- future.get();
- } else {
- log.info("New leader does not have old tlog to replay");
- }
- }
- }
- }
- // in case of leaderVoteWait timeout, a replica with lower term can win the election
- if (setTermToMax) {
- log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) " +
- "without being up-to-date with the previous leader", coreNodeName);
- zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
- }
- super.runLeaderProcess(weAreReplacement, 0);
- try (SolrCore core = cc.getCore(coreName)) {
- if (core != null) {
- core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
- publishActiveIfRegisteredAndNotActive(core);
- } else {
- return;
- }
- }
- log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
-
- // we made it as leader - send any recovery requests we need to
- syncStrategy.requestRecoveries();
-
- } catch (Exception e) {
- isLeader = false;
- SolrException.log(log, "There was a problem trying to register as the leader", e);
-
- try (SolrCore core = cc.getCore(coreName)) {
-
- if (core == null) {
- log.debug("SolrCore not found:" + coreName + " in " + cc.getLoadedCoreNames());
- return;
- }
-
- core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
-
- // we could not publish ourselves as leader - try and rejoin election
- rejoinLeaderElection(core);
- }
- }
- } else {
- cancelElection();
- }
- } finally {
- MDCLoggingContext.clear();
- }
- }
-
- /**
- * Wait for other replicas with higher terms participate in the electioon
- * @return true if after {@code timeout} there are no other replicas with higher term participate in the election,
- * false if otherwise
- */
- private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException {
- long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
- while (!isClosed && !cc.isShutDown()) {
- if (System.nanoTime() > timeoutAt) {
- return true;
- }
- if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) {
- log.info("Can't become leader, other replicas with higher term participated in leader election");
- return false;
- }
- Thread.sleep(500L);
- }
- return false;
- }
-
- /**
- * Do other replicas with higher term participated in the election
- * @return true if other replicas with higher term participated in the election, false if otherwise
- */
- private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) {
- ClusterState clusterState = zkController.getClusterState();
- DocCollection docCollection = clusterState.getCollectionOrNull(collection);
- Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
- if (slices == null) return false;
-
- long replicaTerm = zkShardTerms.getTerm(coreNodeName);
- boolean isRecovering = zkShardTerms.isRecovering(coreNodeName);
-
- for (Replica replica : slices.getReplicas()) {
- if (replica.getName().equals(coreNodeName)) continue;
-
- if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
- long otherTerm = zkShardTerms.getTerm(replica.getName());
- boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName());
-
- if (isRecovering && !isOtherReplicaRecovering) return true;
- if (otherTerm > replicaTerm) return true;
- }
- }
- return false;
- }
-
- public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
- if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
- ZkStateReader zkStateReader = zkController.getZkStateReader();
- zkStateReader.forceUpdateCollection(collection);
- ClusterState clusterState = zkStateReader.getClusterState();
- Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
- if (rep == null) return;
- if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
- log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
- }
- }
- }
-
- private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) {
- if (clusterState == null) return null;
- final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
- if (docCollection == null) return null;
- return docCollection.getReplica(replicaName);
- }
-
- // returns true if all replicas are found to be up, false if not
- private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
- long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
- final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
-
- DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
- Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
- int cnt = 0;
- while (!isClosed && !cc.isShutDown()) {
- // wait for everyone to be up
- if (slices != null) {
- int found = 0;
- try {
- found = zkClient.getChildren(shardsElectZkPath, null, true).size();
- } catch (KeeperException e) {
- if (e instanceof KeeperException.SessionExpiredException) {
- // if the session has expired, then another election will be launched, so
- // quit here
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "ZK session expired - cancelling election for " + collection + " " + shardId);
- }
- SolrException.log(log,
- "Error checking for the number of election participants", e);
- }
-
- // on startup and after connection timeout, wait for all known shards
- if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
- log.info("Enough replicas found to continue.");
- return true;
- } else {
- if (cnt % 40 == 0) {
- log.info("Waiting until we see more replicas up for shard {}: total={}"
- + " found={}"
- + " timeoutin={}ms",
- shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
- TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
- }
- }
-
- if (System.nanoTime() > timeoutAt) {
- log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
- return false;
- }
- } else {
- log.warn("Shard not found: " + shardId + " for collection " + collection);
-
- return false;
-
- }
-
- Thread.sleep(500);
- docCollection = zkController.getClusterState().getCollectionOrNull(collection);
- slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
- cnt++;
- }
- return false;
- }
-
- // returns true if all replicas are found to be up, false if not
- private boolean areAllReplicasParticipating() throws InterruptedException {
- final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
- final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
-
- if (docCollection != null && docCollection.getSlice(shardId) != null) {
- final Slice slices = docCollection.getSlice(shardId);
- int found = 0;
- try {
- found = zkClient.getChildren(shardsElectZkPath, null, true).size();
- } catch (KeeperException e) {
- if (e instanceof KeeperException.SessionExpiredException) {
- // if the session has expired, then another election will be launched, so
- // quit here
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "ZK session expired - cancelling election for " + collection + " " + shardId);
- }
- SolrException.log(log, "Error checking for the number of election participants", e);
- }
-
- if (found >= slices.getReplicasMap().size()) {
- log.debug("All replicas are ready to participate in election.");
- return true;
- }
-
- } else {
- log.warn("Shard not found: " + shardId + " for collection " + collection);
-
- return false;
- }
-
- return false;
- }
-
- private void rejoinLeaderElection(SolrCore core)
- throws InterruptedException, KeeperException, IOException {
- // remove our ephemeral and re join the election
- if (cc.isShutDown()) {
- log.debug("Not rejoining election because CoreContainer is closed");
- return;
- }
-
- log.info("There may be a better leader candidate than us - going back into recovery");
-
- cancelElection();
-
- core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
-
- leaderElector.joinElection(this, true);
- }
-
-}
-
-final class OverseerElectionContext extends ElectionContext {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final SolrZkClient zkClient;
- private Overseer overseer;
-
- public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
- super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", null, zkClient);
- this.overseer = overseer;
- this.zkClient = zkClient;
- try {
- new ZkCmdExecutor(zkClient.getZkClientTimeout()).ensureExists(Overseer.OVERSEER_ELECT, zkClient);
- } catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
-
- @Override
- void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
- InterruptedException {
- log.info("I am going to be the leader {}", id);
- final String id = leaderSeqPath
- .substring(leaderSeqPath.lastIndexOf("/") + 1);
- ZkNodeProps myProps = new ZkNodeProps(ID, id);
-
- zkClient.makePath(leaderPath, Utils.toJSON(myProps),
- CreateMode.EPHEMERAL, true);
- if(pauseBeforeStartMs >0){
- try {
- Thread.sleep(pauseBeforeStartMs);
- } catch (InterruptedException e) {
- Thread.interrupted();
- log.warn("Wait interrupted ", e);
- }
- }
- if (!overseer.getZkController().isClosed() && !overseer.getZkController().getCoreContainer().isShutDown()) {
- overseer.start(id);
- }
- }
-
- @Override
- public void cancelElection() throws InterruptedException, KeeperException {
- super.cancelElection();
- overseer.close();
- }
-
- @Override
- public void close() {
- overseer.close();
- }
-
- @Override
- public ElectionContext copy() {
- return new OverseerElectionContext(zkClient, overseer ,id);
- }
-
- @Override
- public void joinedElectionFired() {
- overseer.close();
- }
-
- @Override
- public void checkIfIamLeaderFired() {
- // leader changed - close the overseer
- overseer.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
deleted file mode 100644
index 953023f..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
-import org.apache.solr.cloud.overseer.ClusterStateMutator;
-import org.apache.solr.cloud.overseer.CollectionMutator;
-import org.apache.solr.cloud.overseer.SliceMutator;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-
-import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-
-/**
- * Balances a replica property that at most one replica per slice may host (see {@code BALANCESHARDUNIQUE}).
- * The property is spread over the nodes hosting the collection's replicas (only active replicas unless
- * {@code onlyActiveNodes} is false), so that no node hosts more than one more instance of it than any other.
- */
-class ExclusiveSliceProperty {
- private ClusterState clusterState;
- private final boolean onlyActiveNodes;
- private final String property;
- private final DocCollection collection;
- private final String collectionName;
-
- // Key structure. For each node, list all replicas on it regardless of whether they have the property or not.
- // balanceProperty() prunes it down to candidate replicas of slices that still need a host.
- private final Map<String, List<SliceReplica>> nodesHostingReplicas = new HashMap<>();
- // Key structure. For each node, a list of the replicas _currently_ hosting the property.
- private final Map<String, List<SliceReplica>> nodesHostingProp = new HashMap<>();
- // Names of slices that do not (yet) have a replica hosting the property.
- Set<String> shardsNeedingHosts = new HashSet<>();
- Map<String, Slice> changedSlices = new HashMap<>(); // Work on copies rather than the underlying cluster state.
-
- // Most properties any node may host, computed from the current layout. origModulo is slices % nodes:
- // when non-zero, exactly that many nodes may host one more than the rest.
- // The tmp* copies are consumed by adjustLimits() during each pass.
- private int origMaxPropPerNode = 0;
- private int origModulo = 0;
- private int tmpMaxPropPerNode = 0;
- private int tmpModulo = 0;
- Random rand = new Random();
-
- // Number of eligible replicas that already host the property.
- private int assigned = 0;
-
- /**
- * @param clusterState cluster state the collection is read from
- * @param message overseer message carrying the collection, property, shardUnique and onlyActiveNodes parameters
- * @throws SolrException (BAD_REQUEST) if the collection or property parameter is blank, or the property is
- * neither a pre-defined slice-unique property nor marked shardUnique, or the collection cannot be found
- */
- ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
- this.clusterState = clusterState;
- String rawProperty = message.getStr(ZkStateReader.PROPERTY_PROP);
- collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-
- // Validate the raw value. Once the prefix is prepended, the name is never blank; a missing property
- // would silently become "<prefix>null".
- if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(rawProperty)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Overseer '" + message.getStr(Overseer.QUEUE_OPERATION) + "' requires both the '" + ZkStateReader.COLLECTION_PROP + "' and '" +
- ZkStateReader.PROPERTY_PROP + "' parameters. No action taken ");
- }
- String prefixed = rawProperty;
- if (StringUtils.startsWith(prefixed, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
- prefixed = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + prefixed;
- }
- this.property = prefixed.toLowerCase(Locale.ROOT);
-
- boolean shardUnique = Boolean.parseBoolean(message.getStr(SHARD_UNIQUE));
- if (shardUnique == false &&
- SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(this.property) == false) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
- + " the property be a pre-defined property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true' " +
- " Property: " + this.property + " shardUnique: " + Boolean.toString(shardUnique));
- }
-
- collection = clusterState.getCollection(collectionName);
- if (collection == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Could not find collection '" + collectionName + "' for overseer operation '" +
- message.getStr(Overseer.QUEUE_OPERATION) + "'. No action taken.");
- }
- onlyActiveNodes = Boolean.parseBoolean(message.getStr(ONLY_ACTIVE_NODES, "true"));
- }
-
- /**
- * Returns the collection captured when this object was constructed.
- * NOTE(review): balanceProperty() edits replica properties through slice copies made in
- * getReplicaFromChanged(). Those edits show up here only if the copies share Replica instances with
- * this collection - confirm, or consider returning clusterState.getCollection(collectionName).
- */
- DocCollection getDocCollection() {
- return collection;
- }
-
- private boolean isActive(Replica replica) {
- return replica.getState() == Replica.State.ACTIVE;
- }
-
- // Collect a list of all the nodes that _can_ host the indicated property. Along the way, also collect any of
- // the replicas on that node that _already_ host the property as well as any slices that do _not_ have the
- // property hosted.
- //
- // Returns true if any node needs its property reassigned, or false if the property is already balanced for
- // the collection.
- private boolean collectCurrentPropStats() {
- // Get a list of potential replicas that can host the property _and_ their counts
- // Move any obvious entries to a list of replicas to change the property on
- Set<String> allHosts = new HashSet<>();
- for (Slice slice : collection.getSlices()) {
- boolean sliceHasProp = false;
- for (Replica replica : slice.getReplicas()) {
- if (onlyActiveNodes && isActive(replica) == false) {
- if (StringUtils.isNotBlank(replica.getStr(property))) {
- removeProp(slice, replica.getName()); // Note, we won't be committing this to ZK until later.
- }
- continue;
- }
- String nodeName = replica.getNodeName();
- allHosts.add(nodeName);
- if (StringUtils.isNotBlank(replica.getStr(property))) {
- if (sliceHasProp) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "'" + BALANCESHARDUNIQUE + "' should only be called for properties that have at most one member " +
- "in any slice with the property set. No action taken.");
- }
- nodesHostingProp.computeIfAbsent(nodeName, k -> new ArrayList<>()).add(new SliceReplica(slice, replica));
- ++assigned;
- sliceHasProp = true;
- }
- nodesHostingReplicas.computeIfAbsent(nodeName, k -> new ArrayList<>()).add(new SliceReplica(slice, replica));
- }
- }
-
- // Without any eligible node, the limits below would divide by zero.
- if (allHosts.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Could not find any " + (onlyActiveNodes ? "active " : "") + "replicas able to host property '" +
- property + "' in collection '" + collectionName + "'.");
- }
-
- int sliceCount = collection.getSlices().size();
- origMaxPropPerNode = sliceCount / allHosts.size();
- // Some nodes can have one more of the property if the numbers aren't exactly even.
- origModulo = sliceCount % allHosts.size();
- if (origModulo > 0) {
- origMaxPropPerNode++; // have to have some nodes with 1 more property.
- }
-
- // We can say for sure that we need to rebalance if we don't have as many assigned properties as slices.
- if (assigned != sliceCount) {
- return true;
- }
-
- // With the total equal to the slice count, the layout is balanced exactly when no node is over the limit
- // and, for an uneven split, exactly origModulo nodes are at the limit. For example, 7 slices on 3 nodes
- // must be (3, 2, 2): (3, 3, 1) has too many nodes at the limit, and (4, 3, 0) has a node over it. For an
- // even split, "no node over the limit" already implies that every node is at the limit.
- int nodesAtLimit = 0;
- for (List<SliceReplica> list : nodesHostingProp.values()) {
- if (list.size() > origMaxPropPerNode) {
- return true;
- }
- if (list.size() == origMaxPropPerNode) {
- ++nodesAtLimit;
- }
- }
- return origModulo != 0 && nodesAtLimit != origModulo;
- }
-
- // Drop every candidate replica of the given slice, since that slice now has a host for the property.
- private void removeSliceAlreadyHostedFromPossibles(String sliceName) {
- for (List<SliceReplica> candidates : nodesHostingReplicas.values()) {
- candidates.removeIf(sr -> sr.slice.getName().equals(sliceName));
- }
- }
-
- // Give each slice in shardsNeedingHosts a replica hosting the property. Each step picks the candidate on the
- // node with the fewest candidate replicas that is still below the per-node limit.
- private void balanceUnassignedReplicas() {
- tmpMaxPropPerNode = origMaxPropPerNode; // A bit clumsy, but don't want to duplicate code.
- tmpModulo = origModulo;
-
- while (shardsNeedingHosts.size() > 0) {
- String nodeName = "";
- int minSize = Integer.MAX_VALUE;
- SliceReplica srToChange = null;
- for (String slice : shardsNeedingHosts) {
- for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingReplicas.entrySet()) {
- // A little tricky. If nothing better is found below, all possible places to put this property are
- // full, so just put it somewhere. Remember that replica's node too, so the assignment is counted
- // against the right node.
- if (srToChange == null && ent.getValue().size() > 0) {
- srToChange = ent.getValue().get(0);
- nodeName = ent.getKey();
- }
- for (SliceReplica sr : ent.getValue()) {
- if (StringUtils.equals(slice, sr.slice.getName()) == false) {
- continue;
- }
- int hostedOnNode = nodesHostingProp.computeIfAbsent(ent.getKey(), k -> new ArrayList<>()).size();
- int candidatesOnNode = ent.getValue().size();
- if (minSize > candidatesOnNode && hostedOnNode < tmpMaxPropPerNode) {
- minSize = candidatesOnNode;
- srToChange = sr;
- nodeName = ent.getKey();
- }
- }
- }
- }
- // Now, you have a slice and node to put it on
- shardsNeedingHosts.remove(srToChange.slice.getName());
- List<SliceReplica> hosted = nodesHostingProp.computeIfAbsent(nodeName, k -> new ArrayList<>());
- hosted.add(srToChange);
- adjustLimits(hosted);
- removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName());
- addProp(srToChange.slice, srToChange.replica.getName());
- }
- }
-
- // Adjust the min/max counts per allowed per node. Special handling here for dealing with the fact
- // that no node should have more than 1 more replica with this property than any other.
- // Once tmpModulo nodes have reached tmpMaxPropPerNode, the limit drops by one for all remaining nodes.
- // A negative tmpModulo marks that the drop has already happened.
- private void adjustLimits(List<SliceReplica> changeList) {
- if (changeList.size() == tmpMaxPropPerNode) {
- if (tmpModulo < 0) return;
-
- --tmpModulo;
- if (tmpModulo == 0) {
- --tmpMaxPropPerNode;
- --tmpModulo; // Prevent dropping tmpMaxPropPerNode again.
- }
- }
- }
-
- // Go through the list of presently-hosted properties and remove any that have too many replicas that host the property
- private void removeOverallocatedReplicas() {
- tmpMaxPropPerNode = origMaxPropPerNode; // A bit clumsy, but don't want to duplicate code.
- tmpModulo = origModulo;
-
- for (Map.Entry<String, List<SliceReplica>> ent : nodesHostingProp.entrySet()) {
- List<SliceReplica> hosted = ent.getValue();
- while (hosted.size() > tmpMaxPropPerNode) { // remove delta nodes
- SliceReplica dropped = hosted.remove(rand.nextInt(hosted.size()));
- // Clear the property on the dropped replica. Otherwise, once its slice is re-assigned to a replica on
- // another node, the slice would have two replicas hosting the property.
- removeProp(dropped.slice, dropped.replica.getName());
- }
- adjustLimits(hosted);
- }
- }
-
- private void removeProp(Slice origSlice, String replicaName) {
- getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property);
- }
-
- private void addProp(Slice origSlice, String replicaName) {
- getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true");
- }
-
- // Just a place to encapsulate the fact that we need to have new slices (copy) to update before we
- // put this all in the cluster state. The first request for a slice creates its copy in changedSlices.
- private Replica getReplicaFromChanged(Slice origSlice, String replicaName) {
- Slice newSlice = changedSlices.get(origSlice.getName());
- if (newSlice == null) {
- newSlice = new Slice(origSlice.getName(), origSlice.getReplicasCopy(), origSlice.shallowCopy());
- changedSlices.put(origSlice.getName(), newSlice);
- }
- Replica replica = newSlice.getReplica(replicaName);
- if (replica == null) {
- throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Should have been able to find replica '" +
- replicaName + "' in slice '" + origSlice.getName() + "'. No action taken");
- }
- return replica;
- }
-
- /**
- * Main entry point for carrying out the action.
- *
- * @return true if properties were reassigned and the changed slices were folded into the cluster state;
- * false if the property was already balanced
- */
- boolean balanceProperty() {
- if (collectCurrentPropStats() == false) {
- return false;
- }
-
- // we have two lists based on nodeName
- // 1> all the nodes that _could_ host a property for the slice
- // 2> all the nodes that _currently_ host a property for the slice.
-
- // So, remove a replica from the nodes that have too many
- removeOverallocatedReplicas();
-
- // prune replicas belonging to a slice that have the property currently assigned from the list of replicas
- // that could host the property.
- for (List<SliceReplica> hosted : nodesHostingProp.values()) {
- for (SliceReplica srHosting : hosted) {
- removeSliceAlreadyHostedFromPossibles(srHosting.slice.getName());
- }
- }
-
- // Assemble the list of slices that do not have any replica hosting the property:
- for (List<SliceReplica> candidates : nodesHostingReplicas.values()) {
- for (SliceReplica sr : candidates) {
- shardsNeedingHosts.add(sr.slice.getName());
- }
- }
-
- // At this point, nodesHostingReplicas should contain _only_ replicas that belong to slices that do _not_
- // have any replica hosting the property. So let's assign them.
- balanceUnassignedReplicas();
- for (Slice newSlice : changedSlices.values()) {
- DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice);
- clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection);
- }
- return true;
- }
-
- // A replica together with the slice it belongs to.
- private static class SliceReplica {
- final Slice slice;
- final Replica replica;
-
- SliceReplica(Slice slice, Replica replica) {
- this.slice = slice;
- this.replica = replica;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
deleted file mode 100644
index 46f3c88..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.solr.cloud.ZkController.ContextKey;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Leader Election process. This class contains the logic by which a
- * leader is chosen. First call {@link #setup(ElectionContext)} to ensure
- * the election process is init'd. Next call
- * {@link #joinElection(ElectionContext, boolean)} to start the leader election.
- *
- * The implementation follows the classic ZooKeeper recipe of creating an
- * ephemeral, sequential node for each candidate and then looking at the set
- * of such nodes - if the created node is the lowest sequential node, the
- * candidate that created the node is the leader. If not, the candidate puts
- * a watch on the next lowest node it finds, and if that node goes down,
- * starts the whole process over by checking if it's the lowest sequential node, etc.
- *
- */
-public class LeaderElector {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- static final String ELECTION_NODE = "/election";
-
- // Election nodes are created as "<sessionId>-<context id>-n_<sequence>" (see joinElection), optionally
- // preceded by a path.
- public static final Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");
- private static final Pattern SESSION_ID = Pattern.compile(".*?/?(.*?-.*?)-n_\\d+");
- private static final Pattern NODE_NAME = Pattern.compile(".*?/?(.*?-)(.*?)-n_\\d+");
-
- protected SolrZkClient zkClient;
-
- private final ZkCmdExecutor zkCmdExecutor;
-
- private volatile ElectionContext context;
-
- // Watch on the predecessor election node. Like context, it is replaced from checkIfIamLeader (which also
- // runs from ZooKeeper watcher callbacks) and read in retryElection, hence volatile.
- private volatile ElectionWatcher watcher;
-
- private Map<ContextKey,ElectionContext> electionContexts;
- private ContextKey contextKey;
-
- public LeaderElector(SolrZkClient zkClient) {
- this(zkClient, null, null);
- }
-
- /**
- * @param key key under which a fresh context is registered in {@code electionContexts} on retry
- * @param electionContexts map updated by {@link #retryElection}; may be null
- */
- public LeaderElector(SolrZkClient zkClient, ContextKey key, Map<ContextKey,ElectionContext> electionContexts) {
- this.zkClient = zkClient;
- zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
- this.electionContexts = electionContexts;
- this.contextKey = key;
- }
-
- public ElectionContext getContext() {
- return context;
- }
-
- /**
- * Check if the candidate with the given n_* sequence number is the leader.
- * If it is, run the leader process. If it is not, start
- * watching the candidate that is in line before this one - if it goes down, check
- * if this candidate is the leader again.
- *
- * @param context election context whose {@code leaderSeqPath} names our election node
- * @param replacement has someone else been the leader already?
- */
- private void checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
- InterruptedException, IOException {
- context.checkIfIamLeaderFired();
- // get all other numbers...
- final String holdElectionPath = context.electionPath + ELECTION_NODE;
- List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
- sortSeqs(seqs);
-
- String leaderSeqNodeName = context.leaderSeqPath.substring(context.leaderSeqPath.lastIndexOf('/') + 1);
- if (!seqs.contains(leaderSeqNodeName)) {
- log.warn("Our node is no longer in line to be leader");
- return;
- }
-
- // If any double-registrations exist for me, remove all but this latest one!
- // TODO: can we even get into this state?
- String prefix = zkClient.getSolrZooKeeper().getSessionId() + "-" + context.id + "-";
- Iterator<String> it = seqs.iterator();
- while (it.hasNext()) {
- String elec = it.next();
- if (!elec.equals(leaderSeqNodeName) && elec.startsWith(prefix)) {
- try {
- String toDelete = holdElectionPath + "/" + elec;
- log.warn("Deleting duplicate registration: {}", toDelete);
- zkClient.delete(toDelete, -1, true);
- } catch (KeeperException.NoNodeException e) {
- // ignore: already gone
- }
- it.remove();
- }
- }
-
- if (leaderSeqNodeName.equals(seqs.get(0))) {
- // I am the leader
- try {
- runIamLeaderProcess(context, replacement);
- } catch (KeeperException.NodeExistsException e) {
- log.error("node exists",e);
- retryElection(context, false);
- return;
- }
- } else {
- // I am not the leader - watch the node directly ahead of me in the sorted list
- String toWatch = seqs.get(0);
- for (String node : seqs) {
- if (leaderSeqNodeName.equals(node)) {
- break;
- }
- toWatch = node;
- }
- try {
- String watchedNode = holdElectionPath + "/" + toWatch;
- zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, getSeq(context.leaderSeqPath), context), null, true);
- log.debug("Watching path {} to know if I could be the leader", watchedNode);
- } catch (KeeperException.SessionExpiredException e) {
- throw e;
- } catch (KeeperException.NoNodeException e) {
- // the previous node disappeared, check if we are the leader again
- checkIfIamLeader(context, true);
- } catch (KeeperException e) {
- // we couldn't set our watch for some other reason, retry
- log.warn("Failed setting watch", e);
- checkIfIamLeader(context, true);
- }
- }
- }
-
- // TODO: get this core param out of here
- protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
- InterruptedException, IOException {
- context.runLeaderProcess(weAreReplacement,0);
- }
-
- /**
- * Returns int given String of form n_0000000001 or n_0000000003, etc.
- *
- * @return sequence number
- * @throws IllegalStateException if the string does not match {@link #LEADER_SEQ}
- */
- public static int getSeq(String nStringSequence) {
- Matcher m = LEADER_SEQ.matcher(nStringSequence);
- if (!m.matches()) {
- throw new IllegalStateException("Could not find regex match in:"
- + nStringSequence);
- }
- return Integer.parseInt(m.group(1));
- }
-
- // Extracts "<sessionId>-<context id>" from an election node name.
- private String getNodeId(String nStringSequence) {
- Matcher m = SESSION_ID.matcher(nStringSequence);
- if (!m.matches()) {
- throw new IllegalStateException("Could not find regex match in:"
- + nStringSequence);
- }
- return m.group(1);
- }
-
- /**
- * Returns the part of an election node name between the first '-' and "-n_".
- *
- * @throws IllegalStateException if the string does not match the expected form
- */
- public static String getNodeName(String nStringSequence){
- Matcher m = NODE_NAME.matcher(nStringSequence);
- if (!m.matches()) {
- throw new IllegalStateException("Could not find regex match in:"
- + nStringSequence);
- }
- return m.group(2);
- }
-
- public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
- return joinElection(context,replacement, false);
- }
-
- /**
- * Begin participating in the election process. Gets a new sequential number
- * and begins watching the node with the sequence number before it, unless it
- * is the lowest number, in which case, initiates the leader process. If the
- * node that is watched goes down, check if we are the new lowest node, else
- * watch the next lowest numbered node.
- *
- * @param joinAtHead if true and at least two candidates exist, reuse the sequence number of the second
- * node in line so this candidate is placed next to the current leader
- * @return sequential node number
- */
- public int joinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
- context.joinedElectionFired();
-
- final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
-
- long sessionId = zkClient.getSolrZooKeeper().getSessionId();
- String id = sessionId + "-" + context.id;
- String leaderSeqPath = null;
- boolean cont = true;
- int tries = 0;
- while (cont) {
- try {
- if(joinAtHead){
- log.debug("Node {} trying to join election at the head", id);
- List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
- if(nodes.size() <2){
- leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
- CreateMode.EPHEMERAL_SEQUENTIAL, false);
- } else {
- String firstInLine = nodes.get(1);
- log.debug("The current head: {}", firstInLine);
- Matcher m = LEADER_SEQ.matcher(firstInLine);
- if (!m.matches()) {
- throw new IllegalStateException("Could not find regex match in:"
- + firstInLine);
- }
- leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1);
- zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
- }
- } else {
- leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
- CreateMode.EPHEMERAL_SEQUENTIAL, false);
- }
-
- log.debug("Joined leadership election with path: {}", leaderSeqPath);
- context.leaderSeqPath = leaderSeqPath;
- cont = false;
- } catch (ConnectionLossException e) {
- // we don't know if we made our node or not...
- List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);
-
- boolean foundId = false;
- for (String entry : entries) {
- String nodeId = getNodeId(entry);
- if (id.equals(nodeId)) {
- // we did create our node...
- foundId = true;
- break;
- }
- }
- // NOTE(review): when our node is found, 'cont' stays true and the loop creates another node.
- // checkIfIamLeader() later deletes such duplicate registrations for this session and context id.
- if (!foundId) {
- cont = true;
- if (tries++ > 20) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- try {
- Thread.sleep(50);
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- }
- }
-
- } catch (KeeperException.NoNodeException e) {
- // we must have failed in creating the election node - someone else must
- // be working on it, lets try again
- if (tries++ > 20) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- cont = true;
- try {
- Thread.sleep(50);
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- }
- }
- }
- checkIfIamLeader(context, replacement);
-
- return getSeq(context.leaderSeqPath);
- }
-
- /**
- * Watches the election node ahead of ours. When it changes, leadership is re-checked. Once cancelled,
- * the next event instead deletes our own election node.
- */
- private class ElectionWatcher implements Watcher {
- final String myNode,watchedNode;
- final ElectionContext context;
-
- // Set by cancel() and read in process(), which ZooKeeper invokes from its event thread.
- private volatile boolean canceled = false;
-
- // seq is currently unused.
- private ElectionWatcher(String myNode, String watchedNode, int seq, ElectionContext context) {
- this.myNode = myNode;
- this.watchedNode = watchedNode;
- this.context = context;
- }
-
- void cancel() {
- canceled = true;
- }
-
- @Override
- public void process(WatchedEvent event) {
- // session events are not change events, and do not remove the watcher
- if (EventType.None.equals(event.getType())) {
- return;
- }
- if (canceled) {
- log.debug("This watcher is not active anymore {}", myNode);
- try {
- zkClient.delete(myNode, -1, true);
- } catch (KeeperException.NoNodeException nne) {
- // expected . don't do anything
- } catch (Exception e) {
- log.warn("My watched node still exists and can't remove {}", myNode, e);
- }
- return;
- }
- try {
- // am I the next leader?
- checkIfIamLeader(context, true);
- } catch (Exception e) {
- if (!zkClient.isClosed()) {
- log.warn("", e);
- }
- }
- }
- }
-
- /**
- * Set up any ZooKeeper nodes needed for leader election.
- */
- public void setup(final ElectionContext context) throws InterruptedException,
- KeeperException {
- String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;
- if (context instanceof OverseerElectionContext) {
- zkCmdExecutor.ensureExists(electZKPath, zkClient);
- } else {
- // we use 2 param so that replica won't create /collection/{collection} if it doesn't exist
- zkCmdExecutor.ensureExists(electZKPath, (byte[])null, CreateMode.PERSISTENT, zkClient, 2);
- }
-
- this.context = context;
- }
-
- /**
- * Sort n string sequence list by sequence number, breaking ties by the full node name.
- */
- public static void sortSeqs(List<String> seqs) {
- Collections.sort(seqs, (o1, o2) -> {
- int bySeq = Integer.compare(getSeq(o1), getSeq(o2));
- return bySeq == 0 ? o1.compareTo(o2) : bySeq;
- });
- }
-
- // Cancels the current watcher and election, closes the current context, and re-joins with a copy of it.
- void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
- ElectionWatcher watcher = this.watcher;
- ElectionContext ctx = context.copy();
- if (electionContexts != null) {
- electionContexts.put(contextKey, ctx);
- }
- if (watcher != null) watcher.cancel();
- this.context.cancelElection();
- this.context.close();
- this.context = ctx;
- joinElection(ctx, true, joinAtHead);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/LockTree.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LockTree.java b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
deleted file mode 100644
index af0d30e..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/LockTree.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.solr.cloud.OverseerMessageHandler.Lock;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CollectionParams.LockLevel;
-import org.apache.solr.common.util.StrUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a utility class that offers fine grained locking for various Collection Operations.
- * It is designed for single threaded operation. Multiple threads may use it safely, because
- * every operation synchronizes on the {@code LockTree} instance and so only one thread can
- * perform any operation at a time.
- * <p>
- * Locks are held in a tree that starts at a {@link LockLevel#CLUSTER} root. Each child is one
- * {@link LockLevel} deeper than its parent and is keyed by the corresponding element of the lock
- * path. A lock at a given level can be granted only when no lock is held on that node, on any of
- * its ancestors along the path, or on any of its descendants.
- */
-public class LockTree {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /** Lock handed out for actions whose lock level is {@link LockLevel#NONE}; unlocking it does nothing. */
-  static final Lock FREELOCK = () -> {};
-
-  private final Node root = new Node(null, LockLevel.CLUSTER, null);
-
-  /** Forcibly releases every lock in the tree; each lock still held is logged as leaked. */
-  public void clear() {
-    synchronized (this) {
-      root.clear();
-    }
-  }
-
-  /** Handle for a lock held on a single {@link Node}; {@link #unlock()} releases that node. */
-  private class LockImpl implements Lock {
-    final Node node;
-
-    LockImpl(Node node) {
-      this.node = node;
-    }
-
-    @Override
-    public void unlock() {
-      synchronized (LockTree.this) {
-        node.unlock(this);
-      }
-    }
-
-    /** Returns the names on the path from the root to the locked node, joined with '/'. */
-    @Override
-    public String toString() {
-      return StrUtils.join(node.constructPath(new LinkedList<>()), '/');
-    }
-  }
-
-  /**
-   * A lock-acquisition session.
-   * <p>
-   * Besides delegating to the shared tree, a session remembers every path for which acquisition
-   * failed. It then refuses later requests that {@link SessionNode#isBusy} reports as conflicting
-   * with one of those paths, presumably so that operations queued behind a blocked one do not
-   * overtake it.
-   */
-  public class Session {
-    private final SessionNode root = new SessionNode(LockLevel.CLUSTER);
-
-    /**
-     * Tries to acquire a lock for {@code action} on {@code path}.
-     *
-     * @return {@link LockTree#FREELOCK} if the action's lock level is {@code NONE}; the acquired
-     *     lock on success; or {@code null} when this session already considers the path busy or
-     *     the shared tree refuses the lock. In the latter case the path is marked busy for the
-     *     rest of the session.
-     */
-    public Lock lock(CollectionParams.CollectionAction action, List<String> path) {
-      // NONE-level actions never touch shared state, so they need not take the monitor.
-      if (action.lockLevel == LockLevel.NONE) return FREELOCK;
-      synchronized (LockTree.this) {
-        if (root.isBusy(action.lockLevel, path)) return null;
-        Lock lockObject = LockTree.this.root.lock(action.lockLevel, path);
-        if (lockObject == null) root.markBusy(path, 0);
-        return lockObject;
-      }
-    }
-  }
-
-  /** Node of a {@link Session}'s busy-path tree; each child is one {@link LockLevel} deeper. */
-  private static class SessionNode {
-    final LockLevel level;
-    Map<String, SessionNode> kids; // created lazily, on the first markBusy that passes through
-    boolean busy = false;
-
-    SessionNode(LockLevel level) {
-      this.level = level;
-    }
-
-    /**
-     * Marks as busy the node reached by following {@code path} from index {@code depth}. Nodes
-     * on the way are created as needed. The node marked is the one at the end of the full path.
-     */
-    void markBusy(List<String> path, int depth) {
-      if (path.size() == depth) {
-        busy = true;
-      } else {
-        if (kids == null) kids = new HashMap<>();
-        kids.computeIfAbsent(path.get(depth), k -> new SessionNode(level.getChild()))
-            .markBusy(path, depth + 1);
-      }
-    }
-
-    /**
-     * Walks down {@code path} from this node while {@code lockLevel.isHigherOrEqual(level)}
-     * holds. Returns true as soon as a busy node is reached, and false otherwise.
-     */
-    boolean isBusy(LockLevel lockLevel, List<String> path) {
-      if (!lockLevel.isHigherOrEqual(level)) return false;
-      if (busy) return true;
-      String s = path.get(level.level);
-      SessionNode kid = kids == null ? null : kids.get(s);
-      return kid != null && kid.isBusy(lockLevel, path);
-    }
-  }
-
-  /** Starts a new lock-acquisition session backed by this tree. */
-  public Session getSession() {
-    return new Session();
-  }
-
-  /**
-   * A node of the shared lock tree. Nodes are created on demand while walking a lock path and
-   * are never removed; {@link #clear()} only drops the locks they hold.
-   */
-  private class Node {
-    final String name;
-    final Node mom;
-    final LockLevel level;
-    final Map<String, Node> children = new HashMap<>();
-    LockImpl myLock;
-
-    Node(String name, LockLevel level, Node mom) {
-      this.name = name;
-      this.level = level;
-      this.mom = mom;
-    }
-
-    /** Returns whether this node or any of its descendants currently holds a lock. */
-    boolean isLocked() {
-      if (myLock != null) return true;
-      for (Node node : children.values()) if (node.isLocked()) return true;
-      return false;
-    }
-
-    /** Releases {@code lockObject} if it is this node's current lock; otherwise only logs. */
-    void unlock(LockImpl lockObject) {
-      if (myLock == lockObject) {
-        myLock = null;
-      } else {
-        // Passing the object (not toString()) keeps the message formatting lazy.
-        log.info("Unlocked multiple times : {}", lockObject);
-      }
-    }
-
-    /**
-     * Tries to lock the node at {@code lockLevel} along {@code path}, creating intermediate nodes
-     * as needed.
-     *
-     * @return the new lock, or {@code null} if this node, an ancestor along the path, or a
-     *     descendant of the target node is already locked
-     */
-    Lock lock(LockLevel lockLevel, List<String> path) {
-      if (myLock != null) return null; // I'm already locked; no need to go any further
-      if (lockLevel == level) {
-        // The lock belongs at this level. It can only be granted if neither this node nor any
-        // of its descendants holds a lock.
-        if (isLocked()) return null;
-        return myLock = new LockImpl(this);
-      }
-      Node child = children.computeIfAbsent(path.get(level.level),
-          childName -> new Node(childName, LockLevel.getLevel(level.level + 1), this));
-      return child.lock(lockLevel, path);
-    }
-
-    /** Prepends the names from the root down to this node (the unnamed root excluded) to {@code collect}. */
-    LinkedList<String> constructPath(LinkedList<String> collect) {
-      if (name != null) collect.addFirst(name);
-      if (mom != null) mom.constructPath(collect);
-      return collect;
-    }
-
-    /** Drops this node's lock and all descendants' locks, logging each one found as leaked. */
-    void clear() {
-      if (myLock != null) {
-        log.warn("lock_is_leaked at {}", constructPath(new LinkedList<>()));
-        myLock = null;
-      }
-      for (Node node : children.values()) node.clear();
-    }
-  }
-}