You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/07/28 02:37:32 UTC

svn commit: r1366573 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/test-files/solr/ solr/core/src/test/org/apache/solr/cloud/

Author: markrmiller
Date: Sat Jul 28 00:37:32 2012
New Revision: 1366573

URL: http://svn.apache.org/viewvc?rev=1366573&view=rev
Log:
add sync tests and logging

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/test-files/solr/solr.xml
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1366573&r1=1366572&r2=1366573&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sat Jul 28 00:37:32 2012
@@ -17,11 +17,13 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
@@ -63,8 +65,9 @@ public class SyncStrategy {
     shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
   }
   
-  private static class SyncShardRequest extends ShardRequest {
+  private static class ShardCoreRequest extends ShardRequest {
     String coreName;
+    public String baseUrl;
   }
   
   public boolean sync(ZkController zkController, SolrCore core,
@@ -105,23 +108,19 @@ public class SyncStrategy {
       if (!success
           && !areAnyOtherReplicasActive(zkController, leaderProps, collection,
               shardId)) {
-//        System.out
-//            .println("wasnt a success but no on else i active! I am the leader");
-        
+        log.info("Sync was not a success but no on else i active! I am the leader");
         success = true;
       }
       
       if (success) {
-        // solrcloud_debug
-        // System.out.println("Sync success");
-        // we are the leader - tell all of our replias to sync with us
+        log.info("Sync Success - now sync replicas to me");
         
         syncToMe(zkController, collection, shardId, leaderProps);
         
       } else {
+        SolrException.log(log, "Sync Failed");
         
-        // solrcloud_debug
-        // System.out.println("Sync failure");
+        // let's see who seems ahead...
       }
       
     } catch (Exception e) {
@@ -163,11 +162,7 @@ public class SyncStrategy {
         .getReplicaProps(collection, shardId,
             props.get(ZkStateReader.NODE_NAME_PROP),
             props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE); // TODO:
-    // should
-    // there
-    // be a
-    // state
-    // filter?
+    // TODO should there be a state filter?
     
     if (nodes == null) {
       // I have no replicas
@@ -198,19 +193,17 @@ public class SyncStrategy {
             leaderProps.get(ZkStateReader.NODE_NAME_PROP),
             leaderProps.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE);
     if (nodes == null) {
-      // System.out.println("I have no replicas");
-      // I have no replicas
+      log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas");
       return;
     }
-    //System.out.println("tell my replicas to sync");
+
     ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
     for (ZkCoreNodeProps node : nodes) {
       try {
-//         System.out
-//             .println("try and ask " + node.getCoreUrl() + " to sync");
-        log.info("try and ask " + node.getCoreUrl() + " to sync");
-        requestSync(node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());
-
+        log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": try and ask " + node.getCoreUrl() + " to sync");
+        
+        requestSync(node.getBaseUrl(), node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());
+        
       } catch (Exception e) {
         SolrException.log(log, "Error syncing replica to leader", e);
       }
@@ -221,27 +214,25 @@ public class SyncStrategy {
       ShardResponse srsp = shardHandler.takeCompletedOrError();
       if (srsp == null) break;
       boolean success = handleResponse(srsp);
-      //System.out.println("got response:" + success);
+      if (srsp.getException() != null) {
+        SolrException.log(log, "Sync request error: " + srsp.getException());
+      }
+      
       if (!success) {
          try {
-           log.info("Sync failed - asking replica to recover.");
-           
-           // TODO: do this in background threads
-           RequestRecovery recoverRequestCmd = new RequestRecovery();
-           recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
-           recoverRequestCmd.setCoreName(((SyncShardRequest)srsp.getShardRequest()).coreName);
+           log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - asking replica (" + srsp.getShardAddress() + ") to recover.");
            
-           HttpSolrServer server = new HttpSolrServer(srsp.getShardAddress());
-           server.setConnectionTimeout(45000);
-           server.setSoTimeout(45000);
-           server.request(recoverRequestCmd);
+           requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
+
          } catch (Exception e) {
-           log.info("Could not tell a replica to recover", e);
+           SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e);
          }
-         shardHandler.cancelAll();
-        break;
+      } else {
+        log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress());
       }
     }
+    
+
   }
   
   private boolean handleResponse(ShardResponse srsp) {
@@ -250,14 +241,19 @@ public class SyncStrategy {
     if (response == null) {
       return false;
     }
-    boolean success = (Boolean) response.get("sync");
+    Boolean success = (Boolean) response.get("sync");
+    
+    if (success == null) {
+      success = false;
+    }
     
     return success;
   }
 
-  private void requestSync(String replica, String leaderUrl, String coreName) {
-    SyncShardRequest sreq = new SyncShardRequest();
+  private void requestSync(String baseUrl, String replica, String leaderUrl, String coreName) {
+    ShardCoreRequest sreq = new ShardCoreRequest();
     sreq.coreName = coreName;
+    sreq.baseUrl = baseUrl;
     sreq.purpose = 1;
     // TODO: this sucks
     if (replica.startsWith("http://"))
@@ -273,6 +269,18 @@ public class SyncStrategy {
     shardHandler.submit(sreq, replica, sreq.params);
   }
   
+  private void requestRecovery(String baseUrl, String coreName) throws SolrServerException, IOException {
+    // TODO: do this in background threads
+    RequestRecovery recoverRequestCmd = new RequestRecovery();
+    recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+    recoverRequestCmd.setCoreName(coreName);
+    
+    HttpSolrServer server = new HttpSolrServer(baseUrl);
+    server.setConnectionTimeout(45000);
+    server.setSoTimeout(45000);
+    server.request(recoverRequestCmd);
+  }
+  
   public static ModifiableSolrParams params(String... params) {
     ModifiableSolrParams msp = new ModifiableSolrParams();
     for (int i = 0; i < params.length; i += 2) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1366573&r1=1366572&r2=1366573&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Sat Jul 28 00:37:32 2012
@@ -19,13 +19,12 @@ package org.apache.solr.handler.admin;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Collections;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.index.DirectoryReader;
@@ -666,7 +665,7 @@ public class CoreAdminHandler extends Re
   protected void handleRequestRecoveryAction(SolrQueryRequest req,
       SolrQueryResponse rsp) throws IOException {
     final SolrParams params = req.getParams();
-    log.info("The leader requested that we recover");
+    log.info("It has been requested that we recover");
     String cname = params.get(CoreAdminParams.CORE);
     if (cname == null) {
       cname = "";
@@ -675,6 +674,15 @@ public class CoreAdminHandler extends Re
     try {
       core = coreContainer.getCore(cname);
       if (core != null) {
+        // try to publish as recovering right away
+        try {
+          coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
+        } catch (KeeperException e) {
+          SolrException.log(log, "", e);
+        } catch (InterruptedException e) {
+          SolrException.log(log, "", e);
+        }
+        
         core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, cname);
       } else {
         SolrException.log(log, "Cound not find core to call recovery:" + cname);

Modified: lucene/dev/branches/branch_4x/solr/core/src/test-files/solr/solr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test-files/solr/solr.xml?rev=1366573&r1=1366572&r2=1366573&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test-files/solr/solr.xml (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test-files/solr/solr.xml Sat Jul 28 00:37:32 2012
@@ -28,7 +28,7 @@
   adminPath: RequestHandler path to manage cores.  
     If 'null' (or absent), cores will not be manageable via request handler
   -->
-  <cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="solr" zkClientTimeout="12000" numShards="${numShards:3}">
+  <cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="solr" zkClientTimeout="8000" numShards="${numShards:3}">
     <core name="collection1" instanceDir="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" schema="${schema:schema.xml}"/>
   </cores>
 </solr>

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1366573&r1=1366572&r2=1366573&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Sat Jul 28 00:37:32 2012
@@ -114,6 +114,24 @@ public class FullSolrCloudTest extends A
     String url;
     CloudSolrServerClient client;
     public ZkNodeProps info;
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((url == null) ? 0 : url.hashCode());
+      return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      CloudJettyRunner other = (CloudJettyRunner) obj;
+      if (url == null) {
+        if (other.url != null) return false;
+      } else if (!url.equals(other.url)) return false;
+      return true;
+    }
   }
   
   static class CloudSolrServerClient {
@@ -418,7 +436,7 @@ public class FullSolrCloudTest extends A
       List<CloudJettyRunner> jetties = shardToJetty.get(slice.getKey());
       assertNotNull("Test setup problem: We found no jetties for shard: " + slice.getKey()
           + " just:" + shardToJetty.keySet(), jetties);
-      assertTrue(jetties.size() > 0);
+      assertEquals(slice.getValue().getShards().size(), jetties.size());
     }
   }
   

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java?rev=1366573&r1=1366572&r2=1366573&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java Sat Jul 28 00:37:32 2012
@@ -31,7 +31,6 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.servlet.SolrDispatchFilter;
@@ -77,7 +76,7 @@ public class SyncSliceTest extends FullS
   public SyncSliceTest() {
     super();
     sliceCount = 1;
-    shardCount = 3;
+    shardCount = TEST_NIGHTLY ? 7 : 3;
   }
   
   @Override
@@ -91,25 +90,31 @@ public class SyncSliceTest extends FullS
 
     del("*:*");
     List<String> skipServers = new ArrayList<String>();
-    
-    indexDoc(skipServers, id, 0, i1, 50, tlong, 50, t1,
+    int docId = 0;
+    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
         "to come to the aid of their country.");
     
-    indexDoc(skipServers, id, 1, i1, 50, tlong, 50, t1,
+    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
         "old haven was blue.");
     
     skipServers.add(shardToJetty.get("shard1").get(1).url + "/");
     
-    indexDoc(skipServers, id, 2, i1, 50, tlong, 50, t1,
+    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
         "but the song was fancy.");
     
     skipServers.add(shardToJetty.get("shard1").get(2).url + "/");
     
-    indexDoc(skipServers, id, 3, i1, 50, tlong, 50, t1,
+    indexDoc(skipServers, id,docId++, i1, 50, tlong, 50, t1,
         "under the moon and over the lake");
     
     commit();
+    
+    waitForRecoveriesToFinish(false);
 
+    // shard should be inconsistent
+    String shardFailMessage = checkShardConsistency("shard1", true);
+    assertNotNull(shardFailMessage);
+    
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionAction.SYNCSHARD.toString());
     params.set("collection", "collection1");
@@ -131,32 +136,28 @@ public class SyncSliceTest extends FullS
     long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
     assertEquals(4, cloudClientDocs);
     
-    skipServers = new ArrayList<String>();
     
-    skipServers.add(shardToJetty.get("shard1").get(random().nextInt(shardCount)).url + "/");
+    // kill the leader - new leader could have all the docs or be missing one
+    CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
+    
+    skipServers = getRandomOtherJetty(leaderJetty, null); // but not the leader
     
     // this doc won't be on one node
-    indexDoc(skipServers, id, 4, i1, 50, tlong, 50, t1,
+    indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
         "to come to the aid of their country.");
     
-    // kill the leader - new leader could have all the docs or be missing one
-    CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
-
+    
     Set<CloudJettyRunner> jetties = new HashSet<CloudJettyRunner>();
     jetties.addAll(shardToJetty.get("shard1"));
     jetties.remove(leaderJetty);
+    assertEquals(shardCount - 1, jetties.size());
     
     chaosMonkey.killJetty(leaderJetty);
 
     // we are careful to make sure the downed node is no longer in the state,
     // because on some systems (especially freebsd w/ blackhole enabled), trying
     // to talk to a downed node causes grief
-    for (CloudJettyRunner cjetty : jetties) {
-      waitToSeeNotLive(((SolrDispatchFilter) cjetty.jetty.getDispatchFilter()
-          .getFilter()).getCores().getZkController().getZkStateReader(),
-          leaderJetty);
-    }
-    waitToSeeNotLive(cloudClient.getZkStateReader(), leaderJetty);
+    waitToSeeDownInCloudState(leaderJetty, jetties);
 
     waitForThingsToLevelOut();
     
@@ -164,6 +165,105 @@ public class SyncSliceTest extends FullS
     
     cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
     assertEquals(5, cloudClientDocs);
+    
+    CloudJettyRunner deadJetty = leaderJetty;
+    
+    // let's get the latest leader
+    while (deadJetty == leaderJetty) {
+      updateMappingsFromZk(this.jettys, this.clients);
+      leaderJetty = shardToLeaderJetty.get("shard1");
+    }
+    
+    // bring back dead node
+    ChaosMonkey.start(deadJetty.jetty); // he is not the leader anymore
+    
+    // give a moment to be sure it has started recovering
+    Thread.sleep(2000);
+    
+    waitForThingsToLevelOut();
+    waitForRecoveriesToFinish(false);
+    
+    skipServers = getRandomOtherJetty(leaderJetty, null);
+    
+    // skip list should be exactly one randomly chosen non-leader node
+    
+    //System.out.println("leader:" + leaderJetty.url);
+    //System.out.println("skip list:" + skipServers);
+    
+    // we are skipping exactly one node, and it is never the leader
+    assertEquals(1, skipServers.size());
+    
+    // more docs than can peer sync
+    for (int i = 0; i < 300; i++) {
+      indexDoc(skipServers, id, docId++, i1, 50, tlong, 50, t1,
+          "to come to the aid of their country.");
+    }
+    
+    commit();
+    
+    waitForRecoveriesToFinish(false);
+    
+    // shard should be inconsistent
+    shardFailMessage = checkShardConsistency("shard1", true);
+    assertNotNull(shardFailMessage);
+    
+    
+    jetties = new HashSet<CloudJettyRunner>();
+    jetties.addAll(shardToJetty.get("shard1"));
+    jetties.remove(leaderJetty);
+    assertEquals(shardCount - 1, jetties.size());
+
+    
+    // kill the current leader
+    chaosMonkey.killJetty(leaderJetty);
+    
+    waitToSeeDownInCloudState(leaderJetty, jetties);
+    
+    Thread.sleep(4000);
+    
+    waitForRecoveriesToFinish(false);
+    
+    
+    // TODO: for now, we just check consistency -
+    // there will be 305 or 5 docs depending on who
+    // becomes the leader - eventually we want that to
+    // always be the 305
+    //checkShardConsistency(true, true);
+    checkShardConsistency(false, true);
+    
+  }
+
+  private List<String> getRandomJetty() {
+    return getRandomOtherJetty(null, null);
+  }
+  
+  private List<String> getRandomOtherJetty(CloudJettyRunner leader, CloudJettyRunner down) {
+    List<String> skipServers = new ArrayList<String>();
+    List<CloudJettyRunner> candidates = new ArrayList<CloudJettyRunner>();
+    candidates.addAll(shardToJetty.get("shard1"));
+
+    if (leader != null) {
+      candidates.remove(leader);
+    }
+    
+    if (down != null) {
+      candidates.remove(down);
+    }
+    
+    CloudJettyRunner cjetty = candidates.get(random().nextInt(candidates.size()));
+    skipServers.add(cjetty.url + "/");
+    return skipServers;
+  }
+
+  private void waitToSeeDownInCloudState(CloudJettyRunner leaderJetty,
+      Set<CloudJettyRunner> jetties) throws InterruptedException {
+
+    for (CloudJettyRunner cjetty : jetties) {
+      waitToSeeNotLive(((SolrDispatchFilter) cjetty.jetty.getDispatchFilter()
+          .getFilter()).getCores().getZkController().getZkStateReader(),
+          leaderJetty);
+    }
+    waitToSeeNotLive(cloudClient.getZkStateReader(), leaderJetty);
   }
 
   private void waitForThingsToLevelOut() throws Exception {