You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/07/22 22:30:38 UTC

svn commit: r1364429 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/c...

Author: markrmiller
Date: Sun Jul 22 20:30:37 2012
New Revision: 1364429

URL: http://svn.apache.org/viewvc?rev=1364429&view=rev
Log:
SOLR-3663: There are a couple of bugs in the sync process when a leader goes down and a new leader is elected.

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sun Jul 22 20:30:37 2012
@@ -127,6 +127,9 @@ Bug Fixes
   did not work correctly across a core reload, and update handler synchronization
   was changed to synchronize on core state since more than on update handler
   can coexist for a single index during a reload. (yonik)
+  
+* SOLR-3663: There are a couple of bugs in the sync process when a leader goes down and a 
+  new leader is elected. (Mark Miller)
 
 
 Other Changes

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sun Jul 22 20:30:37 2012
@@ -69,6 +69,7 @@ public class SyncStrategy {
   
   public boolean sync(ZkController zkController, SolrCore core,
       ZkNodeProps leaderProps) {
+    log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
     // TODO: look at our state usage of sync
     // zkController.publish(core, ZkStateReader.SYNC);
     
@@ -208,7 +209,7 @@ public class SyncStrategy {
 //         System.out
 //             .println("try and ask " + node.getCoreUrl() + " to sync");
         log.info("try and ask " + node.getCoreUrl() + " to sync");
-        requestSync(zkLeader.getCoreUrl(), node.getCoreName());
+        requestSync(node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());
 
       } catch (Exception e) {
         SolrException.log(log, "Error syncing replica to leader", e);
@@ -224,12 +225,15 @@ public class SyncStrategy {
       if (!success) {
          try {
            log.info("Sync failed - asking replica to recover.");
-           //System.out.println("Sync failed - asking replica to recover.");
+           
+           // TODO: do this in background threads
            RequestRecovery recoverRequestCmd = new RequestRecovery();
            recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
            recoverRequestCmd.setCoreName(((SyncShardRequest)srsp.getShardRequest()).coreName);
            
-           HttpSolrServer server = new HttpSolrServer(zkLeader.getBaseUrl());
+           HttpSolrServer server = new HttpSolrServer(srsp.getShardAddress());
+           server.setConnectionTimeout(45000);
+           server.setSoTimeout(45000);
            server.request(recoverRequestCmd);
          } catch (Exception e) {
            log.info("Could not tell a replica to recover", e);
@@ -251,7 +255,7 @@ public class SyncStrategy {
     return success;
   }
 
-  private void requestSync(String replica, String coreName) {
+  private void requestSync(String replica, String leaderUrl, String coreName) {
     SyncShardRequest sreq = new SyncShardRequest();
     sreq.coreName = coreName;
     sreq.purpose = 1;
@@ -264,7 +268,7 @@ public class SyncStrategy {
     sreq.params.set("qt","/get");
     sreq.params.set("distrib",false);
     sreq.params.set("getVersions",Integer.toString(100));
-    sreq.params.set("sync",replica);
+    sreq.params.set("sync",leaderUrl);
     
     shardHandler.submit(sreq, replica, sreq.params);
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Sun Jul 22 20:30:37 2012
@@ -17,9 +17,17 @@ package org.apache.solr.handler.admin;
  * limitations under the License.
  */
 
+import java.io.IOException;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
@@ -103,6 +111,10 @@ public class CollectionsHandler extends 
           this.handleReloadAction(req, rsp);
           break;
         }
+        case SYNCSHARD: {
+          this.handleSyncShardAction(req, rsp);
+          break;
+        }
         
         default: {
           throw new RuntimeException("Unknown action: " + action);
@@ -123,6 +135,24 @@ public class CollectionsHandler extends 
     // TODO: what if you want to block until the collection is available?
     coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
   }
+  
+  private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
+    log.info("Syncing shard : " + req.getParamString());
+    String collection = req.getParams().required().get("collection");
+    String shard = req.getParams().required().get("shard");
+    
+    CloudState cloudState = coreContainer.getZkController().getCloudState();
+    
+    ZkNodeProps leaderProps = cloudState.getLeader(collection, shard);
+    ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+    
+    HttpSolrServer server = new HttpSolrServer(nodeProps.getBaseUrl());
+    RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
+    reqSyncShard.setCollection(collection);
+    reqSyncShard.setShard(shard);
+    reqSyncShard.setCoreName(nodeProps.getCoreName());
+    server.request(reqSyncShard);
+  }
 
 
   private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Sun Jul 22 20:30:37 2012
@@ -20,7 +20,9 @@ package org.apache.solr.handler.admin;
 import java.io.File;
 import java.io.IOException;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
@@ -28,6 +30,8 @@ import org.apache.lucene.index.Directory
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.SyncStrategy;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.CloudState;
@@ -47,8 +51,6 @@ import org.apache.solr.core.CoreDescript
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
@@ -69,8 +71,6 @@ import org.slf4j.LoggerFactory;
 public class CoreAdminHandler extends RequestHandlerBase {
   protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
   protected final CoreContainer coreContainer;
-  private ShardHandlerFactory shardHandlerFactory;
-  private ShardHandler shardHandler;
 
   public CoreAdminHandler() {
     super();
@@ -87,8 +87,6 @@ public class CoreAdminHandler extends Re
    */
   public CoreAdminHandler(final CoreContainer coreContainer) {
     this.coreContainer = coreContainer;
-    shardHandlerFactory = coreContainer.getShardHandlerFactory();
-    shardHandler = shardHandlerFactory.getShardHandler();
   }
 
 
@@ -182,6 +180,11 @@ public class CoreAdminHandler extends Re
           break;
         }
         
+        case REQUESTSYNCSHARD: {
+          this.handleRequestSyncAction(req, rsp);
+          break;
+        }
+        
         default: {
           doPersist = this.handleCustomAction(req, rsp);
           break;
@@ -676,6 +679,48 @@ public class CoreAdminHandler extends Re
     }
   }
   
+  protected void handleRequestSyncAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException {
+    final SolrParams params = req.getParams();
+
+    log.info("I have been requested to sync up my shard");
+    ZkController zkController = coreContainer.getZkController();
+    if (zkController == null) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
+    }
+    
+    String cname = params.get(CoreAdminParams.CORE);
+    if (cname == null) {
+      throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
+    }
+    SolrCore core = null;
+    try {
+      core = coreContainer.getCore(cname);
+      if (core != null) {
+        SyncStrategy syncStrategy = new SyncStrategy();
+
+        Map<String,String> props = new HashMap<String,String>();
+        props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());
+        props.put(ZkStateReader.CORE_NAME_PROP, cname);
+        props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
+        
+        boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props));
+        if (!success) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed");
+        }
+      } else {
+        SolrException.log(log, "Cound not find core to call sync:" + cname);
+      }
+    } finally {
+      // no recoveryStrat close for now
+      if (core != null) {
+        core.close();
+      }
+    }
+    
+
+  }
+  
   protected void handleWaitForStateAction(SolrQueryRequest req,
       SolrQueryResponse rsp) throws IOException, InterruptedException {
     final SolrParams params = req.getParams();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Sun Jul 22 20:30:37 2012
@@ -165,7 +165,7 @@ public class PeerSync  {
     String myURL = "";
 
     if (zkController != null) {
-      myURL = zkController.getZkServerAddress();
+      myURL = zkController.getBaseUrl();
     }
 
     // TODO: core name turns up blank in many tests - find URL if cloud enabled?

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Jul 22 20:30:37 2012
@@ -19,9 +19,12 @@ package org.apache.solr.update.processor
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
@@ -209,8 +212,22 @@ public class DistributedUpdateProcessor 
                   coreName, null, ZkStateReader.DOWN);
           if (replicaProps != null) {
             nodes = new ArrayList<Node>(replicaProps.size());
+            // check for test param that lets us miss replicas
+            String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
+            Set<String> skipListSet = null;
+            if (skipList != null) {
+              skipListSet = new HashSet<String>(skipList.length);
+              skipListSet.addAll(Arrays.asList(skipList));
+            }
+            
             for (ZkCoreNodeProps props : replicaProps) {
-              nodes.add(new StdNode(props));
+              if (skipList != null) {
+                if (!skipListSet.contains(props.getCoreUrl())) {
+                  nodes.add(new StdNode(props));
+                }
+              } else {
+                nodes.add(new StdNode(props));
+              }
             }
           }
           

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Sun Jul 22 20:30:37 2012
@@ -110,9 +110,15 @@ public abstract class AbstractDistribute
     waitForRecoveriesToFinish(collection, zkStateReader, verbose, true);
   }
   
+  protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
+      throws Exception {
+    waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, 120 * (TEST_NIGHTLY ? 2 : 1) * RANDOM_MULTIPLIER);
+  }
+  
   protected void waitForRecoveriesToFinish(String collection,
-      ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
+      ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
       throws Exception {
+    log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
     boolean cont = true;
     int cnt = 0;
     
@@ -139,7 +145,7 @@ public abstract class AbstractDistribute
           }
         }
       }
-      if (!sawLiveRecovering || cnt == 520) {
+      if (!sawLiveRecovering || cnt == timeoutSeconds) {
         if (!sawLiveRecovering) {
           if (verbose) System.out.println("no one is recoverying");
         } else {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Sun Jul 22 20:30:37 2012
@@ -36,6 +36,8 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.zookeeper.KeeperException;
 import org.eclipse.jetty.servlet.FilterHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The monkey can stop random or specific jetties used with SolrCloud.
@@ -45,7 +47,8 @@ import org.eclipse.jetty.servlet.FilterH
  *
  */
 public class ChaosMonkey {
-
+  private static Logger log = LoggerFactory.getLogger(ChaosMonkey.class);
+  
   private static final int CONLOSS_PERCENT = 3; //30%
   private static final int EXPIRE_PERCENT = 4; //40%
   private Map<String,List<CloudJettyRunner>> shardToJetty;
@@ -82,9 +85,12 @@ public class ChaosMonkey {
     Random random = LuceneTestCase.random();
     expireSessions = random.nextBoolean();
     causeConnectionLoss = random.nextBoolean();
+    monkeyLog("init - expire sessions:" + expireSessions
+        + " cause connection loss:" + causeConnectionLoss);
   }
   
   public void expireSession(JettySolrRunner jetty) {
+    monkeyLog("expire session for " + jetty.getLocalPort() + " !");
     SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilter) jetty.getDispatchFilter().getFilter();
     if (solrDispatchFilter != null) {
       CoreContainer cores = solrDispatchFilter.getCores();
@@ -106,8 +112,9 @@ public class ChaosMonkey {
   }
   
   public void randomConnectionLoss() throws KeeperException, InterruptedException {
-    String sliceName = getRandomSlice();
+    monkeyLog("cause connection loss!");
     
+    String sliceName = getRandomSlice();
     JettySolrRunner jetty = getRandomJetty(sliceName, aggressivelyKillLeaders);
     if (jetty != null) {
       causeConnectionLoss(jetty);
@@ -145,7 +152,7 @@ public class ChaosMonkey {
   }
   
   public static void stop(JettySolrRunner jetty) throws Exception {
-    
+    monkeyLog("stop shard! " + jetty.getLocalPort());
     // get a clean shutdown so that no dirs are left open...
     FilterHolder fh = jetty.getDispatchFilter();
     if (fh != null) {
@@ -162,6 +169,7 @@ public class ChaosMonkey {
   }
   
   public static void kill(JettySolrRunner jetty) throws Exception {
+    monkeyLog("kill shard! " + jetty.getLocalPort());
     FilterHolder fh = jetty.getDispatchFilter();
     SolrDispatchFilter sdf = null;
     if (fh != null) {
@@ -288,6 +296,7 @@ public class ChaosMonkey {
     
     if (numActive < 2) {
       // we cannot kill anyone
+      monkeyLog("only one active node in shard - monkey cannot kill :(");
       return null;
     }
     Random random = LuceneTestCase.random();
@@ -306,17 +315,19 @@ public class ChaosMonkey {
       boolean isLeader = leader.get(ZkStateReader.NODE_NAME_PROP).equals(jetties.get(index).nodeName);
       if (!aggressivelyKillLeaders && isLeader) {
         // we don't kill leaders...
+        monkeyLog("abort! I don't kill leaders");
         return null;
       } 
     }
 
     if (jetty.getLocalPort() == -1) {
       // we can't kill the dead
+      monkeyLog("abort! This guy is already dead");
       return null;
     }
     
     //System.out.println("num active:" + numActive + " for " + slice + " sac:" + jetty.getLocalPort());
-    
+    monkeyLog("chose a victim! " + jetty.getLocalPort());
     return jetty;
   }
   
@@ -335,6 +346,7 @@ public class ChaosMonkey {
   // synchronously starts and stops shards randomly, unless there is only one
   // active shard up for a slice or if there is one active and others recovering
   public void startTheMonkey(boolean killLeaders, final int roundPause) {
+    monkeyLog("starting");
     this.aggressivelyKillLeaders = killLeaders;
     startTime = System.currentTimeMillis();
     // TODO: when kill leaders is on, lets kill a higher percentage of leaders
@@ -409,14 +421,18 @@ public class ChaosMonkey {
             e.printStackTrace();
           }
         }
-        
-        System.out.println("I ran for " + (System.currentTimeMillis() - startTime)/1000.0f + "sec. I stopped " + stops + " and I started " + starts
+        monkeyLog("finished");
+        monkeyLog("I ran for " + (System.currentTimeMillis() - startTime)/1000.0f + "sec. I stopped " + stops + " and I started " + starts
             + ". I also expired " + expires.get() + " and caused " + connloss
             + " connection losses");
       }
     }.start();
   }
   
+  public static void monkeyLog(String msg) {
+    log.info("monkey: " + msg);
+  }
+  
   public void stopTheMonkey() {
     stop = true;
   }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Sun Jul 22 20:30:37 2012
@@ -20,7 +20,6 @@ package org.apache.solr.cloud;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.http.client.HttpClient;
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -35,10 +34,15 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Slow
 @Ignore("ignore while investigating jenkins fails")
 public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
+  public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
+  
+  private static final int BASE_RUN_LENGTH = 180000;
 
   @BeforeClass
   public static void beforeSuperClass() {
@@ -53,7 +57,7 @@ public class ChaosMonkeyNothingIsSafeTes
   public void setUp() throws Exception {
     super.setUp();
     // TODO use @Noisy annotation as we expect lots of exceptions
-    ignoreException(".*");
+    //ignoreException(".*");
     System.setProperty("numShards", Integer.toString(sliceCount));
   }
   
@@ -67,8 +71,8 @@ public class ChaosMonkeyNothingIsSafeTes
   
   public ChaosMonkeyNothingIsSafeTest() {
     super();
-    sliceCount = atLeast(2);
-    shardCount = atLeast(sliceCount * 2);
+    sliceCount = 2;
+    shardCount = 6;
   }
   
   @Override
@@ -99,8 +103,9 @@ public class ChaosMonkeyNothingIsSafeTes
       ftIndexThread.start();
       
       chaosMonkey.startTheMonkey(true, 1500);
+      int runLength = atLeast(BASE_RUN_LENGTH);
       try {
-        Thread.sleep(180000);
+        Thread.sleep(runLength);
       } finally {
         chaosMonkey.stopTheMonkey();
       }
@@ -124,7 +129,7 @@ public class ChaosMonkeyNothingIsSafeTes
       Thread.sleep(2000);
       
       // wait until there are no recoveries...
-      waitForThingsToLevelOut();
+      waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
       
       // make sure we again have leaders for each shard
       for (int j = 1; j < sliceCount; j++) {
@@ -156,35 +161,6 @@ public class ChaosMonkeyNothingIsSafeTes
       }
     }
   }
-
-  private void waitForThingsToLevelOut() throws Exception {
-    int cnt = 0;
-    boolean retry = false;
-    do {
-      waitForRecoveriesToFinish(VERBOSE);
-      
-      try {
-        commit();
-      } catch (Exception e) {
-        // we don't care if this commit fails on some nodes
-      }
-      
-      updateMappingsFromZk(jettys, clients);
-      
-      Set<String> theShards = shardToClient.keySet();
-      String failMessage = null;
-      for (String shard : theShards) {
-        failMessage = checkShardConsistency(shard, false);
-      }
-      
-      if (failMessage != null) {
-        retry  = true;
-      }
-      cnt++;
-      if (cnt > 10) break;
-      Thread.sleep(4000);
-    } while (retry);
-  }
   
   // skip the randoms - they can deadlock...
   protected void indexr(Object... fields) throws Exception {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Sun Jul 22 20:30:37 2012
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.common.SolrInputDocument;
@@ -32,6 +31,8 @@ import org.junit.Ignore;
 @Ignore("SOLR-3126")
 public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
   
+  private static final int BASE_RUN_LENGTH = 120000;
+
   @BeforeClass
   public static void beforeSuperClass() {
     
@@ -66,7 +67,7 @@ public class ChaosMonkeySafeLeaderTest e
   public ChaosMonkeySafeLeaderTest() {
     super();
     sliceCount = atLeast(2);
-    shardCount = atLeast(sliceCount);
+    shardCount = atLeast(sliceCount*2);
   }
   
   @Override
@@ -89,8 +90,8 @@ public class ChaosMonkeySafeLeaderTest e
     }
     
     chaosMonkey.startTheMonkey(false, 500);
-    
-    Thread.sleep(atLeast(8000));
+    int runLength = atLeast(BASE_RUN_LENGTH);
+    Thread.sleep(runLength);
     
     chaosMonkey.stopTheMonkey();
     
@@ -109,40 +110,12 @@ public class ChaosMonkeySafeLeaderTest e
     
     // try and wait for any replications and what not to finish...
     
-    waitForThingsToLevelOut();
+    waitForThingsToLevelOut(Math.round((runLength / 1000.0f / 5.0f)));
 
     checkShardConsistency(true, true);
     
     if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
   }
-
-  private void waitForThingsToLevelOut() throws Exception {
-    int cnt = 0;
-    boolean retry = false;
-    do {
-      waitForRecoveriesToFinish(false);
-      
-      commit();
-      
-      updateMappingsFromZk(jettys, clients);
-      
-      Set<String> theShards = shardToClient.keySet();
-      String failMessage = null;
-      for (String shard : theShards) {
-        failMessage = checkShardConsistency(shard, false);
-      }
-      
-      if (failMessage != null) {
-        retry = true;
-      } else {
-        retry = false;
-      }
-      
-      cnt++;
-      if (cnt > 10) break;
-      Thread.sleep(2000);
-    } while (retry);
-  }
   
   // skip the randoms - they can deadlock...
   protected void indexr(Object... fields) throws Exception {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Sun Jul 22 20:30:37 2012
@@ -55,6 +55,8 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
@@ -64,6 +66,8 @@ import org.junit.BeforeClass;
  */
 @Slow
 public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
+  private static Logger log = LoggerFactory.getLogger(FullSolrCloudTest.class);
+  
   @BeforeClass
   public static void beforeFullSolrCloudTest() {
     // shorten the log output more for this test type
@@ -103,12 +107,13 @@ public class FullSolrCloudTest extends A
   protected volatile ZkStateReader zkStateReader;
   
   private Map<String,SolrServer> shardToLeaderClient = new HashMap<String,SolrServer>();
-  private Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<String,CloudJettyRunner>();
+  protected Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<String,CloudJettyRunner>();
   
   class CloudJettyRunner {
     JettySolrRunner jetty;
     String nodeName;
     String coreNodeName;
+    String url;
   }
   
   static class CloudSolrServerClient {
@@ -403,6 +408,7 @@ public class FullSolrCloudTest extends A
             cjr.jetty = jetty;
             cjr.nodeName = shard.getValue().get(ZkStateReader.NODE_NAME_PROP);
             cjr.coreNodeName = shard.getKey();
+            cjr.url = shard.getValue().get(ZkStateReader.BASE_URL_PROP) + "/" + shard.getValue().get(ZkStateReader.CORE_NAME_PROP);
             list.add(cjr);
             if (isLeader) {
               shardToLeaderJetty.put(slice.getKey(), cjr);
@@ -653,6 +659,11 @@ public class FullSolrCloudTest extends A
     super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose);
   }
   
+  protected void waitForRecoveriesToFinish(boolean verbose, int timeoutSeconds)
+      throws Exception {
+    super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose, true, timeoutSeconds);
+  }
+  
   private void brindDownShardIndexSomeDocsAndRecover() throws Exception {
     SolrQuery query = new SolrQuery("*:*");
     query.set("distrib", false);
@@ -660,7 +671,6 @@ public class FullSolrCloudTest extends A
     commit();
     
     long deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
-    System.err.println("dsc:" + deadShardCount);
     
     query("q", "*:*", "sort", "n_tl1 desc");
     
@@ -1320,6 +1330,36 @@ public class FullSolrCloudTest extends A
     
   };
   
+  protected void waitForThingsToLevelOut(int waitForRecTimeSeconds) throws Exception {
+    log.info("Wait for recoveries to finish - wait " + waitForRecTimeSeconds + " for each attempt");
+    int cnt = 0;
+    boolean retry = false;
+    do {
+      waitForRecoveriesToFinish(VERBOSE, waitForRecTimeSeconds);
+      
+      try {
+        commit();
+      } catch (Exception e) {
+        // we don't care if this commit fails on some nodes
+      }
+      
+      updateMappingsFromZk(jettys, clients);
+      
+      Set<String> theShards = shardToClient.keySet();
+      String failMessage = null;
+      for (String shard : theShards) {
+        failMessage = checkShardConsistency(shard, false);
+      }
+      
+      if (failMessage != null) {
+        retry  = true;
+      }
+      cnt++;
+      if (cnt > 2) break;
+      Thread.sleep(4000);
+    } while (retry);
+  }
+  
   @Override
   @After
   public void tearDown() throws Exception {

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java?rev=1364429&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java Sun Jul 22 20:30:37 2012
@@ -0,0 +1,198 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Test sync phase that occurs when Leader goes down and a new Leader is
+ * elected.
+ */
+@Slow
+public class SyncSliceTest extends FullSolrCloudTest {
+  
+  @BeforeClass
+  public static void beforeSuperClass() {
+    
+  }
+  
+  @AfterClass
+  public static void afterSuperClass() {
+    
+  }
+  
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // we expect this time of exception as shards go up and down...
+    //ignoreException(".*");
+    
+    System.setProperty("numShards", Integer.toString(sliceCount));
+  }
+  
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    resetExceptionIgnores();
+  }
+  
+  public SyncSliceTest() {
+    super();
+    sliceCount = 1;
+    shardCount = 3;
+  }
+  
+  @Override
+  public void doTest() throws Exception {
+    
+    handle.clear();
+    handle.put("QTime", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+    
+    waitForThingsToLevelOut();
+
+    del("*:*");
+    
+    List<String> skipServers = new ArrayList<String>();
+    
+    indexDoc(skipServers, id, 0, i1, 50, tlong, 50, t1,
+        "to come to the aid of their country.");
+    
+    indexDoc(skipServers, id, 1, i1, 50, tlong, 50, t1,
+        "old haven was blue.");
+    
+    skipServers.add(shardToJetty.get("shard1").get(1).url + "/");
+    
+    indexDoc(skipServers, id, 2, i1, 50, tlong, 50, t1,
+        "but the song was fancy.");
+    
+    skipServers.add(shardToJetty.get("shard1").get(2).url + "/");
+    
+    indexDoc(skipServers, id, 3, i1, 50, tlong, 50, t1,
+        "under the moon and over the lake");
+    
+    commit();
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionAction.SYNCSHARD.toString());
+    params.set("collection", "collection1");
+    params.set("shard", "shard1");
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    
+    String baseUrl = ((HttpSolrServer) shardToClient.get("shard1").get(2)).getBaseURL();
+    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+    
+    HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
+    baseServer.request(request);
+    
+    waitForThingsToLevelOut();
+    
+    checkShardConsistency(false, true);
+    
+    skipServers = new ArrayList<String>();
+    
+    skipServers.add(shardToJetty.get("shard1").get(random().nextInt(shardCount)).url + "/");
+    
+    // this doc won't be on one node
+    indexDoc(skipServers, id, 4, i1, 50, tlong, 50, t1,
+        "to come to the aid of their country.");
+    
+    // kill the leader - new leader could have all the docs or be missing one
+    chaosMonkey.killJetty(shardToLeaderJetty.get("shard1").jetty);
+
+    waitForThingsToLevelOut();
+    
+    checkShardConsistency(false, true);
+  }
+
+  private void waitForThingsToLevelOut() throws Exception {
+    int cnt = 0;
+    boolean retry = false;
+    do {
+      waitForRecoveriesToFinish(false);
+      
+      commit();
+      
+      updateMappingsFromZk(jettys, clients);
+      
+      Set<String> theShards = shardToClient.keySet();
+      String failMessage = null;
+      for (String shard : theShards) {
+        failMessage = checkShardConsistency(shard, false);
+      }
+      
+      if (failMessage != null) {
+        retry = true;
+      } else {
+        retry = false;
+      }
+      
+      cnt++;
+      if (cnt > 10) break;
+      Thread.sleep(2000);
+    } while (retry);
+  }
+  
+  protected void indexDoc(List<String> skipServers, Object... fields) throws IOException,
+      SolrServerException {
+    SolrInputDocument doc = new SolrInputDocument();
+    
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    
+    controlClient.add(doc);
+    
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.add(doc);
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    for (String skip : skipServers) {
+      params.add("test.distrib.skip.servers", skip);
+    }
+    ureq.setParams(params);
+    ureq.process(cloudClient);
+  }
+  
+  // skip the randoms - they can deadlock...
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Sun Jul 22 20:30:37 2012
@@ -221,6 +221,44 @@ public class CoreAdminRequest extends So
     }
   }
   
+  public static class RequestSyncShard extends CoreAdminRequest {
+    private String shard;
+    private String collection;
+    
+    public RequestSyncShard() {
+      action = CoreAdminAction.REQUESTSYNCSHARD;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      if( action == null ) {
+        throw new RuntimeException( "no action specified!" );
+      }
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, action.toString());
+      params.set("shard", shard);
+      params.set("collection", collection);
+      params.set(CoreAdminParams.CORE, core);
+      return params;
+    }
+
+    public String getShard() {
+      return shard;
+    }
+
+    public void setShard(String shard) {
+      this.shard = shard;
+    }
+
+    public String getCollection() {
+      return collection;
+    }
+
+    public void setCollection(String collection) {
+      this.collection = collection;
+    }
+  }
+  
     //a persist core request
   public static class Persist extends CoreAdminRequest {
     protected String fileName = null;

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java Sun Jul 22 20:30:37 2012
@@ -45,6 +45,10 @@ public class ZkCoreNodeProps {
     return nodeProps.get(ZkStateReader.CORE_NAME_PROP);
   }
   
+  public static String getCoreUrl(ZkNodeProps nodeProps) {
+    return getCoreUrl(nodeProps.get(ZkStateReader.BASE_URL_PROP), nodeProps.get(ZkStateReader.CORE_NAME_PROP));
+  }
+  
   public static String getCoreUrl(String baseUrl, String coreName) {
     StringBuilder sb = new StringBuilder();
     if (baseUrl == null) return null;

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sun Jul 22 20:30:37 2012
@@ -277,7 +277,6 @@ public class ZkStateReader {
   private synchronized void updateCloudState(boolean immediate,
       final boolean onlyLiveNodes) throws KeeperException,
       InterruptedException {
-    log.info("Manual update of cluster state initiated");
     // build immutable CloudInfo
     
     if (immediate) {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Sun Jul 22 20:30:37 2012
@@ -28,7 +28,7 @@ public interface CollectionParams 
 
 
   public enum CollectionAction {
-    CREATE, DELETE, RELOAD;
+    CREATE, DELETE, RELOAD, SYNCSHARD;
     
     public static CollectionAction get( String p )
     {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1364429&r1=1364428&r2=1364429&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Sun Jul 22 20:30:37 2012
@@ -93,7 +93,8 @@ public interface CoreAdminParams 
     RENAME,
     MERGEINDEXES,
     PREPRECOVERY, 
-    REQUESTRECOVERY;
+    REQUESTRECOVERY, 
+    REQUESTSYNCSHARD;
     
     public static CoreAdminAction get( String p )
     {