You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2016/04/27 19:11:09 UTC
[1/4] lucene-solr:jira/solr-9045: * Add DefaultRecoveryStrategy.java
as clone of RecoveryStrategy.java (prep for factoring out an abstract
RecoveryStrategy base class).
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-9045 [created] e3d5a1925
* Add DefaultRecoveryStrategy.java as clone of RecoveryStrategy.java (prep for factoring out an abstract RecoveryStrategy base class).
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/85ed896b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/85ed896b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/85ed896b
Branch: refs/heads/jira/solr-9045
Commit: 85ed896b095dcf8576d1b93827d809385e6c779d
Parents: ebd1204
Author: Christine Poerschke <cp...@apache.org>
Authored: Wed Apr 27 16:30:00 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 17:18:05 2016 +0100
----------------------------------------------------------------------
.../solr/cloud/DefaultRecoveryStrategy.java | 597 +++++++++++++++++++
1 file changed, 597 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/85ed896b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
new file mode 100644
index 0000000..b1a8040
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory.DirContext;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.PeerSync;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.util.RefCounted;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultRecoveryStrategy extends Thread implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
+ private static final int MAX_RETRIES = 500;
+ private static final int STARTING_RECOVERY_DELAY = 5000;
+
+ public static interface RecoveryListener {
+ public void recovered();
+ public void failed();
+ }
+
+ private volatile boolean close = false;
+
+ private RecoveryListener recoveryListener;
+ private ZkController zkController;
+ private String baseUrl;
+ private String coreZkNodeName;
+ private ZkStateReader zkStateReader;
+ private volatile String coreName;
+ private int retries;
+ private boolean recoveringAfterStartup;
+ private CoreContainer cc;
+ private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
+
+ // this should only be used from SolrCoreState
+ public DefaultRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
+ this.cc = cc;
+ this.coreName = cd.getName();
+ this.recoveryListener = recoveryListener;
+ setName("RecoveryThread-"+this.coreName);
+ zkController = cc.getZkController();
+ zkStateReader = zkController.getZkStateReader();
+ baseUrl = zkController.getBaseUrl();
+ coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
+ }
+
+ public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
+ this.recoveringAfterStartup = recoveringAfterStartup;
+ }
+
+ // make sure any threads stop retrying
+ @Override
+ public void close() {
+ close = true;
+ if (prevSendPreRecoveryHttpUriRequest != null) {
+ prevSendPreRecoveryHttpUriRequest.abort();
+ }
+ LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
+ }
+
+ private void recoveryFailed(final SolrCore core,
+ final ZkController zkController, final String baseUrl,
+ final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
+ SolrException.log(LOG, "Recovery failed - I give up.");
+ try {
+ zkController.publish(cd, Replica.State.RECOVERY_FAILED);
+ } finally {
+ close();
+ recoveryListener.failed();
+ }
+ }
+
+ private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
+ throws SolrServerException, IOException {
+
+ ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
+ String leaderUrl = leaderCNodeProps.getCoreUrl();
+
+ LOG.info("Attempting to replicate from [{}].", leaderUrl);
+
+ // send commit
+ commitOnLeader(leaderUrl);
+
+ // use rep handler directly, so we can do this sync rather than async
+ SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+ ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+
+ if (replicationHandler == null) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
+
+ if (isClosed()) return; // we check closed on return
+ boolean success = replicationHandler.doFetch(solrParams, false);
+
+ if (!success) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
+ }
+
+ // solrcloud_debug
+ if (LOG.isDebugEnabled()) {
+ try {
+ RefCounted<SolrIndexSearcher> searchHolder = core
+ .getNewestSearcher(false);
+ SolrIndexSearcher searcher = searchHolder.get();
+ Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
+ try {
+ LOG.debug(core.getCoreDescriptor().getCoreContainer()
+ .getZkController().getNodeName()
+ + " replicated "
+ + searcher.search(new MatchAllDocsQuery(), 1).totalHits
+ + " from "
+ + leaderUrl
+ + " gen:"
+ + core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration()
+ + " data:" + core.getDataDir()
+ + " index:" + core.getIndexDir()
+ + " newIndex:" + core.getNewIndexDir()
+ + " files:" + Arrays.asList(dir.listAll()));
+ } finally {
+ core.getDirectoryFactory().release(dir);
+ searchHolder.decref();
+ }
+ } catch (Exception e) {
+ LOG.debug("Error in solrcloud_debug block", e);
+ }
+ }
+
+ }
+
+ private void commitOnLeader(String leaderUrl) throws SolrServerException,
+ IOException {
+ try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
+ client.setConnectionTimeout(30000);
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(new ModifiableSolrParams());
+ ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+ ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+ ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
+ client);
+ }
+ }
+
+ @Override
+ public void run() {
+
+ // set request info for logging
+ try (SolrCore core = cc.getCore(coreName)) {
+
+ if (core == null) {
+ SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName);
+ return;
+ }
+ MDCLoggingContext.setCore(core);
+
+ LOG.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
+
+ try {
+ doRecovery(core);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ SolrException.log(LOG, "", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (Exception e) {
+ LOG.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ } finally {
+ MDCLoggingContext.clear();
+ }
+ }
+
+ // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
+ public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+ boolean replayed = false;
+ boolean successfulRecovery = false;
+
+ UpdateLog ulog;
+ ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog == null) {
+ SolrException.log(LOG, "No UpdateLog found - cannot recover.");
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+ core.getCoreDescriptor());
+ return;
+ }
+
+ boolean firstTime = true;
+
+ List<Long> recentVersions;
+ try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
+ recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
+ } catch (Exception e) {
+ SolrException.log(LOG, "Corrupt tlog - ignoring.", e);
+ recentVersions = new ArrayList<>(0);
+ }
+
+ List<Long> startingVersions = ulog.getStartingVersions();
+
+ if (startingVersions != null && recoveringAfterStartup) {
+ try {
+ int oldIdx = 0; // index of the start of the old list in the current list
+ long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
+
+ for (; oldIdx < recentVersions.size(); oldIdx++) {
+ if (recentVersions.get(oldIdx) == firstStartingVersion) break;
+ }
+
+ if (oldIdx > 0) {
+ LOG.info("####### Found new versions added after startup: num=[{}]", oldIdx);
+ LOG.info("###### currentVersions=[{}]",recentVersions);
+ }
+
+ LOG.info("###### startupVersions=[{}]", startingVersions);
+ } catch (Exception e) {
+ SolrException.log(LOG, "Error getting recent versions.", e);
+ recentVersions = new ArrayList<>(0);
+ }
+ }
+
+ if (recoveringAfterStartup) {
+ // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
+ // when we went down. We may have received updates since then.
+ recentVersions = startingVersions;
+ try {
+ if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
+ // last operation at the time of startup had the GAP flag set...
+ // this means we were previously doing a full index replication
+ // that probably didn't complete and buffering updates in the
+ // meantime.
+ LOG.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
+ firstTime = false; // skip peersync
+ }
+ } catch (Exception e) {
+ SolrException.log(LOG, "Error trying to get ulog starting operation.", e);
+ firstTime = false; // skip peersync
+ }
+ }
+
+ Future<RecoveryInfo> replayFuture = null;
+ while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+ try {
+ CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+ ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
+ cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+ final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+ final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
+
+ String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+
+ boolean isLeader = leaderUrl.equals(ourUrl);
+ if (isLeader && !cloudDesc.isLeader()) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+ }
+ if (cloudDesc.isLeader()) {
+ // we are now the leader - no one else must have been suitable
+ LOG.warn("We have not yet recovered - but we are now the leader!");
+ LOG.info("Finished recovery process.");
+ zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ return;
+ }
+
+ LOG.info("Begin buffering updates. core=[{}]", coreName);
+ ulog.bufferUpdates();
+ replayed = false;
+
+ LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+ ourUrl);
+ zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+
+
+ final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(),
+ cloudDesc.getShardId());
+
+ try {
+ prevSendPreRecoveryHttpUriRequest.abort();
+ } catch (NullPointerException e) {
+ // okay
+ }
+
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
+ sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
+ // we wait a bit so that any updates on the leader
+ // that started before they saw recovering state
+ // are sure to have finished (see SOLR-7141 for
+ // discussion around current value)
+ try {
+ Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ // first thing we just try to sync
+ if (firstTime) {
+ firstTime = false; // only try sync the first time through the loop
+ LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+ // System.out.println("Attempting to PeerSync from " + leaderUrl
+ // + " i am:" + zkController.getNodeName());
+ PeerSync peerSync = new PeerSync(core,
+ Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+ peerSync.setStartingVersions(recentVersions);
+ boolean syncSuccess = peerSync.sync();
+ if (syncSuccess) {
+ SolrQueryRequest req = new LocalSolrQueryRequest(core,
+ new ModifiableSolrParams());
+ // force open a new searcher
+ core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+ LOG.info("PeerSync stage of recovery was successful.");
+
+ // solrcloud_debug
+ cloudDebugLog(core, "synced");
+
+ LOG.info("Replaying updates buffered during PeerSync.");
+ replay(core);
+ replayed = true;
+
+ // sync success
+ successfulRecovery = true;
+ return;
+ }
+
+ LOG.info("PeerSync Recovery was not successful - trying replication.");
+ }
+
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
+ LOG.info("Starting Replication Recovery.");
+
+ try {
+
+ replicate(zkController.getNodeName(), core, leaderprops);
+
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
+ replayFuture = replay(core);
+ replayed = true;
+
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
+ LOG.info("Replication Recovery was successful.");
+ successfulRecovery = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Recovery was interrupted", e);
+ close = true;
+ } catch (Exception e) {
+ SolrException.log(LOG, "Error while trying to recover", e);
+ }
+
+ } catch (Exception e) {
+ SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
+ } finally {
+ if (!replayed) {
+ // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
+ // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
+ // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
+ // reset our starting point for playback.
+ LOG.info("Replay not started, or was not successful... still buffering updates.");
+
+ /** this prev code is retained in case we want to switch strategies.
+ try {
+ ulog.dropBufferedUpdates();
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ }
+ **/
+ }
+ if (successfulRecovery) {
+ LOG.info("Registering as Active after recovery.");
+ try {
+ zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ } catch (Exception e) {
+ LOG.error("Could not publish as ACTIVE after succesful recovery", e);
+ successfulRecovery = false;
+ }
+
+ if (successfulRecovery) {
+ close = true;
+ recoveryListener.recovered();
+ }
+ }
+ }
+
+ if (!successfulRecovery) {
+ // lets pause for a moment and we need to try again...
+ // TODO: we don't want to retry for some problems?
+ // Or do a fall off retry...
+ try {
+
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
+ LOG.error("Recovery failed - trying again... (" + retries + ")");
+
+ retries++;
+ if (retries >= MAX_RETRIES) {
+ SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
+ try {
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+ } catch (Exception e) {
+ SolrException.log(LOG, "Could not publish that recovery failed", e);
+ }
+ break;
+ }
+ } catch (Exception e) {
+ SolrException.log(LOG, "An error has occurred during recovery", e);
+ }
+
+ try {
+ // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
+ // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
+ // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
+ // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
+ double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
+ LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+ for (int i = 0; i < loopCount; i++) {
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break; // check if someone closed us
+ }
+ Thread.sleep(STARTING_RECOVERY_DELAY);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Recovery was interrupted.", e);
+ close = true;
+ }
+ }
+
+ }
+
+ // if replay was skipped (possibly to due pulling a full index from the leader),
+ // then we still need to update version bucket seeds after recovery
+ if (successfulRecovery && replayFuture == null) {
+ LOG.info("Updating version bucket highest from index after successful recovery.");
+ core.seedVersionBuckets();
+ }
+
+ LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
+ }
+
+ private Future<RecoveryInfo> replay(SolrCore core)
+ throws InterruptedException, ExecutionException {
+ Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
+ if (future == null) {
+ // no replay needed\
+ LOG.info("No replay needed.");
+ } else {
+ LOG.info("Replaying buffered documents.");
+ // wait for replay
+ RecoveryInfo report = future.get();
+ if (report.failed) {
+ SolrException.log(LOG, "Replay failed");
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
+ }
+ }
+
+ // solrcloud_debug
+ cloudDebugLog(core, "replayed");
+
+ return future;
+ }
+
+ private void cloudDebugLog(SolrCore core, String op) {
+ if (!LOG.isDebugEnabled()) {
+ return;
+ }
+ try {
+ RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+ SolrIndexSearcher searcher = searchHolder.get();
+ try {
+ final int totalHits = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
+ final String nodeName = core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName();
+ LOG.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
+ } finally {
+ searchHolder.decref();
+ }
+ } catch (Exception e) {
+ LOG.debug("Error in solrcloud_debug block", e);
+ }
+ }
+
+ public boolean isClosed() {
+ return close;
+ }
+
+ private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
+ throws SolrServerException, IOException, InterruptedException, ExecutionException {
+
+ try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
+ client.setConnectionTimeout(30000);
+ WaitForState prepCmd = new WaitForState();
+ prepCmd.setCoreName(leaderCoreName);
+ prepCmd.setNodeName(zkController.getNodeName());
+ prepCmd.setCoreNodeName(coreZkNodeName);
+ prepCmd.setState(Replica.State.RECOVERING);
+ prepCmd.setCheckLive(true);
+ prepCmd.setOnlyIfLeader(true);
+ final Slice.State state = slice.getState();
+ if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY) {
+ prepCmd.setOnlyIfLeaderActive(true);
+ }
+ HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
+ prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+
+ LOG.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
+
+ mrr.future.get();
+ }
+ }
+
+}
[3/4] lucene-solr:jira/solr-9045: * Factor out getReplicateLeaderUrl
protected method in DefaultRecoveryStrategy.
Posted by cp...@apache.org.
* Factor out getReplicateLeaderUrl protected method in DefaultRecoveryStrategy.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/928763bc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/928763bc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/928763bc
Branch: refs/heads/jira/solr-9045
Commit: 928763bc5cddf3b117f02a9cb8096d2d5799df5c
Parents: 3cd0428
Author: Christine Poerschke <cp...@apache.org>
Authored: Wed Apr 20 17:38:59 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 17:18:06 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/solr/cloud/DefaultRecoveryStrategy.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/928763bc/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
index a115595..60d2931 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -124,11 +124,14 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
}
}
+ protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
+ return new ZkCoreNodeProps(leaderprops).getCoreUrl();
+ }
+
private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
throws SolrServerException, IOException {
- ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
- String leaderUrl = leaderCNodeProps.getCoreUrl();
+ final String leaderUrl = getReplicateLeaderUrl(leaderprops);
LOG.info("Attempting to replicate from [{}].", leaderUrl);
[2/4] lucene-solr:jira/solr-9045: * Turn RecoveryStrategy into
abstract base class.
Posted by cp...@apache.org.
* Turn RecoveryStrategy into abstract base class.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3cd04289
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3cd04289
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3cd04289
Branch: refs/heads/jira/solr-9045
Commit: 3cd04289be9c02f7c55c538126583fba96970fab
Parents: 85ed896
Author: Christine Poerschke <cp...@apache.org>
Authored: Wed Apr 27 16:39:50 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 17:18:06 2016 +0100
----------------------------------------------------------------------
.../solr/cloud/DefaultRecoveryStrategy.java | 13 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 571 +------------------
2 files changed, 7 insertions(+), 577 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3cd04289/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
index b1a8040..a115595 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.cloud;
-import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -65,7 +64,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DefaultRecoveryStrategy extends Thread implements Closeable {
+public class DefaultRecoveryStrategy extends RecoveryStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -73,14 +72,9 @@ public class DefaultRecoveryStrategy extends Thread implements Closeable {
private static final int MAX_RETRIES = 500;
private static final int STARTING_RECOVERY_DELAY = 5000;
- public static interface RecoveryListener {
- public void recovered();
- public void failed();
- }
-
private volatile boolean close = false;
- private RecoveryListener recoveryListener;
+ private RecoveryStrategy.RecoveryListener recoveryListener;
private ZkController zkController;
private String baseUrl;
private String coreZkNodeName;
@@ -92,7 +86,7 @@ public class DefaultRecoveryStrategy extends Thread implements Closeable {
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
// this should only be used from SolrCoreState
- public DefaultRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
+ public DefaultRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryStrategy.RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
this.recoveryListener = recoveryListener;
@@ -103,6 +97,7 @@ public class DefaultRecoveryStrategy extends Thread implements Closeable {
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
}
+ /** Sets whether this recovery follows core startup; doRecovery() then peer-syncs against the update log's starting versions. */
+ @Override
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
this.recoveringAfterStartup = recoveringAfterStartup;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3cd04289/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index abd00ae..d855a6d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -17,581 +17,16 @@
package org.apache.solr.cloud;
import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.DirectoryFactory.DirContext;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.PeerSync;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.RefCounted;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RecoveryStrategy extends Thread implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
- private static final int MAX_RETRIES = 500;
- private static final int STARTING_RECOVERY_DELAY = 5000;
+public abstract class RecoveryStrategy extends Thread implements Closeable {
public static interface RecoveryListener {
public void recovered();
public void failed();
}
-
- private volatile boolean close = false;
-
- private RecoveryListener recoveryListener;
- private ZkController zkController;
- private String baseUrl;
- private String coreZkNodeName;
- private ZkStateReader zkStateReader;
- private volatile String coreName;
- private int retries;
- private boolean recoveringAfterStartup;
- private CoreContainer cc;
- private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-
- // this should only be used from SolrCoreState
- public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
- this.cc = cc;
- this.coreName = cd.getName();
- this.recoveryListener = recoveryListener;
- setName("RecoveryThread-"+this.coreName);
- zkController = cc.getZkController();
- zkStateReader = zkController.getZkStateReader();
- baseUrl = zkController.getBaseUrl();
- coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
- }
-
- public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
- this.recoveringAfterStartup = recoveringAfterStartup;
- }
-
- // make sure any threads stop retrying
- @Override
- public void close() {
- close = true;
- if (prevSendPreRecoveryHttpUriRequest != null) {
- prevSendPreRecoveryHttpUriRequest.abort();
- }
- LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
- }
-
- private void recoveryFailed(final SolrCore core,
- final ZkController zkController, final String baseUrl,
- final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
- SolrException.log(LOG, "Recovery failed - I give up.");
- try {
- zkController.publish(cd, Replica.State.RECOVERY_FAILED);
- } finally {
- close();
- recoveryListener.failed();
- }
- }
-
- private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
- throws SolrServerException, IOException {
-
- ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
- String leaderUrl = leaderCNodeProps.getCoreUrl();
-
- LOG.info("Attempting to replicate from [{}].", leaderUrl);
-
- // send commit
- commitOnLeader(leaderUrl);
-
- // use rep handler directly, so we can do this sync rather than async
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
- if (replicationHandler == null) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
- }
-
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
-
- if (isClosed()) return; // we check closed on return
- boolean success = replicationHandler.doFetch(solrParams, false);
-
- if (!success) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
- }
-
- // solrcloud_debug
- if (LOG.isDebugEnabled()) {
- try {
- RefCounted<SolrIndexSearcher> searchHolder = core
- .getNewestSearcher(false);
- SolrIndexSearcher searcher = searchHolder.get();
- Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
- try {
- LOG.debug(core.getCoreDescriptor().getCoreContainer()
- .getZkController().getNodeName()
- + " replicated "
- + searcher.search(new MatchAllDocsQuery(), 1).totalHits
- + " from "
- + leaderUrl
- + " gen:"
- + core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration()
- + " data:" + core.getDataDir()
- + " index:" + core.getIndexDir()
- + " newIndex:" + core.getNewIndexDir()
- + " files:" + Arrays.asList(dir.listAll()));
- } finally {
- core.getDirectoryFactory().release(dir);
- searchHolder.decref();
- }
- } catch (Exception e) {
- LOG.debug("Error in solrcloud_debug block", e);
- }
- }
-
- }
- private void commitOnLeader(String leaderUrl) throws SolrServerException,
- IOException {
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
- client.setConnectionTimeout(30000);
- UpdateRequest ureq = new UpdateRequest();
- ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
- ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
- client);
- }
- }
+ public abstract void setRecoveringAfterStartup(boolean recoveringAfterStartup);
@Override
- public void run() {
-
- // set request info for logging
- try (SolrCore core = cc.getCore(coreName)) {
-
- if (core == null) {
- SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName);
- return;
- }
- MDCLoggingContext.setCore(core);
-
- LOG.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
-
- try {
- doRecovery(core);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- SolrException.log(LOG, "", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (Exception e) {
- LOG.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- } finally {
- MDCLoggingContext.clear();
- }
- }
-
- // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
- public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
- boolean replayed = false;
- boolean successfulRecovery = false;
-
- UpdateLog ulog;
- ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog == null) {
- SolrException.log(LOG, "No UpdateLog found - cannot recover.");
- recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
- core.getCoreDescriptor());
- return;
- }
-
- boolean firstTime = true;
-
- List<Long> recentVersions;
- try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
- recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
- } catch (Exception e) {
- SolrException.log(LOG, "Corrupt tlog - ignoring.", e);
- recentVersions = new ArrayList<>(0);
- }
-
- List<Long> startingVersions = ulog.getStartingVersions();
-
- if (startingVersions != null && recoveringAfterStartup) {
- try {
- int oldIdx = 0; // index of the start of the old list in the current list
- long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-
- for (; oldIdx < recentVersions.size(); oldIdx++) {
- if (recentVersions.get(oldIdx) == firstStartingVersion) break;
- }
-
- if (oldIdx > 0) {
- LOG.info("####### Found new versions added after startup: num=[{}]", oldIdx);
- LOG.info("###### currentVersions=[{}]",recentVersions);
- }
-
- LOG.info("###### startupVersions=[{}]", startingVersions);
- } catch (Exception e) {
- SolrException.log(LOG, "Error getting recent versions.", e);
- recentVersions = new ArrayList<>(0);
- }
- }
-
- if (recoveringAfterStartup) {
- // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
- // when we went down. We may have received updates since then.
- recentVersions = startingVersions;
- try {
- if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
- // last operation at the time of startup had the GAP flag set...
- // this means we were previously doing a full index replication
- // that probably didn't complete and buffering updates in the
- // meantime.
- LOG.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
- firstTime = false; // skip peersync
- }
- } catch (Exception e) {
- SolrException.log(LOG, "Error trying to get ulog starting operation.", e);
- firstTime = false; // skip peersync
- }
- }
-
- Future<RecoveryInfo> replayFuture = null;
- while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
- try {
- CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
- ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
- cloudDesc.getCollectionName(), cloudDesc.getShardId());
-
- final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
- final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
- String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
- String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-
- boolean isLeader = leaderUrl.equals(ourUrl);
- if (isLeader && !cloudDesc.isLeader()) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
- }
- if (cloudDesc.isLeader()) {
- // we are now the leader - no one else must have been suitable
- LOG.warn("We have not yet recovered - but we are now the leader!");
- LOG.info("Finished recovery process.");
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
- return;
- }
-
- LOG.info("Begin buffering updates. core=[{}]", coreName);
- ulog.bufferUpdates();
- replayed = false;
-
- LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
- ourUrl);
- zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-
-
- final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(),
- cloudDesc.getShardId());
-
- try {
- prevSendPreRecoveryHttpUriRequest.abort();
- } catch (NullPointerException e) {
- // okay
- }
-
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break;
- }
-
- sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
-
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break;
- }
-
- // we wait a bit so that any updates on the leader
- // that started before they saw recovering state
- // are sure to have finished (see SOLR-7141 for
- // discussion around current value)
- try {
- Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- // first thing we just try to sync
- if (firstTime) {
- firstTime = false; // only try sync the first time through the loop
- LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
- // System.out.println("Attempting to PeerSync from " + leaderUrl
- // + " i am:" + zkController.getNodeName());
- PeerSync peerSync = new PeerSync(core,
- Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
- peerSync.setStartingVersions(recentVersions);
- boolean syncSuccess = peerSync.sync();
- if (syncSuccess) {
- SolrQueryRequest req = new LocalSolrQueryRequest(core,
- new ModifiableSolrParams());
- // force open a new searcher
- core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
- LOG.info("PeerSync stage of recovery was successful.");
-
- // solrcloud_debug
- cloudDebugLog(core, "synced");
-
- LOG.info("Replaying updates buffered during PeerSync.");
- replay(core);
- replayed = true;
-
- // sync success
- successfulRecovery = true;
- return;
- }
-
- LOG.info("PeerSync Recovery was not successful - trying replication.");
- }
-
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break;
- }
-
- LOG.info("Starting Replication Recovery.");
-
- try {
-
- replicate(zkController.getNodeName(), core, leaderprops);
-
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break;
- }
-
- replayFuture = replay(core);
- replayed = true;
-
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break;
- }
-
- LOG.info("Replication Recovery was successful.");
- successfulRecovery = true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Recovery was interrupted", e);
- close = true;
- } catch (Exception e) {
- SolrException.log(LOG, "Error while trying to recover", e);
- }
-
- } catch (Exception e) {
- SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
- } finally {
- if (!replayed) {
- // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
- // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
- // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
- // reset our starting point for playback.
- LOG.info("Replay not started, or was not successful... still buffering updates.");
-
- /** this prev code is retained in case we want to switch strategies.
- try {
- ulog.dropBufferedUpdates();
- } catch (Exception e) {
- SolrException.log(log, "", e);
- }
- **/
- }
- if (successfulRecovery) {
- LOG.info("Registering as Active after recovery.");
- try {
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
- } catch (Exception e) {
- LOG.error("Could not publish as ACTIVE after succesful recovery", e);
- successfulRecovery = false;
- }
-
- if (successfulRecovery) {
- close = true;
- recoveryListener.recovered();
- }
- }
- }
-
- if (!successfulRecovery) {
- // lets pause for a moment and we need to try again...
- // TODO: we don't want to retry for some problems?
- // Or do a fall off retry...
- try {
-
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break;
- }
-
- LOG.error("Recovery failed - trying again... (" + retries + ")");
-
- retries++;
- if (retries >= MAX_RETRIES) {
- SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
- try {
- recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
- } catch (Exception e) {
- SolrException.log(LOG, "Could not publish that recovery failed", e);
- }
- break;
- }
- } catch (Exception e) {
- SolrException.log(LOG, "An error has occurred during recovery", e);
- }
-
- try {
- // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
- // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
- // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
- // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
- double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
- LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
- for (int i = 0; i < loopCount; i++) {
- if (isClosed()) {
- LOG.info("RecoveryStrategy has been closed");
- break; // check if someone closed us
- }
- Thread.sleep(STARTING_RECOVERY_DELAY);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Recovery was interrupted.", e);
- close = true;
- }
- }
-
- }
-
- // if replay was skipped (possibly to due pulling a full index from the leader),
- // then we still need to update version bucket seeds after recovery
- if (successfulRecovery && replayFuture == null) {
- LOG.info("Updating version bucket highest from index after successful recovery.");
- core.seedVersionBuckets();
- }
-
- LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
- }
-
- private Future<RecoveryInfo> replay(SolrCore core)
- throws InterruptedException, ExecutionException {
- Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
- if (future == null) {
- // no replay needed\
- LOG.info("No replay needed.");
- } else {
- LOG.info("Replaying buffered documents.");
- // wait for replay
- RecoveryInfo report = future.get();
- if (report.failed) {
- SolrException.log(LOG, "Replay failed");
- throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
- }
- }
-
- // solrcloud_debug
- cloudDebugLog(core, "replayed");
-
- return future;
- }
-
- private void cloudDebugLog(SolrCore core, String op) {
- if (!LOG.isDebugEnabled()) {
- return;
- }
- try {
- RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
- SolrIndexSearcher searcher = searchHolder.get();
- try {
- final int totalHits = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
- final String nodeName = core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName();
- LOG.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
- } finally {
- searchHolder.decref();
- }
- } catch (Exception e) {
- LOG.debug("Error in solrcloud_debug block", e);
- }
- }
-
- public boolean isClosed() {
- return close;
- }
-
- private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
- throws SolrServerException, IOException, InterruptedException, ExecutionException {
-
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
- client.setConnectionTimeout(30000);
- WaitForState prepCmd = new WaitForState();
- prepCmd.setCoreName(leaderCoreName);
- prepCmd.setNodeName(zkController.getNodeName());
- prepCmd.setCoreNodeName(coreZkNodeName);
- prepCmd.setState(Replica.State.RECOVERING);
- prepCmd.setCheckLive(true);
- prepCmd.setOnlyIfLeader(true);
- final Slice.State state = slice.getState();
- if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY) {
- prepCmd.setOnlyIfLeaderActive(true);
- }
- HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
- prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
-
- LOG.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
-
- mrr.future.get();
- }
- }
-
+ public abstract void close();
}
[4/4] lucene-solr:jira/solr-9045: SOLR-9045: configurable
RecoveryStrategy support
Posted by cp...@apache.org.
SOLR-9045: configurable RecoveryStrategy support
objectives:
* To allow users to change RecoveryStrategy settings such as maxRetries and startingRecoveryDelayMilliSeconds.
* To support configuration of a custom recovery strategy.
patch summary:
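* RecoveryStrategy turned into DefaultRecoveryStrategy, which extends the new abstract RecoveryStrategy base class
* DefaultRecoveryStrategy's previously hard-coded settings (waitForUpdatesWithStaleStatePauseMilliSeconds, maxRetries, startingRecoveryDelayMilliSeconds) are now exposed via getters/setters
* DefaultRecoveryStrategyFactory extends the new abstract RecoveryStrategyFactory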
* solrconfig.xml now supports an optional <recoveryStrategyFactory class="..."> element
(absence of the new element preserves existing behaviour)
* Added CustomRecoveryStrategyFactoryTest, which uses solrconfig-customrecoverystrategyfactory.xml
illustrative solrconfig.xml snippet:
<recoveryStrategyFactory class="MyCustomRecoveryStrategyFactory">
<int name="maxRetries">250</int> <!-- DefaultRecoveryStrategy's default is 500. -->
<str name="settingUsedByCustomBehaviour"></str>
</recoveryStrategyFactory>
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e3d5a192
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e3d5a192
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e3d5a192
Branch: refs/heads/jira/solr-9045
Commit: e3d5a1925bf3cd3feaff6e7694ffa05da6ce64aa
Parents: 928763b
Author: Christine Poerschke <cp...@apache.org>
Authored: Tue Apr 19 17:13:47 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 18:10:03 2016 +0100
----------------------------------------------------------------------
.../solr/cloud/DefaultRecoveryStrategy.java | 41 +++++++--
.../cloud/DefaultRecoveryStrategyFactory.java | 33 ++++++++
.../org/apache/solr/cloud/RecoveryStrategy.java | 1 +
.../solr/cloud/RecoveryStrategyFactory.java | 47 +++++++++++
.../java/org/apache/solr/core/SolrConfig.java | 2 +
.../src/java/org/apache/solr/core/SolrCore.java | 21 ++++-
.../solr/update/DefaultSolrCoreState.java | 17 +++-
.../org/apache/solr/update/SolrCoreState.java | 6 ++
...solrconfig-customrecoverystrategyfactory.xml | 28 ++++++
.../core/CustomRecoveryStrategyFactoryTest.java | 89 ++++++++++++++++++++
10 files changed, 277 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
index 60d2931..fa77d25 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -68,9 +68,9 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
- private static final int MAX_RETRIES = 500;
- private static final int STARTING_RECOVERY_DELAY = 5000;
+ private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
+ private int maxRetries = 500;
+ private int startingRecoveryDelayMilliSeconds = 5000;
private volatile boolean close = false;
@@ -97,6 +97,35 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
}
+ public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
+ return waitForUpdatesWithStaleStatePauseMilliSeconds;
+ }
+
+ public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) {
+ this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public void setMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ public int getStartingRecoveryDelayMilliSeconds() {
+ return startingRecoveryDelayMilliSeconds;
+ }
+
+ public void setStartingRecoveryDelayMilliSeconds(int startingRecoveryDelayMilliSeconds) {
+ this.startingRecoveryDelayMilliSeconds = startingRecoveryDelayMilliSeconds;
+ }
+
+ @Override
+ public boolean getRecoveringAfterStartup() {
+ return recoveringAfterStartup;
+ }
+
@Override
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
this.recoveringAfterStartup = recoveringAfterStartup;
@@ -357,7 +386,7 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
// are sure to have finished (see SOLR-7141 for
// discussion around current value)
try {
- Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
+ Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -476,7 +505,7 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
LOG.error("Recovery failed - trying again... (" + retries + ")");
retries++;
- if (retries >= MAX_RETRIES) {
+ if (retries >= maxRetries) {
SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
@@ -501,7 +530,7 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
LOG.info("RecoveryStrategy has been closed");
break; // check if someone closed us
}
- Thread.sleep(STARTING_RECOVERY_DELAY);
+ Thread.sleep(startingRecoveryDelayMilliSeconds);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java
new file mode 100644
index 0000000..af15c1c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+
+/**
+ * A factory for creating a {@link DefaultRecoveryStrategy}.
+ */
+public class DefaultRecoveryStrategyFactory extends RecoveryStrategyFactory {
+
+ @Override
+ public RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+ RecoveryStrategy.RecoveryListener recoveryListener) {
+ return new DefaultRecoveryStrategy(cc, cd, recoveryListener);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d855a6d..5b18db2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -25,6 +25,7 @@ public abstract class RecoveryStrategy extends Thread implements Closeable {
public void failed();
}
+ public abstract boolean getRecoveringAfterStartup();
public abstract void setRecoveringAfterStartup(boolean recoveringAfterStartup);
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java
new file mode 100644
index 0000000..3748255
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.util.SolrPluginUtils;
+import org.apache.solr.util.plugin.NamedListInitializedPlugin;
+
+/**
+ * A factory for creating a {@link RecoveryStrategy}.
+ */
+public abstract class RecoveryStrategyFactory implements NamedListInitializedPlugin {
+
+ private NamedList args;
+
+ @Override
+ public void init(NamedList args) {
+ this.args = args;
+ }
+
+ public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
+ RecoveryStrategy.RecoveryListener recoveryListener) {
+ final RecoveryStrategy recoveryStrategy = newRecoveryStrategy(cc, cd, recoveryListener);
+ SolrPluginUtils.invokeSetters(recoveryStrategy, args);
+ return recoveryStrategy;
+ }
+
+ public abstract RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+ RecoveryStrategy.RecoveryListener recoveryListener);
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/core/SolrConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java
index 0b4bac3..adc3e0b 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.Version;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -330,6 +331,7 @@ public class SolrConfig extends Config implements MapSerializable {
.add(new SolrPluginInfo(SolrEventListener.class, "//listener", REQUIRE_CLASS, MULTI_OK, REQUIRE_NAME_IN_OVERLAY))
.add(new SolrPluginInfo(DirectoryFactory.class, "directoryFactory", REQUIRE_CLASS))
+ .add(new SolrPluginInfo(RecoveryStrategyFactory.class, "recoveryStrategyFactory", REQUIRE_CLASS))
.add(new SolrPluginInfo(IndexDeletionPolicy.class, "indexConfig/deletionPolicy", REQUIRE_CLASS))
.add(new SolrPluginInfo(CodecFactory.class, "codecFactory", REQUIRE_CLASS))
.add(new SolrPluginInfo(IndexReaderFactory.class, "indexReaderFactory", REQUIRE_CLASS))
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index b94b3d8..462ec9c 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -52,6 +52,8 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.DefaultRecoveryStrategyFactory;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -158,6 +160,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
private final Map<String, SolrInfoMBean> infoRegistry;
private final IndexDeletionPolicyWrapper solrDelPolicy;
private final DirectoryFactory directoryFactory;
+ private final RecoveryStrategyFactory recoveryStrategyFactory;
private IndexReaderFactory indexReaderFactory;
private final Codec codec;
private final MemClassLoader memClassLoader;
@@ -493,6 +496,20 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
return dirFactory;
}
+ private RecoveryStrategyFactory initRecoveryStrategyFactory() {
+ final PluginInfo info = solrConfig.getPluginInfo(RecoveryStrategyFactory.class.getName());
+ final RecoveryStrategyFactory rsFactory;
+ if (info != null) {
+ log.info(info.className);
+ rsFactory = getResourceLoader().newInstance(info.className, RecoveryStrategyFactory.class);
+ rsFactory.init(info.initArgs);
+ } else {
+ log.info("solr.RecoveryStrategyFactory");
+ rsFactory = new DefaultRecoveryStrategyFactory();
+ }
+ return rsFactory;
+ }
+
private void initIndexReaderFactory() {
IndexReaderFactory indexReaderFactory;
PluginInfo info = solrConfig.getPluginInfo(IndexReaderFactory.class.getName());
@@ -681,10 +698,12 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
if (updateHandler == null) {
directoryFactory = initDirectoryFactory();
- solrCoreState = new DefaultSolrCoreState(directoryFactory);
+ recoveryStrategyFactory = initRecoveryStrategyFactory();
+ solrCoreState = new DefaultSolrCoreState(directoryFactory, recoveryStrategyFactory);
} else {
solrCoreState = updateHandler.getSolrCoreState();
directoryFactory = solrCoreState.getDirectoryFactory();
+ recoveryStrategyFactory = solrCoreState.getRecoveryStrategyFactory();
isReloaded = true;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 8eab83f..2695cd7 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -32,7 +32,9 @@ import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SortingMergePolicy;
import org.apache.lucene.search.Sort;
import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.DefaultRecoveryStrategyFactory;
import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.core.CoreContainer;
@@ -63,6 +65,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
+ private final RecoveryStrategyFactory recoveryStrategyFactory;
private volatile RecoveryStrategy recoveryStrat;
@@ -76,8 +79,15 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
protected final ReentrantLock commitLock = new ReentrantLock();
+ /**
+ * Creates core state that uses a {@link DefaultRecoveryStrategyFactory} for recovery.
+ *
+ * @param directoryFactory the directory factory held by this core state
+ * @deprecated use {@link #DefaultSolrCoreState(DirectoryFactory, RecoveryStrategyFactory)}
+ * so that the caller chooses the recovery strategy factory explicitly
+ */
+ @Deprecated
public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
+ this(directoryFactory, new DefaultRecoveryStrategyFactory());
+ }
+
+ /**
+ * Creates core state with an explicit recovery strategy factory.
+ *
+ * @param directoryFactory the directory factory held by this core state
+ * @param recoveryStrategyFactory factory used by {@code doRecovery} to create each
+ * {@link RecoveryStrategy}; also returned by {@link #getRecoveryStrategyFactory()}.
+ * NOTE(review): not null-checked here.
+ */
+ public DefaultSolrCoreState(DirectoryFactory directoryFactory,
+ RecoveryStrategyFactory recoveryStrategyFactory) {
this.directoryFactory = directoryFactory;
+ this.recoveryStrategyFactory = recoveryStrategyFactory;
}
private void closeIndexWriter(IndexWriterCloser closer) {
@@ -263,6 +273,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
+  /**
+   * Returns the factory this core state was constructed with (a
+   * {@link DefaultRecoveryStrategyFactory} when the deprecated single-argument constructor
+   * was used). {@code doRecovery} asks it for each new {@link RecoveryStrategy}.
+   */
@Override
+  public RecoveryStrategyFactory getRecoveryStrategyFactory() {
+    return this.recoveryStrategyFactory;
+  }
+
+ @Override
public void doRecovery(CoreContainer cc, CoreDescriptor cd) {
Thread thread = new Thread() {
@@ -310,7 +325,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
recoveryThrottle.minimumWaitBetweenActions();
recoveryThrottle.markAttemptingAction();
- recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
+ recoveryStrat = recoveryStrategyFactory.create(cc, cd, DefaultSolrCoreState.this);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
Future<?> future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 89e286a..25ba94a 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Sort;
import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
@@ -138,6 +139,11 @@ public abstract class SolrCoreState {
*/
public abstract DirectoryFactory getDirectoryFactory();
+ /**
+ * Returns the factory that implementations use to create the
+ * {@link org.apache.solr.cloud.RecoveryStrategy} for each recovery attempt.
+ * <p>
+ * When a SolrCore is re-created from an existing update handler (the reload path),
+ * it takes the factory from this core state instead of re-reading the configuration,
+ * so the factory chosen when the state was first built is kept across reloads.
+ *
+ * @return the {@link RecoveryStrategyFactory} that should be used.
+ */
+ public abstract RecoveryStrategyFactory getRecoveryStrategyFactory();
+
public interface IndexWriterCloser {
void closeWriter(IndexWriter writer) throws IOException;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml
new file mode 100644
index 0000000..1d0a93c
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+ <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+ <requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
+ <recoveryStrategyFactory class="org.apache.solr.core.CustomRecoveryStrategyFactoryTest$CustomRecoveryStrategyFactory">
+ <int name="customParameter">42</int>
+ </recoveryStrategyFactory>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+</config>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java b/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java
new file mode 100644
index 0000000..01146a4
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
+import org.junit.BeforeClass;
+
+/**
+ * Tests that a {@code <recoveryStrategyFactory>} element in solrconfig.xml replaces the
+ * default {@link RecoveryStrategyFactory}, and that the configured value of
+ * {@code customParameter} ends up on the strategies the factory creates.
+ */
+public class CustomRecoveryStrategyFactoryTest extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    initCore("solrconfig-customrecoverystrategyfactory.xml", "schema.xml");
+  }
+
+  /** The core state must expose the factory class named in the test config. */
+  public void testFactory() throws Exception {
+    final RecoveryStrategyFactory recoveryStrategyFactory =
+        h.getCore().getSolrCoreState().getRecoveryStrategyFactory();
+    assertNotNull("recoveryStrategyFactory is null", recoveryStrategyFactory);
+    assertEquals("recoveryStrategyFactory is wrong class (name)",
+        CustomRecoveryStrategyFactory.class.getName(),
+        recoveryStrategyFactory.getClass().getName());
+    assertTrue("recoveryStrategyFactory is wrong class (instanceof)",
+        recoveryStrategyFactory instanceof CustomRecoveryStrategyFactory);
+  }
+
+  /**
+   * The configured factory must create a {@link CustomRecoveryStrategy} whose
+   * customParameter is 42, the value given in the test config.
+   */
+  public void testCreate() throws Exception {
+    final RecoveryStrategyFactory recoveryStrategyFactory =
+        h.getCore().getSolrCoreState().getRecoveryStrategyFactory();
+    assertNotNull("recoveryStrategyFactory is null", recoveryStrategyFactory);
+
+    // null arguments are acceptable: CustomRecoveryStrategy's constructor ignores them
+    final RecoveryStrategy recoveryStrategy =
+        recoveryStrategyFactory.create(null, null, null);
+
+    assertEquals("recoveryStrategy is wrong class (name)",
+        CustomRecoveryStrategy.class.getName(),
+        recoveryStrategy.getClass().getName());
+    assertTrue("recoveryStrategy is wrong class (instanceof)",
+        recoveryStrategy instanceof CustomRecoveryStrategy);
+
+    // NOTE(review): 42 presumably arrives via RecoveryStrategyFactory.create applying the
+    // factory's init args to the new strategy; that mechanism is outside this file.
+    final CustomRecoveryStrategy customRecoveryStrategy =
+        (CustomRecoveryStrategy) recoveryStrategy;
+    assertEquals(42, customRecoveryStrategy.getCustomParameter());
+  }
+
+  /** Minimal strategy that performs no recovery and only carries {@code customParameter}. */
+  public static class CustomRecoveryStrategy extends RecoveryStrategy {
+
+    // Random default, so a fixed default value cannot accidentally satisfy the 42 assertion.
+    private int customParameter = random().nextInt();
+
+    public CustomRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+        RecoveryListener recoveryListener) {
+      // arguments intentionally unused: this test strategy never recovers anything
+    }
+
+    public int getCustomParameter() { return customParameter; }
+
+    public void setCustomParameter(int customParameter) { this.customParameter = customParameter; }
+
+    @Override
+    public boolean getRecoveringAfterStartup() { return false; }
+
+    @Override
+    public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {}
+
+    @Override
+    public void close() {}
+  }
+
+  /** Factory named in solrconfig-customrecoverystrategyfactory.xml. */
+  public static class CustomRecoveryStrategyFactory extends RecoveryStrategyFactory {
+    @Override
+    public RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+        RecoveryStrategy.RecoveryListener recoveryListener) {
+      return new CustomRecoveryStrategy(cc, cd, recoveryListener);
+    }
+  }
+
+}