You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/02/14 20:15:54 UTC
svn commit: r1244177 - in /lucene/dev/trunk/solr/core/src:
java/org/apache/solr/cloud/RecoveryStrategy.java
test/org/apache/solr/cloud/FullSolrCloudTest.java
Author: markrmiller
Date: Tue Feb 14 19:15:54 2012
New Revision: 1244177
URL: http://svn.apache.org/viewvc?rev=1244177&view=rev
Log:
SOLR-3126: We should try to do a quick sync on standard startup recovery before trying to do a full-blown replication.
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1244177&r1=1244176&r2=1244177&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue Feb 14 19:15:54 2012
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
*/
import java.io.IOException;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -35,15 +36,19 @@ import org.apache.solr.core.CoreDescript
import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RecoveryStrategy extends Thread {
- private static final int MAX_RETRIES = 100;
- private static final int INTERRUPTED = 101;
+ private static final int MAX_RETRIES = 500;
+ private static final int INTERRUPTED = MAX_RETRIES + 1;
private static final int START_TIMEOUT = 100;
private static final String REPLICATION_HANDLER = "/replication";
@@ -86,7 +91,7 @@ public class RecoveryStrategy extends Th
close = true;
}
- private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+ private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops, String baseUrl)
throws SolrServerException, IOException {
// start buffer updates to tran log
// and do recovery - either replay via realtime get (eventually)
@@ -97,7 +102,7 @@ public class RecoveryStrategy extends Th
String leaderUrl = leaderCNodeProps.getCoreUrl();
String leaderCoreName = leaderCNodeProps.getCoreName();
- log.info("Attempt to replicate from " + leaderUrl);
+ log.info("Attempting to replicate from " + leaderUrl);
// if we are the leader, either we are trying to recover faster
// than our ephemeral timed out or we are the only node
@@ -109,7 +114,7 @@ public class RecoveryStrategy extends Th
PrepRecovery prepCmd = new PrepRecovery();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(nodeName);
- prepCmd.setCoreNodeName(shardZkNodeName);
+ prepCmd.setCoreNodeName(coreZkNodeName);
server.request(prepCmd);
server.shutdown();
@@ -158,46 +163,72 @@ public class RecoveryStrategy extends Th
boolean succesfulRecovery = false;
while (!succesfulRecovery && !close && !isInterrupted()) {
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog == null) return;
-
- ulog.bufferUpdates();
- replayed = false;
- CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
try {
+ // first thing we just try to sync
zkController.publish(core, ZkStateReader.RECOVERING);
+ CloudDescriptor cloudDesc = core.getCoreDescriptor()
+ .getCloudDescriptor();
+ ZkNodeProps leaderprops = null;
- ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
+ leaderprops = zkStateReader.getLeaderProps(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
- // System.out.println("recover " + shardZkNodeName + " against " +
- // leaderprops);
- replicate(zkController.getNodeName(), core, coreZkNodeName,
- leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, coreName));
-
- replay(ulog);
- replayed = true;
-
- // if there are pending recovery requests, don't advert as active
- zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName,
- coreName);
-
- succesfulRecovery = true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Recovery was interrupted", e);
- retries = INTERRUPTED;
- } catch (Throwable t) {
- SolrException.log(log, "Error while trying to recover", t);
- } finally {
- if (!replayed) {
- try {
- ulog.dropBufferedUpdates();
- } catch (Throwable t) {
- SolrException.log(log, "", t);
+ String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP));
+
+ log.info("Attempting to PeerSync from " + leaderUrl);
+ PeerSync peerSync = new PeerSync(core,
+ Collections.singletonList(leaderUrl), 100);
+ boolean syncSuccess = peerSync.sync();
+ if (syncSuccess) {
+ SolrQueryRequest req = new LocalSolrQueryRequest(core,
+ new ModifiableSolrParams());
+ core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+ log.info("Sync Recovery was succesful - registering as Active");
+ // sync success - register as active and return
+ zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
+ coreZkNodeName, coreName);
+ return;
+ }
+ log.info("Sync Recovery was not successful - trying replication");
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog == null) return;
+
+ ulog.bufferUpdates();
+ replayed = false;
+
+ try {
+
+ replicate(zkController.getNodeName(), core,
+ leaderprops, leaderUrl);
+
+ replay(ulog);
+ replayed = true;
+
+ log.info("Recovery was succesful - registering as Active");
+ // if there are pending recovery requests, don't advert as active
+ zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
+ coreZkNodeName, coreName);
+
+ succesfulRecovery = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Recovery was interrupted", e);
+ retries = INTERRUPTED;
+ } catch (Throwable t) {
+ SolrException.log(log, "Error while trying to recover", t);
+ } finally {
+ if (!replayed) {
+ try {
+ ulog.dropBufferedUpdates();
+ } catch (Throwable t) {
+ SolrException.log(log, "", t);
+ }
}
+
}
+ } catch (Throwable t) {
+ SolrException.log(log, "Error while trying to recover", t);
}
if (!succesfulRecovery) {
@@ -205,14 +236,14 @@ public class RecoveryStrategy extends Th
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
try {
-
+
SolrException.log(log, "Recovery failed - trying again...");
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
-
+
} else {
- // TODO: for now, give up after 10 tries - should we do more?
+ // TODO: for now, give up after X tries - should we do more?
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1244177&r1=1244176&r2=1244177&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Tue Feb 14 19:15:54 2012
@@ -698,8 +698,14 @@ public class FullSolrCloudTest extends A
private void brindDownShardIndexSomeDocsAndRecover() throws Exception,
SolrServerException, IOException, InterruptedException {
+ SolrQuery query = new SolrQuery("*:*");
+ query.set("distrib", false);
commit();
+
+ long deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
+ System.out.println("dsc:" + deadShardCount);
+
query("q", "*:*", "sort", "n_tl1 desc");
// kill a shard
@@ -715,7 +721,6 @@ public class FullSolrCloudTest extends A
// ensure shard is dead
try {
- // TODO: ignore fail
index_specific(shardToClient.get(SHARD2).get(0), id, 999, i1, 107, t1,
"specific doc!");
fail("This server should be down and this update should have failed");
@@ -743,6 +748,7 @@ public class FullSolrCloudTest extends A
Thread.sleep(1000);
}
+ long numFound1 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
index_specific(shardToClient.get(SHARD2).get(1), id, 1000, i1, 108, t1,
"specific doc!");
@@ -755,8 +761,10 @@ public class FullSolrCloudTest extends A
// try adding a doc with CloudSolrServer
cloudClient.setDefaultCollection(DEFAULT_COLLECTION);
- SolrQuery query = new SolrQuery("*:*");
- long numFound1 = cloudClient.query(query).getResults().getNumFound();
+
+ long numFound2 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+
+ assertEquals(numFound1 + 1, numFound2);
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", 1001);
@@ -772,10 +780,10 @@ public class FullSolrCloudTest extends A
query("q", "*:*", "sort", "n_tl1 desc");
- long numFound2 = cloudClient.query(query).getResults().getNumFound();
+ long numFound3 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
// lets just check that the one doc since last commit made it in...
- assertEquals(numFound1 + 1, numFound2);
+ assertEquals(numFound2 + 1, numFound3);
// test debugging
testDebugQueries();
@@ -786,7 +794,9 @@ public class FullSolrCloudTest extends A
for (SolrServer client : clients) {
try {
- System.out.println(client.query(new SolrQuery("*:*")).getResults()
+ SolrQuery q = new SolrQuery("*:*");
+ q.set("distrib", false);
+ System.out.println(client.query(q).getResults()
.getNumFound());
} catch (Exception e) {
@@ -798,21 +808,41 @@ public class FullSolrCloudTest extends A
// query("q","matchesnothing","fl","*,score", "debugQuery", "true");
// this should trigger a recovery phase on deadShard
-
deadShard.start(true);
- // make sure we have published we are recoverying
+ // make sure we have published we are recovering
Thread.sleep(1500);
waitForRecoveriesToFinish(false);
- List<SolrServer> s2c = shardToClient.get(SHARD2);
-
+ deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
// if we properly recovered, we should now have the couple missing docs that
// came in while shard was down
- assertEquals(s2c.get(0).query(new SolrQuery("*:*")).getResults()
- .getNumFound(), s2c.get(1).query(new SolrQuery("*:*")).getResults()
- .getNumFound());
+ checkShardConsistency(true, false);
+
+
+ // recover over 100 docs so we do more than just peer sync (replicate recovery)
+ deadShard = chaosMonkey.stopShard(SHARD2, 0);
+
+ for (int i = 0; i < 226; i++) {
+ doc = new SolrInputDocument();
+ doc.addField("id", 2000 + i);
+ controlClient.add(doc);
+ ureq = new UpdateRequest();
+ ureq.add(doc);
+ // ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
+ ureq.process(cloudClient);
+ }
+ commit();
+
+ deadShard.start(true);
+
+ // make sure we have published we are recovering
+ Thread.sleep(1500);
+
+ waitForRecoveriesToFinish(false);
+
+ checkShardConsistency(true, false);
}
private void testDebugQueries() throws Exception {