You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/21 00:04:36 UTC

svn commit: r1204292 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/pr...

Author: markrmiller
Date: Sun Nov 20 23:04:35 2011
New Revision: 1204292

URL: http://svn.apache.org/viewvc?rev=1204292&view=rev
Log:
do recovery on non leader shards that start up - add simple test for this - fix bug in new 'force' replication - fix bug where some full index replications are not properly replicated from - other minor improvements/movement

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Nov 20 23:04:35 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutExcep
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrException;
@@ -419,10 +420,57 @@ public final class ZkController {
     log.info("Attempting to update " + ZkStateReader.CLUSTER_STATE + " version "
         + null);
     CloudState state = CloudState.load(data);
+    String shardZkNodeName = getNodeName() + "_" + coreName;
+    
+    boolean recover = getIsRecover(cloudDesc, state, shardZkNodeName);
+    
+    String shardId = cloudDesc.getShardId();
+    if (shardId == null && !recover) {
+      shardId = assignShard.assignShard(collection, numShards);
+      cloudDesc.setShardId(shardId);
+    }
+    
+    if (log.isInfoEnabled()) {
+        log.info("Register shard - core:" + coreName + " address:"
+            + shardUrl);
+      }
+    
+    leaderElector.setupForSlice(shardId, collection);
+    
+    ZkNodeProps props = addToZk(collection, desc, cloudDesc, shardUrl, shardZkNodeName);
     
+    // leader election
+    doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
+    
+    String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
+    
+    System.out.println("leader url: "+ leaderUrl);
+    System.out.println("shard url: "+ shardUrl);
+    boolean iamleader = false;
+    if (leaderUrl.equals(shardUrl)) {
+      iamleader = true;
+    } else {
+      // we are not the leader, so catch up with recovery
+      recover = true;
+    }
+    
+    if (recover) {
+      if (desc.getCoreContainer() != null) {
+        doRecovery(collection, desc, cloudDesc, iamleader);
+      } else {
+        log.warn("For some odd reason a SolrCore is trying to recover but does not have access to a CoreContainer - skipping recovery.");
+      }
+    }
+
+    return shardId;
+  }
+
+
+  private boolean getIsRecover(final CloudDescriptor cloudDesc,
+      CloudState state, String shardZkNodeName) {
     boolean recover = false;
     Map<String,Slice> slices = state.getSlices(cloudDesc.getCollectionName());
-    String shardZkNodeName = getNodeName() + "_" + coreName;
+
     if (slices != null) {
       Map<String,String> nodes = new HashMap<String,String>();
 
@@ -440,31 +488,12 @@ public final class ZkController {
         recover = true;
       }
     }
-    
-    String shardId = cloudDesc.getShardId();
-    if (shardId == null && !recover) {
-      shardId = assignShard.assignShard(collection, numShards);
-      cloudDesc.setShardId(shardId);
-    }
-    
-
-    if (log.isInfoEnabled()) {
-        log.info("Register shard - core:" + coreName + " address:"
-            + shardUrl);
-      }
-    
-    leaderElector.setupForSlice(shardId, collection);
-    
-    ZkNodeProps props = addToZk(collection, desc, cloudDesc, shardUrl, shardZkNodeName, recover);
-    
-    // leader election
-    doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
-    return shardId;
+    return recover;
   }
 
 
   ZkNodeProps addToZk(String collection, final CoreDescriptor desc, final CloudDescriptor cloudDesc, String shardUrl,
-      final String shardZkNodeName, boolean recover)
+      final String shardZkNodeName)
       throws Exception {
     ZkNodeProps props = new ZkNodeProps();
     props.put(ZkStateReader.URL_PROP, shardUrl);
@@ -521,42 +550,6 @@ public final class ZkController {
 					zkClient.setData(ZkStateReader.CLUSTER_STATE,
 							CloudState.store(state), stat.getVersion());
 					updated = true;
-					if (recover) {
-					  // nocommit: joke code
-					  System.out.println("do recovery");
-					  // start buffer updates to tran log
-					  // and do recovery - either replay via realtime get 
-					  // or full index replication
-            System.out.println("RECOVERY");
-            // seems we cannot do this here since we are not fully running - 
-            // we need to trigger a recovery that happens later
-            System.out.println("shard is:" + cloudDesc.getShardId());
-            String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
-            System.out.println("leader url: "+ leaderUrl);
-            System.out.println("shard url: "+ shardUrl);
-            if (!leaderUrl.equals(shardUrl)) {
-              // if we are the leader, either we are trying to recover faster
-              // then our ephemeral timed out or we are the only node
-              
-              ModifiableSolrParams params = new ModifiableSolrParams();
-              params.set("command", "fetchindex");
-              params.set("force", true); // force replication regardless of
-                                         // versions
-              params.set("masterUrl", leaderUrl + "replication");
-              QueryRequest req = new QueryRequest(params);
-              req.setPath("/replication");
-              System.out.println("Make replication call to:" + leaderUrl);
-              System.out.println("params:" + params);
-              
-              // if we want to buffer updates while recovering, this
-              // will have to trigger later - http is not yet up
-              
-              // we need to use embedded cause http is not up yet anyhow
-              EmbeddedSolrServer server = new EmbeddedSolrServer(
-                  desc.getCoreContainer(), desc.getName());
-              server.request(req);
-            }
-					}
 				} catch (KeeperException e) {
 					if (e.code() != Code.BADVERSION) {
 						throw e;
@@ -569,6 +562,46 @@ public final class ZkController {
     return props;
   }
 
+
+  private void doRecovery(String collection, final CoreDescriptor desc,
+      final CloudDescriptor cloudDesc, boolean iamleader) throws Exception,
+      SolrServerException, IOException {
+    // nocommit: joke code
+    System.out.println("do recovery");
+    // start buffering updates to the transaction log
+    // and do recovery - either replay via realtime get 
+    // or full index replication
+
+    // it seems we cannot do this here since we are not fully running - 
+    // we need to trigger a recovery that happens later
+    System.out.println("shard is:" + cloudDesc.getShardId());
+    
+    String leaderUrl = zkStateReader.getLeader(collection, cloudDesc.getShardId());
+    
+    if (!iamleader) {
+      // if we are the leader, either we are trying to recover faster
+      // than our ephemeral timed out or we are the only node
+      
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set("command", "fetchindex");
+      params.set("force", true); // force replication regardless of
+                                 // versions
+      params.set("masterUrl", leaderUrl + "replication");
+      QueryRequest req = new QueryRequest(params);
+      req.setPath("/replication");
+      System.out.println("Make replication call to:" + leaderUrl);
+      System.out.println("params:" + params);
+      
+      // if we want to buffer updates while recovering, this
+      // will have to trigger later - http is not yet up
+      
+      // we need to use embedded cause http is not up yet anyhow
+      EmbeddedSolrServer server = new EmbeddedSolrServer(
+          desc.getCoreContainer(), desc.getName());
+      server.request(req);
+    }
+  }
+
   private void doLeaderElectionProcess(String shardId,
       final String collection, String shardZkNodeName, ZkNodeProps props) throws KeeperException,
       InterruptedException, IOException {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Sun Nov 20 23:04:35 2011
@@ -561,7 +561,7 @@ public final class SolrCore implements S
     if (updateHandler == null) {
       initDirectoryFactory();
     } else {
-      directoryFactory = updateHandler.getIndexWriterProvider().getDirectoryFactory();
+      directoryFactory = updateHandler.getSolrCoreState().getDirectoryFactory();
     }
     
     initIndex();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Sun Nov 20 23:04:35 2011
@@ -81,6 +81,8 @@ import org.slf4j.LoggerFactory;
  * @since solr 1.4
  */
 public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
+  static final String FORCE = "force";
+  
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
   SolrCore core;
 
@@ -118,6 +120,7 @@ public class ReplicationHandler extends 
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     rsp.setHttpCaching(false);
     final SolrParams solrParams = req.getParams();
+    boolean force = solrParams.getBool(FORCE, false);
     String command = solrParams.get(COMMAND);
     if (command == null) {
       rsp.add(STATUS, OK_STATUS);
@@ -131,8 +134,9 @@ public class ReplicationHandler extends 
       
       // this is only set after commit or optimize or something - if it's not set,
       // just use the most recent
-      if (commitPoint == null) {
-        commitPoint = req.getSearcher().getIndexReader().getIndexCommit();
+      if (commitPoint == null || force) {
+        commitPoint = core.getDeletionPolicy().getLatestCommit();
+        indexCommitPoint = commitPoint;
       }
       
       if (commitPoint != null && replicationEnabled.get()) {
@@ -143,6 +147,7 @@ public class ReplicationHandler extends 
         // the CMD_GET_FILE_LIST command.
         //
         core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
+        System.out.println("return version: " + commitPoint.getVersion());
         rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
         rsp.add(GENERATION, commitPoint.getGeneration());
       } else {
@@ -288,7 +293,7 @@ public class ReplicationHandler extends 
         nl.remove(SnapPuller.POLL_INTERVAL);
         tempSnapPuller = new SnapPuller(nl, this, core);
       }
-      tempSnapPuller.fetchLatestIndex(core, solrParams == null ? false : solrParams.getBool("force", false));
+      tempSnapPuller.fetchLatestIndex(core, solrParams == null ? false : solrParams.getBool(FORCE, false));
     } catch (Exception e) {
       LOG.error("SnapPull failed ", e);
     } finally {
@@ -353,7 +358,9 @@ public class ReplicationHandler extends 
       Collection<String> files = new HashSet<String>(commit.getFileNames());
       for (String fileName : files) {
         if(fileName.endsWith(".lock")) continue;
-        File file = new File(core.getIndexDir(), fileName);
+        // use new dir in case we are replicating from a full index replication
+        // and have not yet reloaded the core
+        File file = new File(core.getNewIndexDir(), fileName);
         Map<String, Object> fileMeta = getFileInfo(file);
         result.add(fileMeta);
       }
@@ -763,9 +770,10 @@ public class ReplicationHandler extends 
   }
 
 
-  void refreshCommitpoint() {
+  void refreshCommitpoint(boolean force) {
     IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
-    if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
+    System.out.println("refresh commit point to:" + commitPoint.getVersion());
+    if(force || replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
       indexCommitPoint = commitPoint;
     }
   }
@@ -1022,7 +1030,9 @@ public class ReplicationHandler extends 
           file = new File(core.getResourceLoader().getConfigDir(), cfileName);
         } else {
           //else read from the indexdirectory
-          file = new File(core.getIndexDir(), fileName);
+          // use new dir in case we are replicating from a full index replication
+          // and have not yet reloaded the core
+          file = new File(core.getNewIndexDir(), fileName);
         }
         if (file.exists() && file.canRead()) {
           inputStream = new FileInputStream(file);

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Sun Nov 20 23:04:35 2011
@@ -172,12 +172,16 @@ public class SnapPuller {
 
   /**
    * Gets the latest commit version and generation from the master
+   * @param force 
    */
   @SuppressWarnings("unchecked")
-  NamedList getLatestVersion() throws IOException {
+  NamedList getLatestVersion(boolean force) throws IOException {
     PostMethod post = new PostMethod(masterUrl);
     post.addParameter(COMMAND, CMD_INDEX_VERSION);
     post.addParameter("wt", "javabin");
+    if (force) {
+      post.addParameter(ReplicationHandler.FORCE, "true");
+    }
     return getNamedListResponse(post);
   }
 
@@ -249,7 +253,7 @@ public class SnapPuller {
       //get the current 'replicateable' index version in the master
       NamedList response = null;
       try {
-        response = getLatestVersion();
+        response = getLatestVersion(force);
       } catch (Exception e) {
         LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
         return false;
@@ -269,7 +273,7 @@ public class SnapPuller {
         if (searcherRefCounted != null)
           searcherRefCounted.decref();
       }
-      if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
+      if (!force && commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
         //master and slave are alsready in sync just return
         LOG.info("Slave in sync with master.");
         return false;
@@ -324,7 +328,7 @@ public class SnapPuller {
           }
           if (successfulInstall) {
             logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
-            doCommit();
+            doCommit(isFullCopyNeeded);
           }
         }
         replicationStartTime = 0;
@@ -469,7 +473,7 @@ public class SnapPuller {
     return sb;
   }
 
-  private void doCommit() throws IOException {
+  private void doCommit(boolean isFullCopyNeeded) throws IOException {
     SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
         new ModifiableSolrParams());
     try {
@@ -478,7 +482,7 @@ public class SnapPuller {
       solrCore.getUpdateHandler().newIndexWriter();
       solrCore.getSearcher(true, false, null);
       
-      replicationHandler.refreshCommitpoint();
+      replicationHandler.refreshCommitpoint(isFullCopyNeeded);
     } finally {
       req.close();
     }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Sun Nov 20 23:04:35 2011
@@ -559,7 +559,7 @@ public class DirectUpdateHandler2 extend
     return "DirectUpdateHandler2" + getStatistics();
   }
   
-  public SolrCoreState getIndexWriterProvider() {
+  public SolrCoreState getSolrCoreState() {
     return solrCoreState;
   }
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Sun Nov 20 23:04:35 2011
@@ -141,7 +141,7 @@ public abstract class UpdateHandler impl
    */
   public abstract void newIndexWriter() throws IOException;
 
-  public abstract SolrCoreState getIndexWriterProvider();
+  public abstract SolrCoreState getSolrCoreState();
 
   public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
   public abstract void delete(DeleteUpdateCommand cmd) throws IOException;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Nov 20 23:04:35 2011
@@ -247,11 +247,8 @@ public class DistributedUpdateProcessor 
 
     System.out.println("LeaderParam:"
         + req.getParams().get(SEEN_LEADER));
-
-
     System.out.println("leader? " + isLeader);
 
-
     // at this point, there is an update we need to try and apply.
     // we may or may not be the leader.
 
@@ -265,8 +262,6 @@ public class DistributedUpdateProcessor 
       // TODO: check for the version in the request params (this will be for user provided versions and optimistic concurrency only)
     }
 
-
-
     VersionBucket bucket = vinfo.bucket(hash);
     synchronized (bucket) {
       // we obtain the version when synchronized and then do the add so we can ensure that
@@ -285,9 +280,10 @@ public class DistributedUpdateProcessor 
           cmd.setVersion(version);
           cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
           bucket.updateHighest(version);
-          System.out.println("add version field to doc");
+          System.out.println("add version field to doc:" + version);
         } else {
           // The leader forwarded us this update.
+          System.out.println("got version from leader:" + versionOnUpdate);
           cmd.setVersion(versionOnUpdate);
 
           // if we aren't the leader, then we need to check that updates were not re-ordered

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java Sun Nov 20 23:04:35 2011
@@ -121,7 +121,7 @@ public class BasicFunctionalityTest exte
     // test merge factor picked up
     SolrCore core = h.getCore();
 
-    IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter(core);
+    IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getSolrCoreState().getIndexWriter(core);
     assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
 
     lrf.args.put(CommonParams.VERSION,"2.2");

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java Sun Nov 20 23:04:35 2011
@@ -48,7 +48,7 @@ public class BasicZkTest extends Abstrac
     // test merge factor picked up
     SolrCore core = h.getCore();
 
-    IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter(core);
+    IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getSolrCoreState().getIndexWriter(core);
 
     assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
     

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sun Nov 20 23:04:35 2011
@@ -20,10 +20,13 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServer;
@@ -39,6 +42,7 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.KeeperException;
 import org.junit.BeforeClass;
 
 /**
@@ -66,10 +70,12 @@ public class FullDistributedZkTest exten
   String invalidField="ignore_exception__invalid_field_not_in_schema";
   private static final int sliceCount = 3;
   
+  protected volatile CloudSolrServer cloudClient;
   
   protected Map<SolrServer,ZkNodeProps> clientToInfo = new HashMap<SolrServer,ZkNodeProps>();
   protected Map<String,List<SolrServer>> shardToClient = new HashMap<String,List<SolrServer>>();
   protected Map<String,List<JettySolrRunner>> shardToJetty = new HashMap<String,List<JettySolrRunner>>();
+  private AtomicInteger i = new AtomicInteger(0);
   
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -85,19 +91,44 @@ public class FullDistributedZkTest exten
     // TODO: for now, turn off stress because it uses regular clients, and we 
     // need the cloud client because we kill servers
     stress = 0;
+    
+    
+  }
+  
+  private void initCloudClient() {
+    // use the distributed solrj client
+    if (cloudClient == null) {
+      synchronized(this) {
+        try {
+          CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
+          server.setDefaultCollection(DEFAULT_COLLECTION);
+          cloudClient = server;
+        } catch (MalformedURLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
   }
   
   @Override
-  protected void createServers(int numShards) throws Exception {
+  protected void createServers(int numServers) throws Exception {
     System.setProperty("collection", "control_collection");
     controlJetty = createJetty(testDir, testDir + "/control/data", "control_shard");
     System.clearProperty("collection");
     controlClient = createNewSolrServer(controlJetty.getLocalPort());
 
+    createJettys(numServers);
+  }
+
+  private void createJettys(int numJettys) throws Exception,
+      InterruptedException, TimeoutException, IOException, KeeperException,
+      URISyntaxException {
+    List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
+    List<SolrServer> clients = new ArrayList<SolrServer>();
     StringBuilder sb = new StringBuilder();
-    for (int i = 1; i <= numShards; i++) {
+    for (int i = 1; i <= numJettys; i++) {
       if (sb.length() > 0) sb.append(',');
-      JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml");
+      JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + this.i.incrementAndGet(), null, "solrconfig-distrib-update.xml");
       jettys.add(j);
       SolrServer client = createNewSolrServer(j.getLocalPort());
       clients.add(client);
@@ -168,10 +199,12 @@ public class FullDistributedZkTest exten
       
     }
     
+    this.jettys.addAll(jettys);
+    this.clients.addAll(clients);
     // build the shard string
-    for (int i = 1; i <= numShards/2; i++) {
-      JettySolrRunner j = jettys.get(i);
-      JettySolrRunner j2 = jettys.get(i + (numShards/2 - 1));
+    for (int i = 1; i <= numJettys/2; i++) {
+      JettySolrRunner j = this.jettys.get(i);
+      JettySolrRunner j2 = this.jettys.get(i + (numJettys/2 - 1));
       if (sb.length() > 0) sb.append(',');
       sb.append("localhost:").append(j.getLocalPort()).append(context);
       sb.append("|localhost:").append(j2.getLocalPort()).append(context);
@@ -266,6 +299,8 @@ public class FullDistributedZkTest exten
    */
   @Override
   public void doTest() throws Exception {
+    initCloudClient();
+    
     handle.clear();
     handle.put("QTime", SKIPVAL);
     handle.put("timestamp", SKIPVAL);
@@ -420,7 +455,7 @@ public class FullDistributedZkTest exten
     
     // kill a shard
     JettySolrRunner deadShard = killShard("shard2", 0);
-    JettySolrRunner deadShard2 = killShard("shard3", 1);
+    //JettySolrRunner deadShard2 = killShard("shard3", 1);
     
     // ensure shard is dead
     try {
@@ -492,9 +527,9 @@ public class FullDistributedZkTest exten
 
     deadShard.start(true);
     
-    List<SolrServer> shard2Clients = shardToClient.get("shard2");
-    System.out.println("shard2_1 port:" + ((CommonsHttpSolrServer)shard2Clients.get(0)).getBaseURL());
-    System.out.println("shard2_2 port:" + ((CommonsHttpSolrServer)shard2Clients.get(1)).getBaseURL());
+    List<SolrServer> s2c = shardToClient.get("shard2");
+    System.out.println("shard2_1 port:" + ((CommonsHttpSolrServer)s2c.get(0)).getBaseURL());
+    System.out.println("shard2_2 port:" + ((CommonsHttpSolrServer)s2c.get(1)).getBaseURL());
     
     // wait a bit for replication
     Thread.sleep(5000);
@@ -502,8 +537,8 @@ public class FullDistributedZkTest exten
 
     // if we properly recovered, we should now have the couple missing docs that
     // came in while shard was down
-    assertEquals(shard2Clients.get(0).query(new SolrQuery("*:*")).getResults()
-        .getNumFound(), shard2Clients.get(1).query(new SolrQuery("*:*"))
+    assertEquals(s2c.get(0).query(new SolrQuery("*:*")).getResults()
+        .getNumFound(), s2c.get(1).query(new SolrQuery("*:*"))
         .getResults().getNumFound());
     
     // kill the other shard3 replica
@@ -512,11 +547,23 @@ public class FullDistributedZkTest exten
     // should fail
     //query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
     
-    // we can't do this here - we have killed a shard
-    //assertDocCounts();
-    
     query("q", "*:*", "sort", "n_tl1 desc");
     
+    // test adding another replica to a shard - it should do a recovery/replication to pick up the index from the leader
+    createJettys(1);
+    
+    // new server should be part of first shard
+    // how many docs are on the new shard?
+    for (SolrServer client : shardToClient.get("shard1")) {
+      System.out.println("total:" + client.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+    // wait a bit for replication
+    Thread.sleep(5000);
+    // assert the new server has the same number of docs as another server in that shard
+    assertEquals(shardToClient.get("shard1").get(0).query(new SolrQuery("*:*")).getResults().getNumFound(), shardToClient.get("shard1").get(2).query(new SolrQuery("*:*")).getResults().getNumFound());
+    
+    assertDocCounts();
+    
     // Thread.sleep(10000000000L);
     if (DEBUG) {
       super.printLayout();
@@ -567,31 +614,18 @@ public class FullDistributedZkTest exten
       System.out.println("docs:" + count + "\n\n");
       clientCount += count;
     }
-    assertEquals("Doc Counts do not add up", controlCount, clientCount / (shardCount / sliceCount));
+    SolrQuery query = new SolrQuery("*:*");
+    query.add("distrib", "true");
+    assertEquals("Doc Counts do not add up", controlCount, cloudClient.query(query).getResults().getNumFound());
   }
 
-  volatile CloudSolrServer solrj;
-
   @Override
-  protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
-
-    // use the distributed solrj client
-    if (solrj == null) {
-      synchronized(this) {
-        try {
-          CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
-          server.setDefaultCollection(DEFAULT_COLLECTION);
-          solrj = server;
-        } catch (MalformedURLException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
+  protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {  
+    
     if (r.nextBoolean())
       params.set("collection",DEFAULT_COLLECTION);
 
-    QueryResponse rsp = solrj.query(params);
+    QueryResponse rsp = cloudClient.query(params);
     return rsp;
   }
   

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestConfig.java Sun Nov 20 23:04:35 2011
@@ -116,7 +116,7 @@ public class TestConfig extends SolrTest
 
   @Test
   public void testTermIndexInterval() throws Exception {
-    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
     int interval = writer.getConfig().getTermIndexInterval();
     assertEquals(256, interval);
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestLegacyMergeSchedulerPolicyConfig.java Sun Nov 20 23:04:35 2011
@@ -33,7 +33,7 @@ public class TestLegacyMergeSchedulerPol
 
   @Test
   public void testLegacy() throws Exception {
-    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
     assertTrue(writer.getConfig().getMergePolicy().getClass().getName().equals(LogDocMergePolicy.class.getName()));
     assertTrue(writer.getConfig().getMergeScheduler().getClass().getName().equals(SerialMergeScheduler.class.getName()));
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInject.java Sun Nov 20 23:04:35 2011
@@ -37,13 +37,13 @@ public class TestPropInject extends Abst
   }
 
   public void testMergePolicy() throws Exception {
-    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
     LogByteSizeMergePolicy mp = (LogByteSizeMergePolicy)writer.getConfig().getMergePolicy();
     assertEquals(64.0, mp.getMaxMergeMB(), 0);
   }
   
   public void testProps() throws Exception {
-    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
     ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler)writer.getConfig().getMergeScheduler();
     assertEquals(2, cms.getMaxThreadCount());
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/core/TestPropInjectDefaults.java Sun Nov 20 23:04:35 2011
@@ -33,14 +33,14 @@ public class TestPropInjectDefaults exte
 
   @Test
   public void testMergePolicyDefaults() throws Exception {
-    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
     LogByteSizeMergePolicy mp = (LogByteSizeMergePolicy)writer.getConfig().getMergePolicy();
     assertEquals(32.0, mp.getMaxMergeMB(), 0);
   }
   
   @Test
   public void testPropsDefaults() throws Exception {
-    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter(h.getCore());
+    IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getSolrCoreState().getIndexWriter(h.getCore());
     ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler)writer.getConfig().getMergeScheduler();
     assertEquals(4, cms.getMaxThreadCount());
   }

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1204292&r1=1204291&r2=1204292&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sun Nov 20 23:04:35 2011
@@ -320,7 +320,7 @@ public class ZkStateReader {
 
 	}
   
-  // TODO: do this with cloud state or something along those lines
+  // nocommit TODO: do this with cloud state or something along those lines
   public String getLeader(String collection, String shard) throws Exception {
     
     String url = null;