You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/21 00:04:36 UTC
svn commit: r1204292 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/
core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/pr...
Author: markrmiller
Date: Sun Nov 20 23:04:35 2011
New Revision: 1204292
URL: http://svn.apache.org/viewvc?rev=1204292&view=rev
Log:
do recovery on non-leader shards that start up - add simple test for this - fix bug in new 'force' replication - fix bug where some full index replications are not properly replicated from - other minor improvements/movement
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Nov 20 23:04:35 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutExcep
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
@@ -419,10 +420,57 @@ public final class ZkController {
log.info("Attempting to update " + ZkStateReader.CLUSTER_STATE + " version "
+ null);
CloudState state = CloudState.load(data);
+ String shardZkNodeName = getNodeName() + "_" + coreName;
+
+ boolean recover = getIsRecover(cloudDesc, state, shardZkNodeName);
+
+ String shardId = cloudDesc.getShardId();
+ if (shardId == null && !recover) {
+ shardId = assignShard.assignShard(collection, numShards);
+ cloudDesc.setShardId(shardId);
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Register shard - core:" + coreName + " address:"
+ + shardUrl);
+ }
+
+ leaderElector.setupForSlice(shardId, collection);
+
+ ZkNodeProps props = addToZk(collection, desc, cloudDesc, shardUrl, shardZkNodeName);
+ // leader election
+ doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
+
+ String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
+
+ System.out.println("leader url: "+ leaderUrl);
+ System.out.println("shard url: "+ shardUrl);
+ boolean iamleader = false;
+ if (leaderUrl.equals(shardUrl)) {
+ iamleader = true;
+ } else {
+ // we are not the leader, so catch up with recovery
+ recover = true;
+ }
+
+ if (recover) {
+ if (desc.getCoreContainer() != null) {
+ doRecovery(collection, desc, cloudDesc, iamleader);
+ } else {
+ log.warn("For some odd reason a SolrCore is trying to recover but does not have access to a CoreContainer - skipping recovery.");
+ }
+ }
+
+ return shardId;
+ }
+
+
+ private boolean getIsRecover(final CloudDescriptor cloudDesc,
+ CloudState state, String shardZkNodeName) {
boolean recover = false;
Map<String,Slice> slices = state.getSlices(cloudDesc.getCollectionName());
- String shardZkNodeName = getNodeName() + "_" + coreName;
+
if (slices != null) {
Map<String,String> nodes = new HashMap<String,String>();
@@ -440,31 +488,12 @@ public final class ZkController {
recover = true;
}
}
-
- String shardId = cloudDesc.getShardId();
- if (shardId == null && !recover) {
- shardId = assignShard.assignShard(collection, numShards);
- cloudDesc.setShardId(shardId);
- }
-
-
- if (log.isInfoEnabled()) {
- log.info("Register shard - core:" + coreName + " address:"
- + shardUrl);
- }
-
- leaderElector.setupForSlice(shardId, collection);
-
- ZkNodeProps props = addToZk(collection, desc, cloudDesc, shardUrl, shardZkNodeName, recover);
-
- // leader election
- doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
- return shardId;
+ return recover;
}
ZkNodeProps addToZk(String collection, final CoreDescriptor desc, final CloudDescriptor cloudDesc, String shardUrl,
- final String shardZkNodeName, boolean recover)
+ final String shardZkNodeName)
throws Exception {
ZkNodeProps props = new ZkNodeProps();
props.put(ZkStateReader.URL_PROP, shardUrl);
@@ -521,42 +550,6 @@ public final class ZkController {
zkClient.setData(ZkStateReader.CLUSTER_STATE,
CloudState.store(state), stat.getVersion());
updated = true;
- if (recover) {
- // nocommit: joke code
- System.out.println("do recovery");
- // start buffer updates to tran log
- // and do recovery - either replay via realtime get
- // or full index replication
- System.out.println("RECOVERY");
- // seems we cannot do this here since we are not fully running -
- // we need to trigger a recovery that happens later
- System.out.println("shard is:" + cloudDesc.getShardId());
- String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
- System.out.println("leader url: "+ leaderUrl);
- System.out.println("shard url: "+ shardUrl);
- if (!leaderUrl.equals(shardUrl)) {
- // if we are the leader, either we are trying to recover faster
- // then our ephemeral timed out or we are the only node
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("command", "fetchindex");
- params.set("force", true); // force replication regardless of
- // versions
- params.set("masterUrl", leaderUrl + "replication");
- QueryRequest req = new QueryRequest(params);
- req.setPath("/replication");
- System.out.println("Make replication call to:" + leaderUrl);
- System.out.println("params:" + params);
-
- // if we want to buffer updates while recovering, this
- // will have to trigger later - http is not yet up
-
- // we need to use embedded cause http is not up yet anyhow
- EmbeddedSolrServer server = new EmbeddedSolrServer(
- desc.getCoreContainer(), desc.getName());
- server.request(req);
- }
- }
} catch (KeeperException e) {
if (e.code() != Code.BADVERSION) {
throw e;
@@ -569,6 +562,46 @@ public final class ZkController {
return props;
}
+
+ private void doRecovery(String collection, final CoreDescriptor desc,
+ final CloudDescriptor cloudDesc, boolean iamleader) throws Exception,
+ SolrServerException, IOException {
+ // nocommit: joke code
+ System.out.println("do recovery");
+ // start buffer updates to tran log
+ // and do recovery - either replay via realtime get
+ // or full index replication
+
+ // seems perhaps we cannot do this here since we are not fully running -
+ // we need to trigger a recovery that happens later
+ System.out.println("shard is:" + cloudDesc.getShardId());
+
+ String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
+
+ if (!iamleader) {
+ // if we are the leader, either we are trying to recover faster
+ // than our ephemeral timed out or we are the only node
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("command", "fetchindex");
+ params.set("force", true); // force replication regardless of
+ // versions
+ params.set("masterUrl", leaderUrl + "replication");
+ QueryRequest req = new QueryRequest(params);
+ req.setPath("/replication");
+ System.out.println("Make replication call to:" + leaderUrl);
+ System.out.println("params:" + params);
+
+ // if we want to buffer updates while recovering, this
+ // will have to trigger later - http is not yet up
+
+ // we need to use embedded cause http is not up yet anyhow
+ EmbeddedSolrServer server = new EmbeddedSolrServer(
+ desc.getCoreContainer(), desc.getName());
+ server.request(req);
+ }
+ }
+
private void doLeaderElectionProcess(String shardId,
final String collection, String shardZkNodeName, ZkNodeProps props) throws KeeperException,
InterruptedException, IOException {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Sun Nov 20 23:04:35 2011
@@ -561,7 +561,7 @@ public final class SolrCore implements S
if (updateHandler == null) {
initDirectoryFactory();
} else {
- directoryFactory = updateHandler.getIndexWriterProvider().getDirectoryFactory();
+ directoryFactory = updateHandler.getSolrCoreState().getDirectoryFactory();
}
initIndex();
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Sun Nov 20 23:04:35 2011
@@ -81,6 +81,8 @@ import org.slf4j.LoggerFactory;
* @since solr 1.4
*/
public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
+ static final String FORCE = "force";
+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
SolrCore core;
@@ -118,6 +120,7 @@ public class ReplicationHandler extends
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
rsp.setHttpCaching(false);
final SolrParams solrParams = req.getParams();
+ boolean force = solrParams.getBool(FORCE, false);
String command = solrParams.get(COMMAND);
if (command == null) {
rsp.add(STATUS, OK_STATUS);
@@ -131,8 +134,9 @@ public class ReplicationHandler extends
// this is only set after commit or optimize or something - if it's not set,
// just use the most recent
- if (commitPoint == null) {
- commitPoint = req.getSearcher().getIndexReader().getIndexCommit();
+ if (commitPoint == null || force) {
+ commitPoint = core.getDeletionPolicy().getLatestCommit();
+ indexCommitPoint = commitPoint;
}
if (commitPoint != null && replicationEnabled.get()) {
@@ -143,6 +147,7 @@ public class ReplicationHandler extends
// the CMD_GET_FILE_LIST command.
//
core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
+ System.out.println("return version: " + commitPoint.getVersion());
rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
rsp.add(GENERATION, commitPoint.getGeneration());
} else {
@@ -288,7 +293,7 @@ public class ReplicationHandler extends
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
}
- tempSnapPuller.fetchLatestIndex(core, solrParams == null ? false : solrParams.getBool("force", false));
+ tempSnapPuller.fetchLatestIndex(core, solrParams == null ? false : solrParams.getBool(FORCE, false));
} catch (Exception e) {
LOG.error("SnapPull failed ", e);
} finally {
@@ -353,7 +358,9 @@ public class ReplicationHandler extends
Collection<String> files = new HashSet<String>(commit.getFileNames());
for (String fileName : files) {
if(fileName.endsWith(".lock")) continue;
- File file = new File(core.getIndexDir(), fileName);
+ // use new dir in case we are replicating from a full index replication
+ // and have not yet reloaded the core
+ File file = new File(core.getNewIndexDir(), fileName);
Map<String, Object> fileMeta = getFileInfo(file);
result.add(fileMeta);
}
@@ -763,9 +770,10 @@ public class ReplicationHandler extends
}
- void refreshCommitpoint() {
+ void refreshCommitpoint(boolean force) {
IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
- if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
+ System.out.println("refresh commit point to:" + commitPoint.getVersion());
+ if(force || replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
indexCommitPoint = commitPoint;
}
}
@@ -1022,7 +1030,9 @@ public class ReplicationHandler extends
file = new File(core.getResourceLoader().getConfigDir(), cfileName);
} else {
//else read from the indexdirectory
- file = new File(core.getIndexDir(), fileName);
+ // use new dir in case we are replicating from a full index replication
+ // and have not yet reloaded the core
+ file = new File(core.getNewIndexDir(), fileName);
}
if (file.exists() && file.canRead()) {
inputStream = new FileInputStream(file);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Sun Nov 20 23:04:35 2011
@@ -172,12 +172,16 @@ public class SnapPuller {
/**
* Gets the latest commit version and generation from the master
+ * @param force
*/
@SuppressWarnings("unchecked")
- NamedList getLatestVersion() throws IOException {
+ NamedList getLatestVersion(boolean force) throws IOException {
PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, CMD_INDEX_VERSION);
post.addParameter("wt", "javabin");
+ if (force) {
+ post.addParameter(ReplicationHandler.FORCE, "true");
+ }
return getNamedListResponse(post);
}
@@ -249,7 +253,7 @@ public class SnapPuller {
//get the current 'replicateable' index version in the master
NamedList response = null;
try {
- response = getLatestVersion();
+ response = getLatestVersion(force);
} catch (Exception e) {
LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
return false;
@@ -269,7 +273,7 @@ public class SnapPuller {
if (searcherRefCounted != null)
searcherRefCounted.decref();
}
- if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
+ if (!force && commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
//master and slave are already in sync, just return
LOG.info("Slave in sync with master.");
return false;
@@ -324,7 +328,7 @@ public class SnapPuller {
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
- doCommit();
+ doCommit(isFullCopyNeeded);
}
}
replicationStartTime = 0;
@@ -469,7 +473,7 @@ public class SnapPuller {
return sb;
}
- private void doCommit() throws IOException {
+ private void doCommit(boolean isFullCopyNeeded) throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
try {
@@ -478,7 +482,7 @@ public class SnapPuller {
solrCore.getUpdateHandler().newIndexWriter();
solrCore.getSearcher(true, false, null);
- replicationHandler.refreshCommitpoint();
+ replicationHandler.refreshCommitpoint(isFullCopyNeeded);
} finally {
req.close();
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Sun Nov 20 23:04:35 2011
@@ -559,7 +559,7 @@ public class DirectUpdateHandler2 extend
return "DirectUpdateHandler2" + getStatistics();
}
- public SolrCoreState getIndexWriterProvider() {
+ public SolrCoreState getSolrCoreState() {
return solrCoreState;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Sun Nov 20 23:04:35 2011
@@ -141,7 +141,7 @@ public abstract class UpdateHandler impl
*/
public abstract void newIndexWriter() throws IOException;
- public abstract SolrCoreState getIndexWriterProvider();
+ public abstract SolrCoreState getSolrCoreState();
public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
public abstract void delete(DeleteUpdateCommand cmd) throws IOException;
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Nov 20 23:04:35 2011
@@ -247,11 +247,8 @@ public class DistributedUpdateProcessor
System.out.println("LeaderParam:"
+ req.getParams().get(SEEN_LEADER));
-
-
System.out.println("leader? " + isLeader);
-
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
@@ -265,8 +262,6 @@ public class DistributedUpdateProcessor
// TODO: check for the version in the request params (this will be for user provided versions and optimistic concurrency only)
}
-
-
VersionBucket bucket = vinfo.bucket(hash);
synchronized (bucket) {
// we obtain the version when synchronized and then do the add so we can ensure that
@@ -285,9 +280,10 @@ public class DistributedUpdateProcessor
cmd.setVersion(version);
cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
bucket.updateHighest(version);
- System.out.println("add version field to doc");
+ System.out.println("add version field to doc:" + version);
} else {
// The leader forwarded us this update.
+ System.out.println("got version from leader:" + versionOnUpdate);
cmd.setVersion(versionOnUpdate);
// if we aren't the leader, then we need to check that updates were not re-ordered
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java Sun Nov 20 23:04:35 2011
@@ -121,7 +121,7 @@ public class BasicFunctionalityTest exte
// test merge factor picked up
SolrCore core = h.getCore();
- IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter(core);
+ IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getSolrCoreState().getIndexWriter(core);
assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
lrf.args.put(CommonParams.VERSION,"2.2");
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java Sun Nov 20 23:04:35 2011
@@ -48,7 +48,7 @@ public class BasicZkTest extends Abstrac
// test merge factor picked up
SolrCore core = h.getCore();
- IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter(core);
+ IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getSolrCoreState().getIndexWriter(core);
assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sun Nov 20 23:04:35 2011
@@ -20,10 +20,13 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
@@ -39,6 +42,7 @@ import org.apache.solr.common.cloud.ZkNo
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
/**
@@ -66,10 +70,12 @@ public class FullDistributedZkTest exten
String invalidField="ignore_exception__invalid_field_not_in_schema";
private static final int sliceCount = 3;
+ protected volatile CloudSolrServer cloudClient;
protected Map<SolrServer,ZkNodeProps> clientToInfo = new HashMap<SolrServer,ZkNodeProps>();
protected Map<String,List<SolrServer>> shardToClient = new HashMap<String,List<SolrServer>>();
protected Map<String,List<JettySolrRunner>> shardToJetty = new HashMap<String,List<JettySolrRunner>>();
+ private AtomicInteger i = new AtomicInteger(0);
@BeforeClass
public static void beforeClass() throws Exception {
@@ -85,19 +91,44 @@ public class FullDistributedZkTest exten
// TODO: for now, turn off stress because it uses regular clients, and we
// need the cloud client because we kill servers
stress = 0;
+
+
+ }
+
+ private void initCloudClient() {
+ // use the distributed solrj client
+ if (cloudClient == null) {
+ synchronized(this) {
+ try {
+ CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
+ server.setDefaultCollection(DEFAULT_COLLECTION);
+ cloudClient = server;
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
@Override
- protected void createServers(int numShards) throws Exception {
+ protected void createServers(int numServers) throws Exception {
System.setProperty("collection", "control_collection");
controlJetty = createJetty(testDir, testDir + "/control/data", "control_shard");
System.clearProperty("collection");
controlClient = createNewSolrServer(controlJetty.getLocalPort());
+ createJettys(numServers);
+ }
+
+ private void createJettys(int numJettys) throws Exception,
+ InterruptedException, TimeoutException, IOException, KeeperException,
+ URISyntaxException {
+ List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
+ List<SolrServer> clients = new ArrayList<SolrServer>();
StringBuilder sb = new StringBuilder();
- for (int i = 1; i <= numShards; i++) {
+ for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
- JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml");
+ JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + this.i.incrementAndGet(), null, "solrconfig-distrib-update.xml");
jettys.add(j);
SolrServer client = createNewSolrServer(j.getLocalPort());
clients.add(client);
@@ -168,10 +199,12 @@ public class FullDistributedZkTest exten
}
+ this.jettys.addAll(jettys);
+ this.clients.addAll(clients);
// build the shard string
- for (int i = 1; i <= numShards/2; i++) {
- JettySolrRunner j = jettys.get(i);
- JettySolrRunner j2 = jettys.get(i + (numShards/2 - 1));
+ for (int i = 1; i <= numJettys/2; i++) {
+ JettySolrRunner j = this.jettys.get(i);
+ JettySolrRunner j2 = this.jettys.get(i + (numJettys/2 - 1));
if (sb.length() > 0) sb.append(',');
sb.append("localhost:").append(j.getLocalPort()).append(context);
sb.append("|localhost:").append(j2.getLocalPort()).append(context);
@@ -266,6 +299,8 @@ public class FullDistributedZkTest exten
*/
@Override
public void doTest() throws Exception {
+ initCloudClient();
+
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
@@ -420,7 +455,7 @@ public class FullDistributedZkTest exten
// kill a shard
JettySolrRunner deadShard = killShard("shard2", 0);
- JettySolrRunner deadShard2 = killShard("shard3", 1);
+ //JettySolrRunner deadShard2 = killShard("shard3", 1);
// ensure shard is dead
try {
@@ -492,9 +527,9 @@ public class FullDistributedZkTest exten
deadShard.start(true);
- List<SolrServer> shard2Clients = shardToClient.get("shard2");
- System.out.println("shard2_1 port:" + ((CommonsHttpSolrServer)shard2Clients.get(0)).getBaseURL());
- System.out.println("shard2_2 port:" + ((CommonsHttpSolrServer)shard2Clients.get(1)).getBaseURL());
+ List<SolrServer> s2c = shardToClient.get("shard2");
+ System.out.println("shard2_1 port:" + ((CommonsHttpSolrServer)s2c.get(0)).getBaseURL());
+ System.out.println("shard2_2 port:" + ((CommonsHttpSolrServer)s2c.get(1)).getBaseURL());
// wait a bit for replication
Thread.sleep(5000);
@@ -502,8 +537,8 @@ public class FullDistributedZkTest exten
// if we properly recovered, we should now have the couple missing docs that
// came in while shard was down
- assertEquals(shard2Clients.get(0).query(new SolrQuery("*:*")).getResults()
- .getNumFound(), shard2Clients.get(1).query(new SolrQuery("*:*"))
+ assertEquals(s2c.get(0).query(new SolrQuery("*:*")).getResults()
+ .getNumFound(), s2c.get(1).query(new SolrQuery("*:*"))
.getResults().getNumFound());
// kill the other shard3 replica
@@ -512,11 +547,23 @@ public class FullDistributedZkTest exten
// should fail
//query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
- // we can't do this here - we have killed a shard
- //assertDocCounts();
-
query("q", "*:*", "sort", "n_tl1 desc");
+ // test adding another replica to a shard - it should do a recovery/replication to pick up the index from the leader
+ createJettys(1);
+
+ // new server should be part of first shard
+ // how many docs are on the new shard?
+ for (SolrServer client : shardToClient.get("shard1")) {
+ System.out.println("total:" + client.query(new SolrQuery("*:*")).getResults().getNumFound());
+ }
+ // wait a bit for replication
+ Thread.sleep(5000);
+ // assert the new server has the same number of docs as another server in that shard
+ assertEquals(shardToClient.get("shard1").get(0).query(new SolrQuery("*:*")).getResults().getNumFound(), shardToClient.get("shard1").get(2).query(new SolrQuery("*:*")).getResults().getNumFound());
+
+ assertDocCounts();
+
// Thread.sleep(10000000000L);
if (DEBUG) {
super.printLayout();
@@ -567,31 +614,18 @@ public class FullDistributedZkTest exten
System.out.println("docs:" + count + "\n\n");
clientCount += count;
}
- assertEquals("Doc Counts do not add up", controlCount, clientCount / (shardCount / sliceCount));
+ SolrQuery query = new SolrQuery("*:*");
+ query.add("distrib", "true");
+ assertEquals("Doc Counts do not add up", controlCount, cloudClient.query(query).getResults().getNumFound());
}
- volatile CloudSolrServer solrj;
-
@Override
- protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
-
- // use the distributed solrj client
- if (solrj == null) {
- synchronized(this) {
- try {
- CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
- server.setDefaultCollection(DEFAULT_COLLECTION);
- solrj = server;
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
+ protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
+
if (r.nextBoolean())
params.set("collection",DEFAULT_COLLECTION);
- QueryResponse rsp = solrj.query(params);
+ QueryResponse rsp = cloudClient.query(params);
return rsp;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java Sun Nov 20 23:04:35 2011
@@ -116,7 +116,7 @@ public class TestConfig extends SolrTest
@Test
public void testTermIndexInterval() throws Exception {
- IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+ IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
int interval = writer.getConfig().getTermIndexInterval();
assertEquals(256, interval);
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java Sun Nov 20 23:04:35 2011
@@ -33,7 +33,7 @@ public class TestLegacyMergeSchedulerPol
@Test
public void testLegacy() throws Exception {
- IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+ IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
assertTrue(writer.getConfig().getMergePolicy().getClass().getName().equals(LogDocMergePolicy.class.getName()));
assertTrue(writer.getConfig().getMergeScheduler().getClass().getName().equals(SerialMergeScheduler.class.getName()));
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java Sun Nov 20 23:04:35 2011
@@ -37,13 +37,13 @@ public class TestPropInject extends Abst
}
public void testMergePolicy() throws Exception {
- IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+ IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
LogByteSizeMergePolicy mp = (LogByteSizeMergePolicy)writer.getConfig().getMergePolicy();
assertEquals(64.0, mp.getMaxMergeMB(), 0);
}
public void testProps() throws Exception {
- IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+ IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler)writer.getConfig().getMergeScheduler();
assertEquals(2, cms.getMaxThreadCount());
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java Sun Nov 20 23:04:35 2011
@@ -33,14 +33,14 @@ public class TestPropInjectDefaults exte
@Test
public void testMergePolicyDefaults() throws Exception {
- IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+ IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
LogByteSizeMergePolicy mp = (LogByteSizeMergePolicy)writer.getConfig().getMergePolicy();
assertEquals(32.0, mp.getMaxMergeMB(), 0);
}
@Test
public void testPropsDefaults() throws Exception {
- IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+ IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler)writer.getConfig().getMergeScheduler();
assertEquals(4, cms.getMaxThreadCount());
}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sun Nov 20 23:04:35 2011
@@ -320,7 +320,7 @@ public class ZkStateReader {
}
- // TODO: do this with cloud state or something along those lines
+ // nocommit TODO: do this with cloud state or something along those lines
public String getLeader(String collection, String shard) throws Exception {
String url = null;