Posted to commits@lucene.apache.org by cp...@apache.org on 2016/04/27 19:11:09 UTC

[1/4] lucene-solr:jira/solr-9045: * Add DefaultRecoveryStrategy.java as clone of RecoveryStrategy.java (prep for factoring out an abstract RecoveryStrategy base class).

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-9045 [created] e3d5a1925


* Add DefaultRecoveryStrategy.java as clone of RecoveryStrategy.java (prep for factoring out an abstract RecoveryStrategy base class).


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/85ed896b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/85ed896b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/85ed896b

Branch: refs/heads/jira/solr-9045
Commit: 85ed896b095dcf8576d1b93827d809385e6c779d
Parents: ebd1204
Author: Christine Poerschke <cp...@apache.org>
Authored: Wed Apr 27 16:30:00 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 17:18:05 2016 +0100

----------------------------------------------------------------------
 .../solr/cloud/DefaultRecoveryStrategy.java     | 597 +++++++++++++++++++
 1 file changed, 597 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/85ed896b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
new file mode 100644
index 0000000..b1a8040
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory.DirContext;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.PeerSync;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.util.RefCounted;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultRecoveryStrategy extends Thread implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
+  private static final int MAX_RETRIES = 500;
+  private static final int STARTING_RECOVERY_DELAY = 5000;
+
+  public static interface RecoveryListener {
+    public void recovered();
+    public void failed();
+  }
+  
+  private volatile boolean close = false;
+
+  private RecoveryListener recoveryListener;
+  private ZkController zkController;
+  private String baseUrl;
+  private String coreZkNodeName;
+  private ZkStateReader zkStateReader;
+  private volatile String coreName;
+  private int retries;
+  private boolean recoveringAfterStartup;
+  private CoreContainer cc;
+  private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
+  
+  // this should only be used from SolrCoreState
+  public DefaultRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
+    this.cc = cc;
+    this.coreName = cd.getName();
+    this.recoveryListener = recoveryListener;
+    setName("RecoveryThread-"+this.coreName);
+    zkController = cc.getZkController();
+    zkStateReader = zkController.getZkStateReader();
+    baseUrl = zkController.getBaseUrl();
+    coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
+  }
+
+  public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
+    this.recoveringAfterStartup = recoveringAfterStartup;
+  }
+
+  // make sure any threads stop retrying
+  @Override
+  public void close() {
+    close = true;
+    if (prevSendPreRecoveryHttpUriRequest != null) {
+      prevSendPreRecoveryHttpUriRequest.abort();
+    }
+    LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
+  }
+
+  private void recoveryFailed(final SolrCore core,
+      final ZkController zkController, final String baseUrl,
+      final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
+    SolrException.log(LOG, "Recovery failed - I give up.");
+    try {
+      zkController.publish(cd, Replica.State.RECOVERY_FAILED);
+    } finally {
+      close();
+      recoveryListener.failed();
+    }
+  }
+  
+  private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
+      throws SolrServerException, IOException {
+
+    ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
+    String leaderUrl = leaderCNodeProps.getCoreUrl();
+    
+    LOG.info("Attempting to replicate from [{}].", leaderUrl);
+    
+    // send commit
+    commitOnLeader(leaderUrl);
+    
+    // use rep handler directly, so we can do this sync rather than async
+    SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+    
+    if (replicationHandler == null) {
+      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+          "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
+    }
+    
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
+    
+    if (isClosed()) return; // we check closed on return
+    boolean success = replicationHandler.doFetch(solrParams, false);
+    
+    if (!success) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
+    }
+    
+    // solrcloud_debug
+    if (LOG.isDebugEnabled()) {
+      try {
+        RefCounted<SolrIndexSearcher> searchHolder = core
+            .getNewestSearcher(false);
+        SolrIndexSearcher searcher = searchHolder.get();
+        Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
+        try {
+          LOG.debug(core.getCoreDescriptor().getCoreContainer()
+              .getZkController().getNodeName()
+              + " replicated "
+              + searcher.search(new MatchAllDocsQuery(), 1).totalHits
+              + " from "
+              + leaderUrl
+              + " gen:"
+              + (core.getDeletionPolicy().getLatestCommit() == null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
+              + " data:" + core.getDataDir()
+              + " index:" + core.getIndexDir()
+              + " newIndex:" + core.getNewIndexDir()
+              + " files:" + Arrays.asList(dir.listAll()));
+        } finally {
+          core.getDirectoryFactory().release(dir);
+          searchHolder.decref();
+        }
+      } catch (Exception e) {
+        LOG.debug("Error in solrcloud_debug block", e);
+      }
+    }
+
+  }
+
+  private void commitOnLeader(String leaderUrl) throws SolrServerException,
+      IOException {
+    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
+      client.setConnectionTimeout(30000);
+      UpdateRequest ureq = new UpdateRequest();
+      ureq.setParams(new ModifiableSolrParams());
+      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
+          client);
+    }
+  }
+
+  @Override
+  public void run() {
+
+    // set request info for logging
+    try (SolrCore core = cc.getCore(coreName)) {
+
+      if (core == null) {
+        SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName);
+        return;
+      }
+      MDCLoggingContext.setCore(core);
+
+      LOG.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
+
+      try {
+        doRecovery(core);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        SolrException.log(LOG, "", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      } catch (Exception e) {
+        LOG.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      }
+    } finally {
+      MDCLoggingContext.clear();
+    }
+  }
+
+  // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
+  public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+    boolean replayed = false;
+    boolean successfulRecovery = false;
+
+    UpdateLog ulog;
+    ulog = core.getUpdateHandler().getUpdateLog();
+    if (ulog == null) {
+      SolrException.log(LOG, "No UpdateLog found - cannot recover.");
+      recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+          core.getCoreDescriptor());
+      return;
+    }
+
+    boolean firstTime = true;
+
+    List<Long> recentVersions;
+    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
+      recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
+    } catch (Exception e) {
+      SolrException.log(LOG, "Corrupt tlog - ignoring.", e);
+      recentVersions = new ArrayList<>(0);
+    }
+
+    List<Long> startingVersions = ulog.getStartingVersions();
+
+    if (startingVersions != null && recoveringAfterStartup) {
+      try {
+        int oldIdx = 0; // index of the start of the old list in the current list
+        long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
+        
+        for (; oldIdx < recentVersions.size(); oldIdx++) {
+          if (recentVersions.get(oldIdx) == firstStartingVersion) break;
+        }
+        
+        if (oldIdx > 0) {
+          LOG.info("####### Found new versions added after startup: num=[{}]", oldIdx);
+          LOG.info("###### currentVersions=[{}]",recentVersions);
+        }
+        
+        LOG.info("###### startupVersions=[{}]", startingVersions);
+      } catch (Exception e) {
+        SolrException.log(LOG, "Error getting recent versions.", e);
+        recentVersions = new ArrayList<>(0);
+      }
+    }
+
+    if (recoveringAfterStartup) {
+      // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
+      // when we went down.  We may have received updates since then.
+      recentVersions = startingVersions;
+      try {
+        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
+          // last operation at the time of startup had the GAP flag set...
+          // this means we were previously doing a full index replication
+          // that probably didn't complete and buffering updates in the
+          // meantime.
+          LOG.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
+          firstTime = false; // skip peersync
+        }
+      } catch (Exception e) {
+        SolrException.log(LOG, "Error trying to get ulog starting operation.", e);
+        firstTime = false; // skip peersync
+      }
+    }
+
+    Future<RecoveryInfo> replayFuture = null;
+    while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+      try {
+        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
+            cloudDesc.getCollectionName(), cloudDesc.getShardId());
+      
+        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+
+        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
+
+        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+
+        boolean isLeader = leaderUrl.equals(ourUrl);
+        if (isLeader && !cloudDesc.isLeader()) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+        }
+        if (cloudDesc.isLeader()) {
+          // we are now the leader - no one else must have been suitable
+          LOG.warn("We have not yet recovered - but we are now the leader!");
+          LOG.info("Finished recovery process.");
+          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          return;
+        }
+        
+        LOG.info("Begin buffering updates. core=[{}]", coreName);
+        ulog.bufferUpdates();
+        replayed = false;
+        
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+            ourUrl);
+        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+        
+        
+        final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(),
+            cloudDesc.getShardId());
+            
+        try {
+          prevSendPreRecoveryHttpUriRequest.abort();
+        } catch (NullPointerException e) {
+          // okay
+        }
+        
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
+
+        sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+        
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
+        
+        // we wait a bit so that any updates on the leader
+        // that started before they saw recovering state 
+        // are sure to have finished (see SOLR-7141 for
+        // discussion around current value)
+        try {
+          Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+
+        // first thing we just try to sync
+        if (firstTime) {
+          firstTime = false; // only try sync the first time through the loop
+          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+          // System.out.println("Attempting to PeerSync from " + leaderUrl
+          // + " i am:" + zkController.getNodeName());
+          PeerSync peerSync = new PeerSync(core,
+              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+          peerSync.setStartingVersions(recentVersions);
+          boolean syncSuccess = peerSync.sync();
+          if (syncSuccess) {
+            SolrQueryRequest req = new LocalSolrQueryRequest(core,
+                new ModifiableSolrParams());
+            // force open a new searcher
+            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+            LOG.info("PeerSync stage of recovery was successful.");
+
+            // solrcloud_debug
+            cloudDebugLog(core, "synced");
+            
+            LOG.info("Replaying updates buffered during PeerSync.");
+            replay(core);
+            replayed = true;
+            
+            // sync success
+            successfulRecovery = true;
+            return;
+          }
+
+          LOG.info("PeerSync Recovery was not successful - trying replication.");
+        }
+
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
+        
+        LOG.info("Starting Replication Recovery.");
+
+        try {
+
+          replicate(zkController.getNodeName(), core, leaderprops);
+
+          if (isClosed()) {
+            LOG.info("RecoveryStrategy has been closed");
+            break;
+          }
+
+          replayFuture = replay(core);
+          replayed = true;
+          
+          if (isClosed()) {
+            LOG.info("RecoveryStrategy has been closed");
+            break;
+          }
+
+          LOG.info("Replication Recovery was successful.");
+          successfulRecovery = true;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.warn("Recovery was interrupted", e);
+          close = true;
+        } catch (Exception e) {
+          SolrException.log(LOG, "Error while trying to recover", e);
+        }
+
+      } catch (Exception e) {
+        SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
+      } finally {
+        if (!replayed) {
+          // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
+          // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
+          // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
+          // reset our starting point for playback.
+          LOG.info("Replay not started, or was not successful... still buffering updates.");
+
+          /** this prev code is retained in case we want to switch strategies.
+          try {
+            ulog.dropBufferedUpdates();
+          } catch (Exception e) {
+            SolrException.log(log, "", e);
+          }
+          **/
+        }
+        if (successfulRecovery) {
+          LOG.info("Registering as Active after recovery.");
+          try {
+            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          } catch (Exception e) {
+            LOG.error("Could not publish as ACTIVE after successful recovery", e);
+            successfulRecovery = false;
+          }
+          
+          if (successfulRecovery) {
+            close = true;
+            recoveryListener.recovered();
+          }
+        }
+      }
+
+      if (!successfulRecovery) {
+        // lets pause for a moment and we need to try again...
+        // TODO: we don't want to retry for some problems?
+        // Or do a fall off retry...
+        try {
+
+          if (isClosed()) {
+            LOG.info("RecoveryStrategy has been closed");
+            break;
+          }
+          
+          LOG.error("Recovery failed - trying again... (" + retries + ")");
+          
+          retries++;
+          if (retries >= MAX_RETRIES) {
+            SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
+            try {
+              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+            } catch (Exception e) {
+              SolrException.log(LOG, "Could not publish that recovery failed", e);
+            }
+            break;
+          }
+        } catch (Exception e) {
+          SolrException.log(LOG, "An error has occurred during recovery", e);
+        }
+
+        try {
+          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
+          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result 
+          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
+          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
+          double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
+          LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+          for (int i = 0; i < loopCount; i++) {
+            if (isClosed()) {
+              LOG.info("RecoveryStrategy has been closed");
+              break; // check if someone closed us
+            }
+            Thread.sleep(STARTING_RECOVERY_DELAY);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.warn("Recovery was interrupted.", e);
+          close = true;
+        }
+      }
+
+    }
+
+    // if replay was skipped (possibly due to pulling a full index from the leader),
+    // then we still need to update version bucket seeds after recovery
+    if (successfulRecovery && replayFuture == null) {
+      LOG.info("Updating version bucket highest from index after successful recovery.");
+      core.seedVersionBuckets();
+    }
+
+    LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
+  }
+
+  private Future<RecoveryInfo> replay(SolrCore core)
+      throws InterruptedException, ExecutionException {
+    Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
+    if (future == null) {
+      // no replay needed
+      LOG.info("No replay needed.");
+    } else {
+      LOG.info("Replaying buffered documents.");
+      // wait for replay
+      RecoveryInfo report = future.get();
+      if (report.failed) {
+        SolrException.log(LOG, "Replay failed");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
+      }
+    }
+    
+    // solrcloud_debug
+    cloudDebugLog(core, "replayed");
+    
+    return future;
+  }
+  
+  private void cloudDebugLog(SolrCore core, String op) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    try {
+      RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+      SolrIndexSearcher searcher = searchHolder.get();
+      try {
+        final int totalHits = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
+        final String nodeName = core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName();
+        LOG.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
+      } finally {
+        searchHolder.decref();
+      }
+    } catch (Exception e) {
+      LOG.debug("Error in solrcloud_debug block", e);
+    }
+  }
+
+  public boolean isClosed() {
+    return close;
+  }
+  
+  private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
+      throws SolrServerException, IOException, InterruptedException, ExecutionException {
+
+    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
+      client.setConnectionTimeout(30000);
+      WaitForState prepCmd = new WaitForState();
+      prepCmd.setCoreName(leaderCoreName);
+      prepCmd.setNodeName(zkController.getNodeName());
+      prepCmd.setCoreNodeName(coreZkNodeName);
+      prepCmd.setState(Replica.State.RECOVERING);
+      prepCmd.setCheckLive(true);
+      prepCmd.setOnlyIfLeader(true);
+      final Slice.State state = slice.getState();
+      if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY) {
+        prepCmd.setOnlyIfLeaderActive(true);
+      }
+      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
+      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+      
+      LOG.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
+      
+      mrr.future.get();
+    }
+  }
+
+}


[3/4] lucene-solr:jira/solr-9045: * Factor out getReplicateLeaderUrl protected method in DefaultRecoveryStrategy.

Posted by cp...@apache.org.
* Factor out getReplicateLeaderUrl protected method in DefaultRecoveryStrategy.
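
For illustration only, the new protected hook makes the replication source overridable in a subclass. The class below is hypothetical (not part of this patch) and assumes the post-refactoring constructor that takes a RecoveryStrategy.RecoveryListener; the URL rewrite is purely illustrative:

  package org.apache.solr.cloud; // hypothetical example, not part of this commit

  import org.apache.solr.common.cloud.ZkNodeProps;
  import org.apache.solr.core.CoreContainer;
  import org.apache.solr.core.CoreDescriptor;

  // Sketch only: replicate via an (assumed) proxy in front of the leader
  // instead of the leader's own core URL.
  public class ProxiedRecoveryStrategy extends DefaultRecoveryStrategy {

    public ProxiedRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
        RecoveryStrategy.RecoveryListener recoveryListener) {
      super(cc, cd, recoveryListener);
    }

    @Override
    protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
      // default behaviour, i.e. new ZkCoreNodeProps(leaderprops).getCoreUrl()
      final String leaderUrl = super.getReplicateLeaderUrl(leaderprops);
      // hypothetical rewrite; a real implementation would decide how to map URLs
      return leaderUrl.replace("http://", "http://recovery-proxy/");
    }
  }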


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/928763bc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/928763bc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/928763bc

Branch: refs/heads/jira/solr-9045
Commit: 928763bc5cddf3b117f02a9cb8096d2d5799df5c
Parents: 3cd0428
Author: Christine Poerschke <cp...@apache.org>
Authored: Wed Apr 20 17:38:59 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 17:18:06 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/solr/cloud/DefaultRecoveryStrategy.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/928763bc/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
index a115595..60d2931 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -124,11 +124,14 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
     }
   }
   
+  protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
+    return new ZkCoreNodeProps(leaderprops).getCoreUrl();
+  }
+
   private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
       throws SolrServerException, IOException {
 
-    ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
-    String leaderUrl = leaderCNodeProps.getCoreUrl();
+    final String leaderUrl = getReplicateLeaderUrl(leaderprops);
     
     LOG.info("Attempting to replicate from [{}].", leaderUrl);
     


[2/4] lucene-solr:jira/solr-9045: * Turn RecoveryStrategy into abstract base class.

Posted by cp...@apache.org.
* Turn RecoveryStrategy into abstract base class.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3cd04289
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3cd04289
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3cd04289

Branch: refs/heads/jira/solr-9045
Commit: 3cd04289be9c02f7c55c538126583fba96970fab
Parents: 85ed896
Author: Christine Poerschke <cp...@apache.org>
Authored: Wed Apr 27 16:39:50 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 17:18:06 2016 +0100

----------------------------------------------------------------------
 .../solr/cloud/DefaultRecoveryStrategy.java     |  13 +-
 .../org/apache/solr/cloud/RecoveryStrategy.java | 571 +------------------
 2 files changed, 7 insertions(+), 577 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3cd04289/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
index b1a8040..a115595 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.cloud;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -65,7 +64,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultRecoveryStrategy extends Thread implements Closeable {
+public class DefaultRecoveryStrategy extends RecoveryStrategy {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -73,14 +72,9 @@ public class DefaultRecoveryStrategy extends Thread implements Closeable {
   private static final int MAX_RETRIES = 500;
   private static final int STARTING_RECOVERY_DELAY = 5000;
 
-  public static interface RecoveryListener {
-    public void recovered();
-    public void failed();
-  }
-  
   private volatile boolean close = false;
 
-  private RecoveryListener recoveryListener;
+  private RecoveryStrategy.RecoveryListener recoveryListener;
   private ZkController zkController;
   private String baseUrl;
   private String coreZkNodeName;
@@ -92,7 +86,7 @@ public class DefaultRecoveryStrategy extends Thread implements Closeable {
   private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
   
   // this should only be used from SolrCoreState
-  public DefaultRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
+  public DefaultRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryStrategy.RecoveryListener recoveryListener) {
     this.cc = cc;
     this.coreName = cd.getName();
     this.recoveryListener = recoveryListener;
@@ -103,6 +97,7 @@ public class DefaultRecoveryStrategy extends Thread implements Closeable {
     coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
   }
 
+  @Override
   public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
     this.recoveringAfterStartup = recoveringAfterStartup;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3cd04289/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index abd00ae..d855a6d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -17,581 +17,16 @@
 package org.apache.solr.cloud;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.DirectoryFactory.DirContext;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.PeerSync;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.RefCounted;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RecoveryStrategy extends Thread implements Closeable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
-  private static final int MAX_RETRIES = 500;
-  private static final int STARTING_RECOVERY_DELAY = 5000;
+public abstract class RecoveryStrategy extends Thread implements Closeable {
 
   public static interface RecoveryListener {
     public void recovered();
     public void failed();
   }
-  
-  private volatile boolean close = false;
-
-  private RecoveryListener recoveryListener;
-  private ZkController zkController;
-  private String baseUrl;
-  private String coreZkNodeName;
-  private ZkStateReader zkStateReader;
-  private volatile String coreName;
-  private int retries;
-  private boolean recoveringAfterStartup;
-  private CoreContainer cc;
-  private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-  
-  // this should only be used from SolrCoreState
-  public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
-    this.cc = cc;
-    this.coreName = cd.getName();
-    this.recoveryListener = recoveryListener;
-    setName("RecoveryThread-"+this.coreName);
-    zkController = cc.getZkController();
-    zkStateReader = zkController.getZkStateReader();
-    baseUrl = zkController.getBaseUrl();
-    coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
-  }
-
-  public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
-    this.recoveringAfterStartup = recoveringAfterStartup;
-  }
-
-  // make sure any threads stop retrying
-  @Override
-  public void close() {
-    close = true;
-    if (prevSendPreRecoveryHttpUriRequest != null) {
-      prevSendPreRecoveryHttpUriRequest.abort();
-    }
-    LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
-  }
-
-  private void recoveryFailed(final SolrCore core,
-      final ZkController zkController, final String baseUrl,
-      final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
-    SolrException.log(LOG, "Recovery failed - I give up.");
-    try {
-      zkController.publish(cd, Replica.State.RECOVERY_FAILED);
-    } finally {
-      close();
-      recoveryListener.failed();
-    }
-  }
-  
-  private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
-      throws SolrServerException, IOException {
-
-    ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
-    String leaderUrl = leaderCNodeProps.getCoreUrl();
-    
-    LOG.info("Attempting to replicate from [{}].", leaderUrl);
-    
-    // send commit
-    commitOnLeader(leaderUrl);
-    
-    // use rep handler directly, so we can do this sync rather than async
-    SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
-    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-    
-    if (replicationHandler == null) {
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-          "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
-    }
-    
-    ModifiableSolrParams solrParams = new ModifiableSolrParams();
-    solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
-    
-    if (isClosed()) return; // we check closed on return
-    boolean success = replicationHandler.doFetch(solrParams, false);
-    
-    if (!success) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
-    }
-    
-    // solrcloud_debug
-    if (LOG.isDebugEnabled()) {
-      try {
-        RefCounted<SolrIndexSearcher> searchHolder = core
-            .getNewestSearcher(false);
-        SolrIndexSearcher searcher = searchHolder.get();
-        Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
-        try {
-          LOG.debug(core.getCoreDescriptor().getCoreContainer()
-              .getZkController().getNodeName()
-              + " replicated "
-              + searcher.search(new MatchAllDocsQuery(), 1).totalHits
-              + " from "
-              + leaderUrl
-              + " gen:"
-              + core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration()
-              + " data:" + core.getDataDir()
-              + " index:" + core.getIndexDir()
-              + " newIndex:" + core.getNewIndexDir()
-              + " files:" + Arrays.asList(dir.listAll()));
-        } finally {
-          core.getDirectoryFactory().release(dir);
-          searchHolder.decref();
-        }
-      } catch (Exception e) {
-        LOG.debug("Error in solrcloud_debug block", e);
-      }
-    }
-
-  }
 
-  private void commitOnLeader(String leaderUrl) throws SolrServerException,
-      IOException {
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
-      client.setConnectionTimeout(30000);
-      UpdateRequest ureq = new UpdateRequest();
-      ureq.setParams(new ModifiableSolrParams());
-      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
-      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
-          client);
-    }
-  }
+  public abstract void setRecoveringAfterStartup(boolean recoveringAfterStartup);
 
   @Override
-  public void run() {
-
-    // set request info for logging
-    try (SolrCore core = cc.getCore(coreName)) {
-
-      if (core == null) {
-        SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName);
-        return;
-      }
-      MDCLoggingContext.setCore(core);
-
-      LOG.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
-
-      try {
-        doRecovery(core);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        SolrException.log(LOG, "", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      } catch (Exception e) {
-        LOG.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      }
-    } finally {
-      MDCLoggingContext.clear();
-    }
-  }
-
-  // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
-  public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
-    boolean replayed = false;
-    boolean successfulRecovery = false;
-
-    UpdateLog ulog;
-    ulog = core.getUpdateHandler().getUpdateLog();
-    if (ulog == null) {
-      SolrException.log(LOG, "No UpdateLog found - cannot recover.");
-      recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
-          core.getCoreDescriptor());
-      return;
-    }
-
-    boolean firstTime = true;
-
-    List<Long> recentVersions;
-    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
-    } catch (Exception e) {
-      SolrException.log(LOG, "Corrupt tlog - ignoring.", e);
-      recentVersions = new ArrayList<>(0);
-    }
-
-    List<Long> startingVersions = ulog.getStartingVersions();
-
-    if (startingVersions != null && recoveringAfterStartup) {
-      try {
-        int oldIdx = 0; // index of the start of the old list in the current list
-        long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-        
-        for (; oldIdx < recentVersions.size(); oldIdx++) {
-          if (recentVersions.get(oldIdx) == firstStartingVersion) break;
-        }
-        
-        if (oldIdx > 0) {
-          LOG.info("####### Found new versions added after startup: num=[{}]", oldIdx);
-          LOG.info("###### currentVersions=[{}]",recentVersions);
-        }
-        
-        LOG.info("###### startupVersions=[{}]", startingVersions);
-      } catch (Exception e) {
-        SolrException.log(LOG, "Error getting recent versions.", e);
-        recentVersions = new ArrayList<>(0);
-      }
-    }
-
-    if (recoveringAfterStartup) {
-      // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
-      // when we went down.  We may have received updates since then.
-      recentVersions = startingVersions;
-      try {
-        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
-          // last operation at the time of startup had the GAP flag set...
-          // this means we were previously doing a full index replication
-          // that probably didn't complete and buffering updates in the
-          // meantime.
-          LOG.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
-          firstTime = false; // skip peersync
-        }
-      } catch (Exception e) {
-        SolrException.log(LOG, "Error trying to get ulog starting operation.", e);
-        firstTime = false; // skip peersync
-      }
-    }
-
-    Future<RecoveryInfo> replayFuture = null;
-    while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
-      try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-
-        boolean isLeader = leaderUrl.equals(ourUrl);
-        if (isLeader && !cloudDesc.isLeader()) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
-        }
-        if (cloudDesc.isLeader()) {
-          // we are now the leader - no one else must have been suitable
-          LOG.warn("We have not yet recovered - but we are now the leader!");
-          LOG.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          return;
-        }
-        
-        LOG.info("Begin buffering updates. core=[{}]", coreName);
-        ulog.bufferUpdates();
-        replayed = false;
-        
-        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
-            ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-        
-        
-        final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(),
-            cloudDesc.getShardId());
-            
-        try {
-          prevSendPreRecoveryHttpUriRequest.abort();
-        } catch (NullPointerException e) {
-          // okay
-        }
-        
-        if (isClosed()) {
-          LOG.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
-        
-        if (isClosed()) {
-          LOG.info("RecoveryStrategy has been closed");
-          break;
-        }
-        
-        // we wait a bit so that any updates on the leader
-        // that started before they saw recovering state 
-        // are sure to have finished (see SOLR-7141 for
-        // discussion around current value)
-        try {
-          Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-
-        // first thing we just try to sync
-        if (firstTime) {
-          firstTime = false; // only try sync the first time through the loop
-          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
-          // System.out.println("Attempting to PeerSync from " + leaderUrl
-          // + " i am:" + zkController.getNodeName());
-          PeerSync peerSync = new PeerSync(core,
-              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
-          peerSync.setStartingVersions(recentVersions);
-          boolean syncSuccess = peerSync.sync();
-          if (syncSuccess) {
-            SolrQueryRequest req = new LocalSolrQueryRequest(core,
-                new ModifiableSolrParams());
-            // force open a new searcher
-            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
-            LOG.info("PeerSync stage of recovery was successful.");
-
-            // solrcloud_debug
-            cloudDebugLog(core, "synced");
-            
-            LOG.info("Replaying updates buffered during PeerSync.");
-            replay(core);
-            replayed = true;
-            
-            // sync success
-            successfulRecovery = true;
-            return;
-          }
-
-          LOG.info("PeerSync Recovery was not successful - trying replication.");
-        }
-
-        if (isClosed()) {
-          LOG.info("RecoveryStrategy has been closed");
-          break;
-        }
-        
-        LOG.info("Starting Replication Recovery.");
-
-        try {
-
-          replicate(zkController.getNodeName(), core, leaderprops);
-
-          if (isClosed()) {
-            LOG.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          replayFuture = replay(core);
-          replayed = true;
-          
-          if (isClosed()) {
-            LOG.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          LOG.info("Replication Recovery was successful.");
-          successfulRecovery = true;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          LOG.warn("Recovery was interrupted", e);
-          close = true;
-        } catch (Exception e) {
-          SolrException.log(LOG, "Error while trying to recover", e);
-        }
-
-      } catch (Exception e) {
-        SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
-      } finally {
-        if (!replayed) {
-          // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
-          // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
-          // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
-          // reset our starting point for playback.
-          LOG.info("Replay not started, or was not successful... still buffering updates.");
-
-          /** this prev code is retained in case we want to switch strategies.
-          try {
-            ulog.dropBufferedUpdates();
-          } catch (Exception e) {
-            SolrException.log(log, "", e);
-          }
-          **/
-        }
-        if (successfulRecovery) {
-          LOG.info("Registering as Active after recovery.");
-          try {
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          } catch (Exception e) {
-            LOG.error("Could not publish as ACTIVE after succesful recovery", e);
-            successfulRecovery = false;
-          }
-          
-          if (successfulRecovery) {
-            close = true;
-            recoveryListener.recovered();
-          }
-        }
-      }
-
-      if (!successfulRecovery) {
-        // lets pause for a moment and we need to try again...
-        // TODO: we don't want to retry for some problems?
-        // Or do a fall off retry...
-        try {
-
-          if (isClosed()) {
-            LOG.info("RecoveryStrategy has been closed");
-            break;
-          }
-          
-          LOG.error("Recovery failed - trying again... (" + retries + ")");
-          
-          retries++;
-          if (retries >= MAX_RETRIES) {
-            SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
-            try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
-            } catch (Exception e) {
-              SolrException.log(LOG, "Could not publish that recovery failed", e);
-            }
-            break;
-          }
-        } catch (Exception e) {
-          SolrException.log(LOG, "An error has occurred during recovery", e);
-        }
-
-        try {
-          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
-          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result 
-          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
-          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
-          double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
-          LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
-          for (int i = 0; i < loopCount; i++) {
-            if (isClosed()) {
-              LOG.info("RecoveryStrategy has been closed");
-              break; // check if someone closed us
-            }
-            Thread.sleep(STARTING_RECOVERY_DELAY);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          LOG.warn("Recovery was interrupted.", e);
-          close = true;
-        }
-      }
-
-    }
-
-    // if replay was skipped (possibly to due pulling a full index from the leader),
-    // then we still need to update version bucket seeds after recovery
-    if (successfulRecovery && replayFuture == null) {
-      LOG.info("Updating version bucket highest from index after successful recovery.");
-      core.seedVersionBuckets();
-    }
-
-    LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
-  }
-
-  private Future<RecoveryInfo> replay(SolrCore core)
-      throws InterruptedException, ExecutionException {
-    Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
-    if (future == null) {
-      // no replay needed\
-      LOG.info("No replay needed.");
-    } else {
-      LOG.info("Replaying buffered documents.");
-      // wait for replay
-      RecoveryInfo report = future.get();
-      if (report.failed) {
-        SolrException.log(LOG, "Replay failed");
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
-      }
-    }
-    
-    // solrcloud_debug
-    cloudDebugLog(core, "replayed");
-    
-    return future;
-  }
-  
-  private void cloudDebugLog(SolrCore core, String op) {
-    if (!LOG.isDebugEnabled()) {
-      return;
-    }
-    try {
-      RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
-      SolrIndexSearcher searcher = searchHolder.get();
-      try {
-        final int totalHits = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
-        final String nodeName = core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName();
-        LOG.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
-      } finally {
-        searchHolder.decref();
-      }
-    } catch (Exception e) {
-      LOG.debug("Error in solrcloud_debug block", e);
-    }
-  }
-
-  public boolean isClosed() {
-    return close;
-  }
-  
-  private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
-      throws SolrServerException, IOException, InterruptedException, ExecutionException {
-
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
-      client.setConnectionTimeout(30000);
-      WaitForState prepCmd = new WaitForState();
-      prepCmd.setCoreName(leaderCoreName);
-      prepCmd.setNodeName(zkController.getNodeName());
-      prepCmd.setCoreNodeName(coreZkNodeName);
-      prepCmd.setState(Replica.State.RECOVERING);
-      prepCmd.setCheckLive(true);
-      prepCmd.setOnlyIfLeader(true);
-      final Slice.State state = slice.getState();
-      if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY) {
-        prepCmd.setOnlyIfLeaderActive(true);
-      }
-      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
-      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
-      
-      LOG.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
-      
-      mrr.future.get();
-    }
-  }
-
+  public abstract void close();
 }
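
Assembled from the additions and the unchanged context lines of the diff above, RecoveryStrategy.java after this commit reduces to essentially the following skeleton (license header omitted):

  package org.apache.solr.cloud;

  import java.io.Closeable;

  public abstract class RecoveryStrategy extends Thread implements Closeable {

    public static interface RecoveryListener {
      public void recovered();
      public void failed();
    }

    public abstract void setRecoveringAfterStartup(boolean recoveringAfterStartup);

    @Override
    public abstract void close();
  }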


[4/4] lucene-solr:jira/solr-9045: SOLR-9045: configurable RecoveryStrategy support

Posted by cp...@apache.org.
SOLR-9045: configurable RecoveryStrategy support

objectives:
 * To allow users to change RecoveryStrategy settings such as maxRetries and startingRecoveryDelay.
 * To support configuration of a custom recovery strategy.

patch summary:
 * RecoveryStrategy turned into DefaultRecoveryStrategy, which extends an abstract RecoveryStrategy base class
 * DefaultRecoveryStrategy's hard-coded settings exposed via getters/setters
 * DefaultRecoveryStrategyFactory extends abstract RecoveryStrategyFactory
 * solrconfig.xml now supports an optional <recoveryStrategyFactory class="..."> element
   (absence of the new element preserves existing behaviour)
 * CustomRecoveryStrategyFactoryTest using solrconfig-customrecoverystrategyfactory.xml

illustrative solrconfig.xml snippet:
  <recoveryStrategyFactory class="MyCustomRecoveryStrategyFactory">
    <int name="maxRetries">250</int> <!-- DefaultRecoveryStrategy's default is 500. -->
    <str name="settingUsedByCustomBehaviour"></str>
  </recoveryStrategyFactory>
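
To make the snippet concrete, a custom factory might look roughly like the sketch below. This is a hypothetical illustration: MyCustomRecoveryStrategyFactory and the exact create(...) signature are assumptions mirroring DefaultRecoveryStrategy's constructor (the abstract RecoveryStrategyFactory added by this commit is not shown in the excerpt above); only DefaultRecoveryStrategy, its constructor and its setMaxRetries setter appear in the patch itself.

  package org.apache.solr.cloud; // hypothetical example

  import org.apache.solr.core.CoreContainer;
  import org.apache.solr.core.CoreDescriptor;

  public class MyCustomRecoveryStrategyFactory extends RecoveryStrategyFactory {

    // Assumed factory hook; check RecoveryStrategyFactory.java for the actual
    // method(s) a concrete factory has to implement.
    @Override
    public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
        RecoveryStrategy.RecoveryListener recoveryListener) {
      DefaultRecoveryStrategy strategy = new DefaultRecoveryStrategy(cc, cd, recoveryListener);
      strategy.setMaxRetries(250); // the value carried by the <int name="maxRetries"> element above
      return strategy;
    }
  }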


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e3d5a192
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e3d5a192
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e3d5a192

Branch: refs/heads/jira/solr-9045
Commit: e3d5a1925bf3cd3feaff6e7694ffa05da6ce64aa
Parents: 928763b
Author: Christine Poerschke <cp...@apache.org>
Authored: Tue Apr 19 17:13:47 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Wed Apr 27 18:10:03 2016 +0100

----------------------------------------------------------------------
 .../solr/cloud/DefaultRecoveryStrategy.java     | 41 +++++++--
 .../cloud/DefaultRecoveryStrategyFactory.java   | 33 ++++++++
 .../org/apache/solr/cloud/RecoveryStrategy.java |  1 +
 .../solr/cloud/RecoveryStrategyFactory.java     | 47 +++++++++++
 .../java/org/apache/solr/core/SolrConfig.java   |  2 +
 .../src/java/org/apache/solr/core/SolrCore.java | 21 ++++-
 .../solr/update/DefaultSolrCoreState.java       | 17 +++-
 .../org/apache/solr/update/SolrCoreState.java   |  6 ++
 ...solrconfig-customrecoverystrategyfactory.xml | 28 ++++++
 .../core/CustomRecoveryStrategyFactoryTest.java | 89 ++++++++++++++++++++
 10 files changed, 277 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
index 60d2931..fa77d25 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategy.java
@@ -68,9 +68,9 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
-  private static final int MAX_RETRIES = 500;
-  private static final int STARTING_RECOVERY_DELAY = 5000;
+  private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 7000);
+  private int maxRetries = 500;
+  private int startingRecoveryDelayMilliSeconds = 5000;
 
   private volatile boolean close = false;
 
@@ -97,6 +97,35 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
     coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
   }
 
+  public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
+    return waitForUpdatesWithStaleStatePauseMilliSeconds;
+  }
+
+  public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) {
+    this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
+  }
+
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  public void setMaxRetries(int maxRetries) {
+    this.maxRetries = maxRetries;
+  }
+
+  public int getStartingRecoveryDelayMilliSeconds() {
+    return startingRecoveryDelayMilliSeconds;
+  }
+
+  public void setStartingRecoveryDelayMilliSeconds(int startingRecoveryDelayMilliSeconds) {
+    this.startingRecoveryDelayMilliSeconds = startingRecoveryDelayMilliSeconds;
+  }
+
+  @Override
+  public boolean getRecoveringAfterStartup() {
+    return recoveringAfterStartup;
+  }
+
   @Override
   public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
     this.recoveringAfterStartup = recoveringAfterStartup;
@@ -357,7 +386,7 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
         // are sure to have finished (see SOLR-7141 for
         // discussion around current value)
         try {
-          Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
+          Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
@@ -476,7 +505,7 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
           LOG.error("Recovery failed - trying again... (" + retries + ")");
           
           retries++;
-          if (retries >= MAX_RETRIES) {
+          if (retries >= maxRetries) {
             SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
             try {
               recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
@@ -501,7 +530,7 @@ public class DefaultRecoveryStrategy extends RecoveryStrategy {
               LOG.info("RecoveryStrategy has been closed");
               break; // check if someone closed us
             }
-            Thread.sleep(STARTING_RECOVERY_DELAY);
+            Thread.sleep(startingRecoveryDelayMilliSeconds);
           }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java
new file mode 100644
index 0000000..af15c1c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DefaultRecoveryStrategyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+
+/**
+ * A factory for creating a {@link DefaultRecoveryStrategy}.
+ */
+public class DefaultRecoveryStrategyFactory extends RecoveryStrategyFactory {
+
+  @Override
+  public RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+      RecoveryStrategy.RecoveryListener recoveryListener) {
+    return new DefaultRecoveryStrategy(cc, cd, recoveryListener);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d855a6d..5b18db2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -25,6 +25,7 @@ public abstract class RecoveryStrategy extends Thread implements Closeable {
     public void failed();
   }
 
+  public abstract boolean getRecoveringAfterStartup();
   public abstract void setRecoveringAfterStartup(boolean recoveringAfterStartup);
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java
new file mode 100644
index 0000000..3748255
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategyFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.util.SolrPluginUtils;
+import org.apache.solr.util.plugin.NamedListInitializedPlugin;
+
+/**
+ * A factory for creating a {@link RecoveryStrategy}.
+ */
+public abstract class RecoveryStrategyFactory implements NamedListInitializedPlugin {
+
+  private NamedList args;
+
+  @Override
+  public void init(NamedList args) {
+    this.args = args;
+  }
+
+  public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
+      RecoveryStrategy.RecoveryListener recoveryListener) {
+    final RecoveryStrategy recoveryStrategy = newRecoveryStrategy(cc, cd, recoveryListener);
+    SolrPluginUtils.invokeSetters(recoveryStrategy, args);
+    return recoveryStrategy;
+  }
+
+  public abstract RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+      RecoveryStrategy.RecoveryListener recoveryListener);
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/core/SolrConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java
index 0b4bac3..adc3e0b 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.util.Version;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -330,6 +331,7 @@ public class SolrConfig extends Config implements MapSerializable {
       .add(new SolrPluginInfo(SolrEventListener.class, "//listener", REQUIRE_CLASS, MULTI_OK, REQUIRE_NAME_IN_OVERLAY))
 
       .add(new SolrPluginInfo(DirectoryFactory.class, "directoryFactory", REQUIRE_CLASS))
+      .add(new SolrPluginInfo(RecoveryStrategyFactory.class, "recoveryStrategyFactory", REQUIRE_CLASS))
       .add(new SolrPluginInfo(IndexDeletionPolicy.class, "indexConfig/deletionPolicy", REQUIRE_CLASS))
       .add(new SolrPluginInfo(CodecFactory.class, "codecFactory", REQUIRE_CLASS))
       .add(new SolrPluginInfo(IndexReaderFactory.class, "indexReaderFactory", REQUIRE_CLASS))

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index b94b3d8..462ec9c 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -52,6 +52,8 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.DefaultRecoveryStrategyFactory;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -158,6 +160,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
   private final Map<String, SolrInfoMBean> infoRegistry;
   private final IndexDeletionPolicyWrapper solrDelPolicy;
   private final DirectoryFactory directoryFactory;
+  private final RecoveryStrategyFactory recoveryStrategyFactory;
   private IndexReaderFactory indexReaderFactory;
   private final Codec codec;
   private final MemClassLoader memClassLoader;
@@ -493,6 +496,20 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     return dirFactory;
   }
 
+  private RecoveryStrategyFactory initRecoveryStrategyFactory() {
+    final PluginInfo info = solrConfig.getPluginInfo(RecoveryStrategyFactory.class.getName());
+    final RecoveryStrategyFactory rsFactory;
+    if (info != null) {
+      log.info(info.className);
+      rsFactory = getResourceLoader().newInstance(info.className, RecoveryStrategyFactory.class);
+      rsFactory.init(info.initArgs);
+    } else {
+      log.info("solr.RecoveryStrategyFactory");
+      rsFactory = new DefaultRecoveryStrategyFactory();
+    }
+    return rsFactory;
+  }
+
   private void initIndexReaderFactory() {
     IndexReaderFactory indexReaderFactory;
     PluginInfo info = solrConfig.getPluginInfo(IndexReaderFactory.class.getName());
@@ -681,10 +698,12 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
 
     if (updateHandler == null) {
       directoryFactory = initDirectoryFactory();
-      solrCoreState = new DefaultSolrCoreState(directoryFactory);
+      recoveryStrategyFactory = initRecoveryStrategyFactory();
+      solrCoreState = new DefaultSolrCoreState(directoryFactory, recoveryStrategyFactory);
     } else {
       solrCoreState = updateHandler.getSolrCoreState();
       directoryFactory = solrCoreState.getDirectoryFactory();
+      recoveryStrategyFactory = solrCoreState.getRecoveryStrategyFactory();
       isReloaded = true;
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 8eab83f..2695cd7 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -32,7 +32,9 @@ import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SortingMergePolicy;
 import org.apache.lucene.search.Sort;
 import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.DefaultRecoveryStrategyFactory;
 import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.core.CoreContainer;
@@ -63,6 +65,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
 
   private SolrIndexWriter indexWriter = null;
   private DirectoryFactory directoryFactory;
+  private final RecoveryStrategyFactory recoveryStrategyFactory;
 
   private volatile RecoveryStrategy recoveryStrat;
 
@@ -76,8 +79,15 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
   
   protected final ReentrantLock commitLock = new ReentrantLock();
 
+  @Deprecated
   public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
+    this(directoryFactory, new DefaultRecoveryStrategyFactory());
+  }
+
+  public DefaultSolrCoreState(DirectoryFactory directoryFactory,
+      RecoveryStrategyFactory recoveryStrategyFactory) {
     this.directoryFactory = directoryFactory;
+    this.recoveryStrategyFactory = recoveryStrategyFactory;
   }
   
   private void closeIndexWriter(IndexWriterCloser closer) {
@@ -263,6 +273,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
   }
 
   @Override
+  public RecoveryStrategyFactory getRecoveryStrategyFactory() {
+    return recoveryStrategyFactory;
+  }
+
+  @Override
   public void doRecovery(CoreContainer cc, CoreDescriptor cd) {
     
     Thread thread = new Thread() {
@@ -310,7 +325,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
               recoveryThrottle.minimumWaitBetweenActions();
               recoveryThrottle.markAttemptingAction();
               
-              recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
+              recoveryStrat = recoveryStrategyFactory.create(cc, cd, DefaultSolrCoreState.this);
               recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
               Future<?> future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
               try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 89e286a..25ba94a 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.search.Sort;
 import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.DirectoryFactory;
@@ -138,6 +139,11 @@ public abstract class SolrCoreState {
    */
   public abstract DirectoryFactory getDirectoryFactory();
 
+  /**
+   * @return the {@link RecoveryStrategyFactory} that should be used.
+   */
+  public abstract RecoveryStrategyFactory getRecoveryStrategyFactory();
+
 
   public interface IndexWriterCloser {
     void closeWriter(IndexWriter writer) throws IOException;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml
new file mode 100644
index 0000000..1d0a93c
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategyfactory.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+  <requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
+  <recoveryStrategyFactory class="org.apache.solr.core.CustomRecoveryStrategyFactoryTest$CustomRecoveryStrategyFactory">
+    <int name="customParameter">42</int>
+  </recoveryStrategyFactory>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+</config>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3d5a192/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java b/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java
new file mode 100644
index 0000000..01146a4
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/CustomRecoveryStrategyFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.core;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.cloud.RecoveryStrategyFactory;
+import org.junit.BeforeClass;
+
+/**
+ * test that configs can override the RecoveryStrategyFactory
+ */
+public class CustomRecoveryStrategyFactoryTest extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    initCore("solrconfig-customrecoverystrategyfactory.xml", "schema.xml");
+  }
+
+  public void testFactory() throws Exception {
+    final RecoveryStrategyFactory recoveryStrategyFactory =
+        h.getCore().getSolrCoreState().getRecoveryStrategyFactory();
+    assertNotNull("recoveryStrategyFactory is null", recoveryStrategyFactory);
+    assertEquals("recoveryStrategyFactory is wrong class (name)",
+                 CustomRecoveryStrategyFactoryTest.CustomRecoveryStrategyFactory.class.getName(),
+                 recoveryStrategyFactory.getClass().getName());
+    assertTrue("recoveryStrategyFactory is wrong class (instanceof)",
+        recoveryStrategyFactory instanceof CustomRecoveryStrategyFactory);
+    final CustomRecoveryStrategyFactory customRecoveryStrategyFactory =
+        (CustomRecoveryStrategyFactory)recoveryStrategyFactory;
+  }
+
+  public void testCreate() throws Exception {
+    final RecoveryStrategyFactory recoveryStrategyFactory =
+        h.getCore().getSolrCoreState().getRecoveryStrategyFactory();
+    assertNotNull("recoveryStrategyFactory is null", recoveryStrategyFactory);
+
+    final RecoveryStrategy recoveryStrategy =
+        recoveryStrategyFactory.create(null, null, null);
+
+    assertEquals("recoveryStrategy is wrong class (name)",
+                 CustomRecoveryStrategyFactoryTest.CustomRecoveryStrategy.class.getName(),
+                 recoveryStrategy.getClass().getName());
+    assertTrue("recoveryStrategy is wrong class (instanceof)",
+        recoveryStrategy instanceof CustomRecoveryStrategy);
+
+    final CustomRecoveryStrategy customRecoveryStrategy =
+        (CustomRecoveryStrategy)recoveryStrategy;
+    assertEquals(42, customRecoveryStrategy.getCustomParameter());
+  }
+
+  static public class CustomRecoveryStrategy extends RecoveryStrategy {
+    public CustomRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+        RecoveryListener recoveryListener) {
+    }
+    private int customParameter = random().nextInt();
+    public int getCustomParameter() { return customParameter; }
+    public void setCustomParameter(int customParameter) { this.customParameter = customParameter; }
+    @Override
+    public boolean getRecoveringAfterStartup() { return false; }
+    @Override
+    public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {}
+    @Override
+    public void close() {}
+  }
+
+  static public class CustomRecoveryStrategyFactory extends RecoveryStrategyFactory {
+    @Override
+    public RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
+        RecoveryStrategy.RecoveryListener recoveryListener) {
+      return new CustomRecoveryStrategy(cc, cd, recoveryListener);
+    }
+  }
+
+}