Posted to commits@lucene.apache.org by yo...@apache.org on 2012/02/20 17:31:18 UTC

svn commit: r1291350 - in /lucene/dev/trunk/solr/core/src/java/org/apache/solr: cloud/RecoveryStrategy.java update/DefaultSolrCoreState.java update/PeerSync.java update/UpdateLog.java

Author: yonik
Date: Mon Feb 20 16:31:18 2012
New Revision: 1291350

URL: http://svn.apache.org/viewvc?rev=1291350&view=rev
Log:
SOLR-3126: get num versions from updatelog, fail sync if communication fails when retrieving updates, use starting versions if syncing after startup, only sync first time in recovery loop, more sync logging

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1291350&r1=1291349&r2=1291350&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Feb 20 16:31:18 2012
@@ -70,6 +70,7 @@ public class RecoveryStrategy extends Th
   private volatile String coreName;
   private int retries;
   private SolrCore core;
+  private boolean recoveringAfterStartup;
   
   public RecoveryStrategy(SolrCore core) {
     this.core = core;
@@ -79,9 +80,12 @@ public class RecoveryStrategy extends Th
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();
     coreZkNodeName = zkController.getNodeName() + "_" + coreName;
-    
   }
-  
+
+  public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
+    this.recoveringAfterStartup = recoveringAfterStartup;
+  }
+
   // make sure any threads stop retrying
   public void close() {
     close = true;
@@ -201,48 +205,80 @@ public class RecoveryStrategy extends Th
     List<Long> startingRecentVersions;
     UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates();
     try {
-      startingRecentVersions = startingRecentUpdates.getVersions(100);
+      startingRecentVersions = startingRecentUpdates.getVersions(ulog.numRecordsToKeep);
     } finally {
       startingRecentUpdates.close();
     }
+
+    List<Long> reallyStartingVersions = ulog.getStartingVersions();
+
+
+    if (reallyStartingVersions != null && recoveringAfterStartup) {
+      int oldIdx = 0;  // index of the start of the old list in the current list
+      long firstStartingVersion = reallyStartingVersions.size() > 0 ? reallyStartingVersions.get(0) : 0;
+
+      for (; oldIdx<startingRecentVersions.size(); oldIdx++) {
+        if (startingRecentVersions.get(oldIdx) == firstStartingVersion) break;
+      }
+
+      if (oldIdx < startingRecentVersions.size()) {
+        log.info("####### Found new versions added after startup: num=" + (startingRecentVersions.size()-oldIdx));
+      }
+      
+      log.info("###### startupVersions=" + reallyStartingVersions);
+      log.info("###### currentVersions=" + startingRecentVersions);
+    }
     
+    if (recoveringAfterStartup) {
+      // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
+      // when we went down.
+      startingRecentVersions = reallyStartingVersions;
+    }
+
+    boolean firstTime = true;
+
     while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
       try {
-        // first thing we just try to sync
+
         zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
- 
+
         CloudDescriptor cloudDesc = core.getCoreDescriptor()
             .getCloudDescriptor();
         ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
             cloudDesc.getCollectionName(), cloudDesc.getShardId());
-        
+
         String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
         String leaderCoreName = leaderprops.get(ZkStateReader.CORE_NAME_PROP);
-        
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName); 
-        
+
+        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
+
         sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
-        
-        log.info("Attempting to PeerSync from " + leaderUrl);
-        PeerSync peerSync = new PeerSync(core,
-            Collections.singletonList(leaderUrl), 100);
-        peerSync.setStartingVersions(startingRecentVersions);
-        boolean syncSuccess = peerSync.sync();
-        if (syncSuccess) {
-          SolrQueryRequest req = new LocalSolrQueryRequest(core,
-              new ModifiableSolrParams());
-          core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
-          log.info("Sync Recovery was succesful - registering as Active");
-          // sync success - register as active and return
-          zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
-              coreZkNodeName, coreName);
-          succesfulRecovery = true;
-          close = true;
-          return;
+
+
+        // first thing we just try to sync
+        if (firstTime) {
+          firstTime = false;    // only try sync the first time through the loop
+          log.info("Attempting to PeerSync from " + leaderUrl + " recoveringAfterStartup="+recoveringAfterStartup);
+          PeerSync peerSync = new PeerSync(core,
+              Collections.singletonList(leaderUrl), ulog.numRecordsToKeep);
+          peerSync.setStartingVersions(startingRecentVersions);
+          boolean syncSuccess = peerSync.sync();
+          if (syncSuccess) {
+            SolrQueryRequest req = new LocalSolrQueryRequest(core,
+                new ModifiableSolrParams());
+            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+            log.info("Sync Recovery was succesful - registering as Active");
+            // sync success - register as active and return
+            zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
+                coreZkNodeName, coreName);
+            succesfulRecovery = true;
+            close = true;
+            return;
+          }
+
+          log.info("Sync Recovery was not successful - trying replication");
         }
 
-        log.info("Sync Recovery was not successful - trying replication");
-        
         log.info("Begin buffering updates");
         ulog.bufferUpdates();
         replayed = false;
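
For illustration only (not part of this commit): a self-contained sketch of the
version-selection logic in the hunk above, with the UpdateLog replaced by plain
lists. The method names are hypothetical.

    import java.util.List;

    class StartupVersionSelection {
      // When recovering after startup, the versions recorded when the node last
      // went down are the frame of reference for PeerSync; versions added since
      // (e.g. by local tlog replay) must not count as evidence of being current.
      static List<Long> chooseSyncVersions(List<Long> currentVersions,
                                           List<Long> startupVersions,
                                           boolean recoveringAfterStartup) {
        if (recoveringAfterStartup && startupVersions != null) {
          return startupVersions;
        }
        return currentVersions;
      }

      // Mirrors the oldIdx scan above. Both lists are ordered newest-first, so
      // the index of the newest startup version within the current list equals
      // the number of versions added after startup; returns
      // currentVersions.size() when no overlap is found.
      static int versionsAddedAfterStartup(List<Long> currentVersions,
                                           List<Long> startupVersions) {
        long firstStartingVersion = startupVersions.isEmpty() ? 0 : startupVersions.get(0);
        int oldIdx = 0;
        while (oldIdx < currentVersions.size()
            && currentVersions.get(oldIdx) != firstStartingVersion) {
          oldIdx++;
        }
        return oldIdx;
      }
    }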

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1291350&r1=1291349&r2=1291350&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Mon Feb 20 16:31:18 2012
@@ -122,8 +122,12 @@ public final class DefaultSolrCoreState 
         }
         if (closed) return;
       }
-      
+
+      // if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
+      boolean recoveringAfterStartup = recoveryStrat == null;
+
       recoveryStrat = new RecoveryStrategy(core);
+      recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
       recoveryStrat.start();
       recoveryRunning = true;
     }
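
For context (also not part of the commit): the recoveringAfterStartup inference
works because recoveryStrat stays null until the first recovery request in this
process. A minimal sketch of the pattern, independent of Solr:

    class FirstRunGate {
      private Thread recoveryThread; // null until the first recovery in this process

      // A null worker reference doubles as "no recovery has run since startup",
      // so the first request is, by construction, recovery after startup.
      synchronized void requestRecovery(java.util.function.Consumer<Boolean> recovery) {
        boolean afterStartup = (recoveryThread == null);
        recoveryThread = new Thread(() -> recovery.accept(afterStartup));
        recoveryThread.start();
      }
    }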

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1291350&r1=1291349&r2=1291350&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Mon Feb 20 16:31:18 2012
@@ -29,11 +29,14 @@ import java.util.Set;
 import org.apache.commons.httpclient.NoHttpResponseException;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -138,7 +141,22 @@ public class PeerSync  {
     return Math.abs(arr.get(elem));
   }
 
-  /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates.
+  // Prefix for PeerSync-related log messages; includes the core name for correlation.
+  private String msg() {
+    ZkController zkController = uhandler.core.getCoreDescriptor().getCoreContainer().getZkController();
+
+    String myURL = "";
+
+    if (zkController != null) {
+      myURL = zkController.getZkServerAddress();
+    }
+
+    // TODO: core name turns up blank in many tests - find URL if cloud enabled?
+    return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
+  }
+
+  /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates
+   *  when considering the last N updates between it and its peers.
    *  A commit is not performed.
    */
   public boolean sync() {
@@ -146,6 +164,14 @@ public class PeerSync  {
       return false;
     }
 
+    log.info(msg() + "START replicas=" + replicas + " nUpdates=" + nUpdates);
+
+    if (debug) {
+      if (startingVersions != null) {
+        log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
+      }
+    }
+
     // Fire off the requests before getting our own recent updates (for better concurrency)
     // This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates, they would
     // have newer stuff that we also had (assuming updates are going on and are being forwarded).
@@ -178,7 +204,7 @@ public class PeerSync  {
       long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size()-1));
 
       if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
-        log.warn("PeerSync: too many updates received since start - startingUpdates no longer overlaps with cour urrentUpdates");
+        log.warn(msg() + "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates");
         return false;
       }
 
@@ -199,6 +225,7 @@ public class PeerSync  {
       }  else {
         // we have no versions and hence no frame of reference to tell if we can use a peers
         // updates to bring us into sync
+        log.info(msg() + "DONE.  We have no versions.  sync failed.");
         return false;
       }
     }
@@ -211,11 +238,13 @@ public class PeerSync  {
       if (srsp == null) break;
       boolean success = handleResponse(srsp);
       if (!success) {
+        log.info(msg() +  "DONE. sync failed");
         shardHandler.cancelAll();
         return false;
       }
     }
 
+    log.info(msg() +  "DONE. sync succeeded");
     return true;
   }
   
@@ -235,26 +264,35 @@ public class PeerSync  {
   }
 
   private boolean handleResponse(ShardResponse srsp) {
+    ShardRequest sreq = srsp.getShardRequest();
+
     if (srsp.getException() != null) {
 
       // TODO: look at this more thoroughly - we don't want
       // to fail on connection exceptions, but it may make sense
       // to determine this based on the number of fails
-      if (srsp.getException() instanceof SolrServerException) {
+      //
+      // If the replica went down between asking for versions and asking for specific updates, that
+      // shouldn't be treated as success since we counted on getting those updates back (and avoided
+      // redundantly asking other replicas for them).
+      if (sreq.purpose == 1 && srsp.getException() instanceof SolrServerException) {
         Throwable solrException = ((SolrServerException) srsp.getException())
             .getRootCause();
         if (solrException instanceof ConnectException
             || solrException instanceof NoHttpResponseException) {
+          log.info(msg() + " couldn't connect to " + srsp.getShardAddress() + ", counting as success");
+
           return true;
         }
       }
       // TODO: at least log???
       // srsp.getException().printStackTrace(System.out);
       
+      log.warn(msg() + " exception talking to " + srsp.getShardAddress() + ", counting as failure");
+      
       return false;
     }
 
-    ShardRequest sreq = srsp.getShardRequest();
     if (sreq.purpose == 1) {
       return handleVersions(srsp);
     } else {
@@ -270,6 +308,8 @@ public class PeerSync  {
     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
     sreq.reportedVersions =  otherVersions;
 
+    log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
+
     if (otherVersions.size() == 0) {
       return true;
     }
@@ -278,6 +318,10 @@ public class PeerSync  {
 
     Collections.sort(otherVersions, absComparator);
 
+    if (debug) {
+      log.debug(msg() + " sorted versions from " + sreq.shards[0] + " = " + otherVersions);
+    }
+    
     long otherHigh = percentile(otherVersions, .2f);
     long otherLow = percentile(otherVersions, .8f);
 
@@ -286,6 +330,7 @@ public class PeerSync  {
       // This means that we might miss updates if we attempted to use this method.
       // Since there exists just one replica that is so much newer, we must
       // fail the sync.
+      log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
       return false;
     }
 
@@ -293,6 +338,7 @@ public class PeerSync  {
       // Small overlap between windows and ours is newer.
       // Using this list to sync would result in requesting/replaying results we don't need
       // and possibly bringing deleted docs back to life.
+      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
       return true;
     }
     
@@ -303,6 +349,8 @@ public class PeerSync  {
 
       if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
         // we either have this update, or already requested it
+        // TODO: what if the shard we previously requested this from returns failure (because it goes
+        // down)
         continue;
       }
 
@@ -313,11 +361,14 @@ public class PeerSync  {
     sreq.requestedUpdates = toRequest;
     
     if (toRequest.isEmpty()) {
+      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+
       // we had (or already requested) all the updates referenced by the replica
       return true;
     }
     
     if (toRequest.size() > maxUpdates) {
+      log.info(msg() + " Failing due to needing too many updates:" + maxUpdates);
       return false;
     }
 
@@ -327,9 +378,7 @@ public class PeerSync  {
   private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
     String replica = srsp.getShardRequest().shards[0];
 
-    log.info("Requesting updates from " + replica + " versions=" + toRequest);
-
-
+    log.info(msg() + "Requesting updates from " + replica + " versions=" + toRequest);
 
     // reuse our original request object
     ShardRequest sreq = srsp.getShardRequest();
@@ -353,7 +402,7 @@ public class PeerSync  {
 
     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
     if (updates.size() < sreq.requestedUpdates.size()) {
-      log.error("PeerSync: Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
+      log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
       return false;
     }
 
@@ -380,6 +429,10 @@ public class PeerSync  {
         o = obj;
         List<Object> entry = (List<Object>)o;
 
+        if (debug) {
+          log.debug(msg() + "raw update record " + o);
+        }
+        
         int oper = (Integer)entry.get(0);
         long version = (Long) entry.get(1);
         if (version == lastVersion && version != 0) continue;
@@ -395,6 +448,9 @@ public class PeerSync  {
             cmd.solrDoc = sdoc;
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+            if (debug) {
+              log.debug(msg() + "add " + cmd);
+            }
             proc.processAdd(cmd);
             break;
           }
@@ -405,6 +461,9 @@ public class PeerSync  {
             cmd.setIndexedId(new BytesRef(idBytes));
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+            if (debug) {
+              log.debug(msg() + "delete " + cmd);
+            }
             proc.processDelete(cmd);
             break;
           }
@@ -416,6 +475,9 @@ public class PeerSync  {
             cmd.query = query;
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+            if (debug) {
+              log.debug(msg() + "deleteByQuery " + cmd);
+            }
             proc.processDelete(cmd);
             break;
           }
@@ -431,12 +493,12 @@ public class PeerSync  {
       // TODO: should this be handled separately as a problem with us?
       // I guess it probably already will by causing replication to be kicked off.
       sreq.updateException = e;
-      log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
+      log.error(msg() + "Error applying updates from " + sreq.shards + " ,update=" + o, e);
       return false;
     }
     catch (Exception e) {
       sreq.updateException = e;
-      log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
+      log.error(msg() + "Error applying updates from " + sreq.shards + " ,update=" + o, e);
       return false;
     }
     finally {
@@ -444,7 +506,7 @@ public class PeerSync  {
         proc.finish();
       } catch (Exception e) {
         sreq.updateException = e;
-        log.error("Error applying updates from " + sreq.shards + " ,finish()", e);
+        log.error(msg() + "Error applying updates from " + sreq.shards + " ,finish()", e);
         return false;
       }
     }
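
For illustration (not in the commit): the percentile window comparison used by
handleVersions above, reconstructed as a standalone method. The 20%/80% cut
points and the newest-first ordering follow the hunks above, but percentile()
is only partially visible in this diff, so its body here is an assumption.

    import java.util.List;

    class VersionWindows {
      // Lists are sorted by absolute value, newest first, so index size*0.2
      // holds a "high" (recent) version and index size*0.8 a "low" (old) one.
      static long percentile(List<Long> sortedDesc, float frac) {
        return Math.abs(sortedDesc.get((int) (sortedDesc.size() * frac)));
      }

      // +1: ours newer (nothing to request); -1: peer far ahead (sync must
      // fail); 0: the windows overlap (request the versions we are missing).
      static int compareWindows(List<Long> ours, List<Long> theirs) {
        long ourHighThreshold = percentile(ours, 0.2f);
        long ourLowThreshold  = percentile(ours, 0.8f);
        long otherHigh        = percentile(theirs, 0.2f);
        long otherLow         = percentile(theirs, 0.8f);
        if (ourHighThreshold < otherLow) return -1; // a replica this much newer => missed updates
        if (ourLowThreshold > otherHigh) return +1; // small overlap, ours newer => done
        return 0;
      }
    }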

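Likewise for the failure policy in handleResponse: a connect failure is
tolerated only while exchanging version lists (sreq.purpose == 1); once
specific updates have been requested from a replica, its death must fail the
sync, because those updates were deliberately not re-requested from other
replicas. A hedged sketch (the purpose value is taken from the hunk above):

    import java.net.ConnectException;

    class SyncFailurePolicy {
      static final int PURPOSE_GET_VERSIONS = 1; // sreq.purpose for the version request

      // Returns true if the failed response may still be counted as success.
      static boolean tolerate(int purpose, Throwable rootCause) {
        // Before we have a replica's versions, its death costs us nothing;
        // after we have asked it for specific updates, we counted on the answer.
        return purpose == PURPOSE_GET_VERSIONS && rootCause instanceof ConnectException;
      }
    }
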
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1291350&r1=1291349&r2=1291350&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Mon Feb 20 16:31:18 2012
@@ -90,7 +90,7 @@ public class UpdateLog implements Plugin
   private TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap
 
   private final int numDeletesToKeep = 1000;
-  private final int numRecordsToKeep = 100;
+  public final int numRecordsToKeep = 100;
   // keep track of deletes only... this is not updated on an add
   private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
     protected boolean removeEldestEntry(Map.Entry eldest) {
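
For illustration (not in the commit): oldDeletes above uses the standard
LinkedHashMap eviction hook to stay bounded at numDeletesToKeep entries. The
idiom, self-contained:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Insertion-order map that evicts its eldest entry once the cap is exceeded.
    class BoundedMap<K, V> extends LinkedHashMap<K, V> {
      private final int maxEntries;

      BoundedMap(int maxEntries) {
        super(maxEntries);
        this.maxEntries = maxEntries;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
      }
    }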