You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/13 16:38:02 UTC
svn commit: r1231134 - in /lucene/dev/branches/solrcloud/solr: ./
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/
core/src/java/org/apache/solr/handler/admin/
core/src/java/org/apache/solr/update/ core/src/test/org/apache/so...
Author: markrmiller
Date: Fri Jan 13 15:38:01 2012
New Revision: 1231134
URL: http://svn.apache.org/viewvc?rev=1231134&view=rev
Log:
harden the hell out of recovery
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/branches/solrcloud/solr/testlogging.properties
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java Fri Jan 13 15:38:01 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
@@ -35,8 +36,10 @@ import org.apache.solr.core.RequestHandl
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,8 +83,7 @@ public class RecoveryStrat {
log.info("Start recovery process");
if (recoveryListener != null) recoveryListener.startRecovery();
- zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName,
- core.getName());
+
} catch (Exception e) {
log.error("", e);
core.getUpdateHandler().getSolrCoreState().recoveryRequests.decrementAndGet();
@@ -102,16 +104,20 @@ public class RecoveryStrat {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) return;
- ulog.bufferUpdates();
boolean replayed = false;
boolean succesfulRecovery = false;
int retries = 0;
while (!succesfulRecovery && !close) {
+ ulog.bufferUpdates();
+ zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName,
+ core.getName());
+ replayed = false;
try {
ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
-
- replicate(core, shardZkNodeName, leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName()));
+ // nocommit
+ // System.out.println("recover " + shardZkNodeName + " against " + leaderprops);
+ replicate(zkController.getNodeName(), core, shardZkNodeName, leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName()));
replay(core);
replayed = true;
@@ -150,6 +156,7 @@ public class RecoveryStrat {
// lets pause for a moment and we need to try again...
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
+ try {
log.error("Recovery failed - trying again...");
retries++;
if (retries >= MAX_RETRIES) {
@@ -158,11 +165,19 @@ public class RecoveryStrat {
cloudDesc);
}
+ zkController.publishAsDown(baseUrl, cloudDesc, shardZkNodeName,
+ core.getName());
+
+ } catch (Exception e) {
+ log.error("", e);
+ }
+
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ log.error("Recovery was interrupted", e);
+ retries = MAX_RETRIES;
}
}
}
@@ -181,6 +196,21 @@ public class RecoveryStrat {
// wait for replay
future.get();
}
+
+ // nocommit
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replayed "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
+
return future;
}
};
@@ -196,7 +226,7 @@ public class RecoveryStrat {
close = true;
}
- private void replicate(SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+ private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
throws SolrServerException, IOException {
// start buffer updates to tran log
// and do recovery - either replay via realtime get (eventually)
@@ -217,7 +247,8 @@ public class RecoveryStrat {
PrepRecovery prepCmd = new PrepRecovery();
prepCmd.setAction(CoreAdminAction.PREPRECOVERY);
prepCmd.setCoreName(leaderCoreName);
- prepCmd.setNodeName(shardZkNodeName);
+ prepCmd.setNodeName(nodeName);
+ prepCmd.setCoreNodeName(shardZkNodeName);
server.request(prepCmd);
@@ -237,9 +268,27 @@ public class RecoveryStrat {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
- replicationHandler.doFetch(solrParams);
+ boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have
+
+ if (!success) {
+ throw new RuntimeException("Replication for recovery failed.");
+ }
if (recoveryListener != null) recoveryListener.finishedReplication();
+
+ // nocommit
+ try {
+ RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+ SolrIndexSearcher searcher = searchHolder.get();
+ try {
+ System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replicated "
+ + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " from " + leaderUrl + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
+ } finally {
+ searchHolder.decref();
+ }
+ } catch (Exception e) {
+
+ }
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Jan 13 15:38:01 2012
@@ -170,10 +170,24 @@ public final class ZkController {
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
- // re register all descriptors
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
if (descriptors != null) {
+ // before registering as live, make sure everyone is in a
+ // recovery state
+ for (CoreDescriptor descriptor : descriptors) {
+ final String shardZkNodeName = getNodeName() + "_"
+ + descriptor.getName();
+ publishAsDown(getBaseUrl(), descriptor.getCloudDescriptor(), shardZkNodeName,
+ descriptor.getName());
+ }
+ }
+
+ // we have to register as live first to pick up docs in the buffer
+ createEphemeralLiveNode();
+
+ // re register all descriptors
+ if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was
// a leader that was expired - as well as what to do about leaders/overseers
@@ -181,10 +195,7 @@ public final class ZkController {
register(descriptor.getName(), descriptor, true);
}
}
-
- // don't advertise as live until everyone has registered
- createEphemeralLiveNode();
-
+
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -465,7 +476,7 @@ public final class ZkController {
props.put(ZkStateReader.CORE_PROP, coreName);
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
- props.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+ props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
if(shardId!=null) {
props.put(ZkStateReader.SHARD_ID_PROP, shardId);
}
@@ -595,13 +606,24 @@ public final class ZkController {
publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
}
+ void publishAsDown(String baseUrl,
+ final CloudDescriptor cloudDesc, String shardZkNodeName, String coreName) {
+ Map<String,String> finalProps = new HashMap<String,String>();
+ finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+ finalProps.put(ZkStateReader.CORE_PROP, coreName);
+ finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+ finalProps.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
+ publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
+ }
+
void publishAsRecoveryFailed(String baseUrl,
final CloudDescriptor cloudDesc, String shardZkNodeName, String coreName) {
Map<String,String> finalProps = new HashMap<String,String>();
finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
finalProps.put(ZkStateReader.CORE_PROP, coreName);
finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+ finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERY_FAILED);
finalProps.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Fri Jan 13 15:38:01 2012
@@ -129,6 +129,8 @@ public class ReplicationHandler extends
// It gives the current 'replicateable' index version
if (command.equals(CMD_INDEX_VERSION)) {
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
+ // nocommit
+ //System.out.println("The latest index gen is:" + commitPoint.getGeneration() + " " + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
if (commitPoint != null && replicationEnabled.get()) {
//
// There is a race condition here. The commit point may be changed / deleted by the time
@@ -163,7 +165,7 @@ public class ReplicationHandler extends
new Thread() {
@Override
public void run() {
- doFetch(paramsCopy);
+ doFetch(paramsCopy, false);
}
}.start();
rsp.add(STATUS, OK_STATUS);
@@ -271,10 +273,10 @@ public class ReplicationHandler extends
private volatile SnapPuller tempSnapPuller;
- public void doFetch(SolrParams solrParams) {
+ public boolean doFetch(SolrParams solrParams, boolean force) {
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!snapPullLock.tryLock())
- return;
+ return false;
try {
tempSnapPuller = snapPuller;
if (masterUrl != null) {
@@ -282,13 +284,14 @@ public class ReplicationHandler extends
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
}
- tempSnapPuller.fetchLatestIndex(core);
+ return tempSnapPuller.fetchLatestIndex(core, force);
} catch (Exception e) {
LOG.error("SnapPull failed ", e);
} finally {
tempSnapPuller = snapPuller;
snapPullLock.unlock();
}
+ return false;
}
boolean isReplicating() {
@@ -335,6 +338,8 @@ public class ReplicationHandler extends
}
long version = Long.parseLong(v);
IndexCommit commit = core.getDeletionPolicy().getCommitPoint(version);
+ //nocommit
+ //System.out.println("ask for files for gen:" + commit.getGeneration() + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
if (commit == null) {
rsp.add("status", "invalid indexversion");
return;
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Fri Jan 13 15:38:01 2012
@@ -159,7 +159,7 @@ public class SnapPuller {
}
try {
executorStartTime = System.currentTimeMillis();
- replicationHandler.doFetch(null);
+ replicationHandler.doFetch(null, false);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
@@ -244,7 +244,8 @@ public class SnapPuller {
@SuppressWarnings("unchecked")
boolean successfulInstall = false;
- boolean fetchLatestIndex(SolrCore core) throws IOException {
+ boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException {
+ successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
try {
//get the current 'replicateable' index version in the master
@@ -257,9 +258,10 @@ public class SnapPuller {
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
- if (latestVersion == 0L) {
+ if (latestVersion == 0L && !force) {
//there is nothing to be replicated
- return false;
+ successfulInstall = true;
+ return true;
}
IndexCommit commit;
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
@@ -275,9 +277,11 @@ public class SnapPuller {
searcherRefCounted.decref();
}
if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
- //master and slave are alsready in sync just return
+ //master and slave are already in sync just return
LOG.info("Slave in sync with master.");
- return false;
+ System.out.println("SLAVE IN SYNC:" + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
+ successfulInstall = true;
+ return true;
}
LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
@@ -294,7 +298,7 @@ public class SnapPuller {
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
// then a new index direcory to be created and all the files need to be copied
- boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
+ boolean isFullCopyNeeded = commit.getVersion() >= latestVersion || force;
File tmpIndexDir = createTempindexDir(core);
if (isIndexStale())
isFullCopyNeeded = true;
@@ -336,6 +340,7 @@ public class SnapPuller {
return successfulInstall;
} catch (ReplicationHandlerException e) {
LOG.error("User aborted Replication");
+ return false;
} catch (SolrException e) {
throw e;
} catch (Exception e) {
@@ -344,9 +349,9 @@ public class SnapPuller {
if (deleteTmpIdxDir) delTree(tmpIndexDir);
else delTree(indexDir);
}
- return successfulInstall;
} finally {
if (!successfulInstall) {
+ System.out.println("replication failed handler:" + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
logReplicationTimeAndConfFiles(null, successfulInstall);
}
filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Fri Jan 13 15:38:01 2012
@@ -19,12 +19,14 @@ package org.apache.solr.handler.admin;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.RecoveryStrat;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
@@ -590,16 +592,18 @@ public class CoreAdminHandler extends Re
}
}
- protected void handlePrepRecoveryAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, InterruptedException {
+ protected void handlePrepRecoveryAction(SolrQueryRequest req,
+ SolrQueryResponse rsp) throws IOException, InterruptedException {
final SolrParams params = req.getParams();
-
+
String cname = params.get(CoreAdminParams.CORE);
if (cname == null) {
cname = "";
}
-
+
String nodeName = params.get("nodeName");
-
+ String coreNodeName = params.get("coreNodeName");
+
SolrCore core = coreContainer.getCore(cname);
if (core == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:" + cname);
@@ -612,51 +616,75 @@ public class CoreAdminHandler extends Re
// to accept updates
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
.getCloudDescriptor();
- ZkNodeProps nodeProps = coreContainer
+ CloudState cloudState = coreContainer
.getZkController()
- .getCloudState()
- .getSlice(cloudDescriptor.getCollectionName(),
- cloudDescriptor.getShardId()).getShards().get(nodeName);
+ .getCloudState();
+ ZkNodeProps nodeProps =
+ cloudState.getSlice(cloudDescriptor.getCollectionName(),
+ cloudDescriptor.getShardId()).getShards().get(coreNodeName);
state = nodeProps.get(ZkStateReader.STATE_PROP);
- if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)) {
+ if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)
+ && cloudState.liveNodesContain(nodeName)) {
break;
}
if (retry++ == 30) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"I was asked to prep for recovery for " + nodeName
- + " but she is not in a recovery state - state: " + state);
+ + " but she is not live or not in a recovery state - state: " + state);
}
-
+
Thread.sleep(1000);
}
- if (core != null) {
- // small safety net for any updates that started with state that
- // kept it from sending the update to be buffered -
- // pause for a while to let any outstanding updates finish
-
- Thread.sleep(2000);
-
- UpdateRequestProcessorChain processorChain = core
- .getUpdateProcessingChain(SolrPluginUtils.resolveUpdateChainParam(
- params, log));
-
- ModifiableSolrParams reqParams = new ModifiableSolrParams(
- req.getParams());
- reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
-
- SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
- UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
- new SolrQueryResponse());
- CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+ // small safety net for any updates that started with state that
+ // kept it from sending the update to be buffered -
+ // pause for a while to let any outstanding updates finish
+
+ Thread.sleep(4000);
+
+ UpdateRequestProcessorChain processorChain = core
+ .getUpdateProcessingChain(SolrPluginUtils.resolveUpdateChainParam(
+ params, log));
+
+ ModifiableSolrParams reqParams = new ModifiableSolrParams(req.getParams());
+ reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
+
+ SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
+ UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
+ new SolrQueryResponse());
+ CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+
+ processor.processCommit(cuc);
+ processor.finish();
+
+ // nocommit
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
+
+ try {
+ RefCounted<SolrIndexSearcher> searchHolder = core.getSearcher();
+ SolrIndexSearcher searcher = searchHolder.get();
+ try {
+ System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate (2) "
+ + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+ } finally {
+ searchHolder.decref();
+ }
+ } catch (Exception e) {
- processor.processCommit(cuc);
- processor.finish();
- } else {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find core: "
- + core);
}
+
} finally {
if (core != null) {
core.close();
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri Jan 13 15:38:01 2012
@@ -737,7 +737,6 @@ public class UpdateLog implements Plugin
versionInfo.blockUpdates();
try {
if (state != State.BUFFERING) return null;
- state = State.APPLYING_BUFFERED;
// handle case when no log was even created because no updates
// were received.
@@ -745,13 +744,14 @@ public class UpdateLog implements Plugin
state = State.ACTIVE;
return null;
}
-
+ tlog.incref();
+ state = State.APPLYING_BUFFERED;
} finally {
versionInfo.unblockUpdates();
}
- tlog.incref();
if (recoveryExecutor.isShutdown()) {
+ tlog.decref();
throw new RuntimeException("executor is not running...");
}
ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Fri Jan 13 15:38:01 2012
@@ -45,6 +45,8 @@ import org.mortbay.jetty.servlet.FilterH
*/
public class ChaosMonkey {
+ private static final int CONLOSS_PERCENT = 3; //30%
+ private static final int EXPIRE_PERCENT = 4; //40%
private static final boolean DONTKILLLEADER = true;
private Map<String,List<CloudJettyRunner>> shardToJetty;
@@ -361,11 +363,11 @@ public class ChaosMonkey {
int rnd = random.nextInt(10);
- if (expireSessions && rnd < 3) {
+ if (expireSessions && rnd < EXPIRE_PERCENT) {
expireRandomSession();
}
- if (causeConnectionLoss && rnd < 2) {
+ if (causeConnectionLoss && rnd < CONLOSS_PERCENT) {
randomConnectionLoss();
randomConnectionLoss();
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Fri Jan 13 15:38:01 2012
@@ -559,8 +559,9 @@ public class FullSolrCloudTest extends A
+ " live:"
+ cloudState.liveNodesContain(shard.getValue().get(
ZkStateReader.NODE_NAME_PROP)));
- if (shard.getValue().get(ZkStateReader.STATE_PROP)
- .equals(ZkStateReader.RECOVERING)
+ String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+ if ((state.equals(ZkStateReader.RECOVERING) || state
+ .equals(ZkStateReader.DOWN))
&& cloudState.liveNodesContain(shard.getValue().get(
ZkStateReader.NODE_NAME_PROP))) {
sawLiveRecovering = true;
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Fri Jan 13 15:38:01 2012
@@ -92,17 +92,28 @@ public class CoreAdminRequest extends So
public static class PrepRecovery extends CoreAdminRequest {
protected String nodeName;
-
+ protected String coreNodeName;
public PrepRecovery() {
action = CoreAdminAction.PREPRECOVERY;
}
- public void setNodeName(String nodeName) { this.nodeName = nodeName; }
-
- public String getNodeName() { return nodeName; }
-
-
+ public void setNodeName(String nodeName) {
+ this.nodeName = nodeName;
+ }
+
+ public String getNodeName() {
+ return nodeName;
+ }
+
+ public String getCoreNodeName() {
+ return coreNodeName;
+ }
+
+ public void setCoreNodeName(String coreNodeName) {
+ this.coreNodeName = coreNodeName;
+ }
+
@Override
public SolrParams getParams() {
if( action == null ) {
@@ -116,9 +127,14 @@ public class CoreAdminRequest extends So
if (nodeName != null) {
params.set( "nodeName", nodeName);
}
+
+ if (coreNodeName != null) {
+ params.set( "coreNodeName", coreNodeName);
+ }
return params;
}
+
}
public static class RequestRecovery extends CoreAdminRequest {
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Jan 13 15:38:01 2012
@@ -62,6 +62,7 @@ public class ZkStateReader {
public static final String RECOVERING = "recovering";
public static final String RECOVERY_FAILED = "recovery_failed";
public static final String ACTIVE = "active";
+ public static final String DOWN = "down";
private volatile CloudState cloudState;
@@ -165,50 +166,49 @@ public class ZkStateReader {
// We need to fetch the current cluster state and the set of live nodes
synchronized (getUpdateLock()) {
- cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
-
-
-
- log.info("Updating cluster state from ZooKeeper... ");
-
- zkClient.exists(CLUSTER_STATE, new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- log.info("A cluster state change has occurred");
- try {
+ cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
+
+ log.info("Updating cluster state from ZooKeeper... ");
+
+ zkClient.exists(CLUSTER_STATE, new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.info("A cluster state change has occurred");
+ try {
+
+ // delayed approach
+ // ZkStateReader.this.updateCloudState(false, false);
+ synchronized (ZkStateReader.this.getUpdateLock()) {
+ // remake watch
+ final Watcher thisWatch = this;
+ byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null,
+ true);
- // delayed approach
- // ZkStateReader.this.updateCloudState(false, false);
- synchronized (ZkStateReader.this.getUpdateLock()) {
- // remake watch
- final Watcher thisWatch = this;
- byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null, true);
-
- CloudState clusterState = CloudState.load(data,
- ZkStateReader.this.cloudState.getLiveNodes());
- // update volatile
- cloudState = clusterState;
- }
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
+ CloudState clusterState = CloudState.load(data,
+ ZkStateReader.this.cloudState.getLiveNodes());
+ // update volatile
+ cloudState = clusterState;
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
- }
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ return;
}
-
- }, true);
- }
+ }
+
+ }, true);
+ }
synchronized (ZkStateReader.this.getUpdateLock()) {
Modified: lucene/dev/branches/solrcloud/solr/testlogging.properties
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/testlogging.properties?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/testlogging.properties (original)
+++ lucene/dev/branches/solrcloud/solr/testlogging.properties Fri Jan 13 15:38:01 2012
@@ -1,4 +1,4 @@
handlers=java.util.logging.ConsoleHandler
-.level=WARNING
+.level=SEVERE
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter