You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this copy.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/28 17:35:15 UTC

svn commit: r1207402 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/update/ java/org/apache/solr/update/processor/ test/org/apache/solr/search/

Author: yonik
Date: Mon Nov 28 16:35:14 2011
New Revision: 1207402

URL: http://svn.apache.org/viewvc?rev=1207402&view=rev
Log:
SOLR-2808: log updates while recovering, then replay

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Mon Nov 28 16:35:14 2011
@@ -31,10 +31,7 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /** @lucene.experimental */
 class NullUpdateLog extends UpdateLog {
@@ -102,6 +99,20 @@ class NullUpdateLog extends UpdateLog {
     return false;
   }
 
+  @Override
+  public void bufferUpdates() {
+  }
+
+  @Override
+  public Future<FSUpdateLog.RecoveryInfo> applyBufferedUpdates() {
+    return null;
+  }
+
+  @Override
+  public State getState() {
+    return State.ACTIVE;
+  }
+
 }
 
 /** @lucene.experimental */
@@ -110,6 +121,7 @@ public class FSUpdateLog extends UpdateL
   public static String TLOG_NAME="tlog";
 
   long id = -1;
+  private State state = State.ACTIVE;
 
   private TransactionLog tlog;
   private TransactionLog prevTlog;
@@ -447,10 +459,10 @@ public class FSUpdateLog extends UpdateL
   @Override
   public boolean recoverFromLog() {
     if (tlogFiles.length == 0) return false;
-    TransactionLog oldTlog = null;     // todo - change name
+    TransactionLog oldTlog = null;
     try {
       oldTlog = new TransactionLog( new File(tlogDir, tlogFiles[tlogFiles.length-1]), null, true );
-      recoveryExecutor.execute(new LogReplayer(oldTlog));
+      recoveryExecutor.execute(new LogReplayer(oldTlog, false));
       return true;
 
     } catch (Exception ex) {
@@ -484,131 +496,218 @@ public class FSUpdateLog extends UpdateL
     }
   }
 
-  public static Runnable testing_logReplayHook;  // aquires a permit before each log read
-  public static Runnable testing_logReplayFinishHook;  // releases a permit when log replay has finished
+  @Override
+  public void bufferUpdates() {
+    assert state == State.ACTIVE;
 
+    // block all updates to eliminate race conditions
+    // reading state and acting on it in the update processor
+    versionInfo.blockUpdates();
+    try {
+      state = State.BUFFERING;
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+    recoveryInfo = new RecoveryInfo();
+  }
+
+  /** Returns the Future to wait on, or null if no replay was needed */
+  @Override
+  public Future<RecoveryInfo> applyBufferedUpdates() {
+    assert state == State.BUFFERING;
+
+    // block all updates to eliminate race conditions
+    // reading state and acting on it in the update processor
+    versionInfo.blockUpdates();
+    try {
+      if (state != State.BUFFERING) return null;
+      state = State.APPLYING_BUFFERED;
+
+      // handle case when no log was even created because no updates
+      // were received.
+      if (tlog == null) {
+        state = State.ACTIVE;
+        return null;
+      }
+
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+
+    tlog.incref();
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+    LogReplayer replayer = new LogReplayer(tlog, true);
+    return cs.submit(replayer, recoveryInfo);
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+
+  public static Runnable testing_logReplayHook;  // called before each log read
+  public static Runnable testing_logReplayFinishHook;  // called when log replay has finished
+
+
+
+  private RecoveryInfo recoveryInfo;
 
   // TODO: do we let the log replayer run across core reloads?
   class LogReplayer implements Runnable {
-    TransactionLog tlog;
+    TransactionLog translog;
     TransactionLog.LogReader tlogReader;
+    boolean activeLog;
+    boolean finishing = false;  // state where we lock out other updates and finish those updates that snuck in before we locked
 
 
-    public LogReplayer(TransactionLog tlog) {
-      this.tlog = tlog;
+    public LogReplayer(TransactionLog translog, boolean activeLog) {
+      this.translog = translog;
+      this.activeLog = activeLog;
     }
 
     @Override
     public void run() {
-      uhandler.core.log.warn("Starting log replay " + tlogReader);
-
-      tlogReader = tlog.getReader();
-
-      SolrParams params = new ModifiableSolrParams();
-      long commitVersion = 0;
+      try {
 
-      for(;;) {
-        Object o = null;
+        uhandler.core.log.warn("Starting log replay " + tlogReader);
 
-        try {
-          if (testing_logReplayHook != null) testing_logReplayHook.run();
-          o = tlogReader.next(null);
-        } catch (InterruptedException e) {
-          SolrException.log(log,e);
-        } catch (IOException e) {
-          SolrException.log(log,e);
-        }
+        tlogReader = translog.getReader();
 
-        if (o == null) break;
+        SolrParams params = new ModifiableSolrParams();
+        long commitVersion = 0;
 
-        // create a new request each time since the update handler and core could change
-        SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
-
-        // TODO: race?  This core could close on us if it was reloaded
+        for(;;) {
+          Object o = null;
+
+          try {
+            if (testing_logReplayHook != null) testing_logReplayHook.run();
+            o = tlogReader.next();
+            if (o == null && activeLog) {
+              if (!finishing) {
+                // block to prevent new adds, but don't immediately unlock since
+                // we could be starved from ever completing recovery.  Only unlock
+                // after we've finished this recovery.
+                // NOTE: our own updates won't be blocked since the thread holding a write lock can
+                // lock a read lock.
+                versionInfo.blockUpdates();
+                finishing = true;
+                o = tlogReader.next();
+              } else {
+                // we had previously blocked updates, so this "null" from the log is final.
+
+                // Wait until our final commit to change the state and unlock.
+                // This is only so no new updates are written to the current log file, and is
+                // only an issue if we crash before the commit (and we are paying attention
+                // to incomplete log files).
+                //
+                // versionInfo.lock.writeLock().unlock();
+              }
+            }
+          } catch (InterruptedException e) {
+            SolrException.log(log,e);
+          } catch (IOException e) {
+            SolrException.log(log,e);
+          }
 
-        try {
+          if (o == null) break;
 
-          // should currently be a List<Oper,Ver,Doc/Id>
-          List entry = (List)o;
+          // create a new request each time since the update handler and core could change
+          SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
 
-          int oper = (Integer)entry.get(0);
-          long version = (Long) entry.get(1);
+          // TODO: race?  This core could close on us if it was reloaded
 
-          switch (oper) {
-            case UpdateLog.ADD:
-            {
-              // byte[] idBytes = (byte[]) entry.get(2);
-              SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
-              AddUpdateCommand cmd = new AddUpdateCommand(req);
-              // cmd.setIndexedId(new BytesRef(idBytes));
-              cmd.solrDoc = sdoc;
-              cmd.setVersion(version);
-              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-              uhandler.addDoc(cmd);
-              break;
+          try {
 
-              // TODO: updates need to go through versioning code for handing reorders? (for replicas at least,
-              // depending on how they recover.
-            }
-            case UpdateLog.DELETE:
-            {
-              byte[] idBytes = (byte[]) entry.get(2);
-              DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
-              cmd.setIndexedId(new BytesRef(idBytes));
-              cmd.setVersion(version);
-              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-              uhandler.delete(cmd);
-              break;
-            }
+            // should currently be a List<Oper,Ver,Doc/Id>
+            List entry = (List)o;
+
+            int oper = (Integer)entry.get(0);
+            long version = (Long) entry.get(1);
+
+            switch (oper) {
+              case UpdateLog.ADD:
+              {
+                // byte[] idBytes = (byte[]) entry.get(2);
+                SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+                AddUpdateCommand cmd = new AddUpdateCommand(req);
+                // cmd.setIndexedId(new BytesRef(idBytes));
+                cmd.solrDoc = sdoc;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+                uhandler.addDoc(cmd);
+                break;
+
+                // TODO: updates need to go through versioning code for handing reorders? (for replicas at least,
+                // depending on how they recover.
+              }
+              case UpdateLog.DELETE:
+              {
+                byte[] idBytes = (byte[]) entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.setIndexedId(new BytesRef(idBytes));
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+                uhandler.delete(cmd);
+                break;
+              }
+
+              case UpdateLog.DELETE_BY_QUERY:
+              {
+                String query = (String)entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.query = query;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+                uhandler.delete(cmd);
+                break;
+              }
+
+              case UpdateLog.COMMIT:
+              {
+                // currently we don't log commits
+                commitVersion = version;
+                break;
+              }
 
-            case UpdateLog.DELETE_BY_QUERY:
-            {
-              String query = (String)entry.get(2);
-              DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
-              cmd.query = query;
-              cmd.setVersion(version);
-              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-              uhandler.delete(cmd);
-              break;
+              default:
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
             }
+          } catch (IOException ex) {
 
-            case UpdateLog.COMMIT:
-            {
-              // currently we don't log commits
-              commitVersion = version;
-              break;
-            }
+          } catch (ClassCastException cl) {
+            log.warn("Corrupt log", cl);
+            // would be caused by a corrupt transaction log
+          } catch (Exception ex) {
+            log.warn("Exception replaying log", ex);
 
-            default:
-              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
+            // something wrong with the request?
           }
+        }
+
+        SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+        CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+        cmd.setVersion(commitVersion);
+        cmd.softCommit = false;
+        cmd.waitSearcher = true;
+        cmd.setFlags(UpdateCommand.REPLAY);
+        try {
+          uhandler.commit(cmd);
         } catch (IOException ex) {
+          log.error("Replay exception: final commit.", ex);
+        }
 
-        } catch (ClassCastException cl) {
-          log.warn("Corrupt log", cl);
-          // would be caused by a corrupt transaction log
-        } catch (Exception ex) {
-          log.warn("Exception replaying log", ex);
+        tlogReader.close();
+        translog.decref();
 
-          // something wrong with the request?
+      } finally {
+        // change the state while updates are still blocked to prevent races
+        state = State.ACTIVE;
+        if (finishing) {
+          versionInfo.unblockUpdates();
         }
       }
 
-      SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
-      CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
-      cmd.setVersion(commitVersion);
-      cmd.softCommit = false;
-      cmd.waitSearcher = true;
-      cmd.setFlags(UpdateCommand.REPLAY);
-      try {
-        uhandler.commit(cmd);
-      } catch (IOException ex) {
-        log.error("Replay exception: final commit.", ex);
-      }
-
-      tlogReader.close();
-      tlog.decref();
-
       log.warn("Ending log replay " + tlogReader);
 
       if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Mon Nov 28 16:35:14 2011
@@ -67,8 +67,6 @@ public class TransactionLog {
   Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
   List<String> globalStringList = new ArrayList<String>();
 
-  CountDownLatch latch;  // if set, used to signal that we just added another log record
-
   // write a BytesRef as a byte array
   JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -238,7 +236,6 @@ public class TransactionLog {
         // fos.flushBuffer();  // flush later
 
 
-        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -262,7 +259,6 @@ public class TransactionLog {
         BytesRef br = cmd.getIndexedId();
         codec.writeByteArray(br.bytes, br.offset, br.length);
         // fos.flushBuffer();  // flush later
-        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -285,7 +281,6 @@ public class TransactionLog {
         codec.writeLong(cmd.getVersion());
         codec.writeStr(cmd.query);
         // fos.flushBuffer();  // flush later
-        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -309,7 +304,6 @@ public class TransactionLog {
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE);  // ensure these bytes are the last in the file
         // fos.flushBuffer();  // flush later
-        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -317,12 +311,6 @@ public class TransactionLog {
     }
   }
 
-  private void endWrite() {
-    if (latch != null) {
-      latch.countDown();
-    }
-  }
-
 
   /* This method is thread safe */
   public Object lookup(long pos) {
@@ -412,55 +400,22 @@ public class TransactionLog {
       incref();
     }
 
-    /** Returns the next object from the log.  If this log is being concurrently written
-     * to, and you wish to block until a new record is available, then pass in a latch
-     * to use.
+    /** Returns the next object from the log, or null if none available.
      *
-     * @param latch The latch to use to block, waiting for the next log record
      * @return The log record, or null if EOF
      * @throws IOException
      */
-    public Object next(CountDownLatch latch) throws IOException, InterruptedException {
+    public Object next() throws IOException, InterruptedException {
       long pos = fis.position();
 
       synchronized (TransactionLog.this) {
         if (pos >= fos.size()) {
-          // we caught up.
-          TransactionLog.this.latch = latch;
-
-
-          // TODO: how to prevent a race between catching up and
-          // switching to "active"... need higher level coordination?
-          // Probably since updating this log is normally done after
-          // updating the index.  Perhaps more than one phase...
-          // 1) next() returns null
-          // 2) replayer sets flag to "almost active" and updates start going to the index
-          // 3) replayer continues calling next() to get any records that were added inbetween.
-          // This *still* doesn't work since a thread that skipped adding to the index could
-          // be delayed, next() could return null, and *then* the thread would write to the
-          // log.
-          // TODO: may need to utilize a read-write lock around all the updates (this
-          // may be needed for deleteByQuery anyway)
           return null;
         }
 
         fos.flushBuffer();
       }
 
-      if (TransactionLog.this.latch != null) {
-        TransactionLog.this.latch.await();
-
-        synchronized (TransactionLog.this) {
-          TransactionLog.this.latch = null;
-          if (fis.position() >= fos.size()) {
-            // still EOF... someone else must have tripped the latch.
-            return null;
-          }
-          fos.flushBuffer();
-        }
-
-      }
-
       if (pos == 0) {
         readHeader(fis);
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Mon Nov 28 16:35:14 2011
@@ -30,7 +30,8 @@ import org.apache.solr.request.SolrQuery
     protected long version;
     protected int flags;
 
-    public static int REPLAY = 0x00000001; // update command is from replaying a log.
+    public static int BUFFERING = 0x00000001; // update command is being buffered.
+    public static int REPLAY    = 0x00000002; // update command is from replaying a log.
     public static int IGNORE_AUTOCOMMIT = 0x00000002; // this update should not count toward triggering of autocommits.
 
     public UpdateCommand(String commandName, SolrQueryRequest req) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Mon Nov 28 16:35:14 2011
@@ -23,11 +23,14 @@ import org.apache.solr.util.plugin.Plugi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Future;
+
 /** @lucene.experimental */
 public abstract class UpdateLog implements PluginInfoInitialized {
   public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
 
   public enum SyncLevel { NONE, FLUSH, FSYNC }
+  public enum State { REPLAYING, BUFFERING, APPLYING_BUFFERED, ACTIVE }
 
   public static final int ADD = 0x01;
   public static final int DELETE = 0x02;
@@ -48,4 +51,12 @@ public abstract class UpdateLog implemen
   public abstract VersionInfo getVersionInfo();
   public abstract void finish(SyncLevel syncLevel);
   public abstract boolean recoverFromLog();
+
+  public abstract void bufferUpdates();
+  public abstract Future<FSUpdateLog.RecoveryInfo> applyBufferedUpdates();
+  public abstract State getState();
+
+
+  public static class RecoveryInfo {
+  }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java Mon Nov 28 16:35:14 2011
@@ -20,6 +20,9 @@ package org.apache.solr.update;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.lucene.queries.function.DocValues;
 import org.apache.lucene.queries.function.ValueSource;
@@ -39,6 +42,7 @@ public class VersionInfo {
   private final VersionBucket[] buckets;
   private SchemaField versionField;
   private SchemaField idField;
+  final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
     this.updateHandler = updateHandler;
@@ -55,6 +59,21 @@ public class VersionInfo {
     return versionField;
   }
 
+  public void lockForUpdate() {
+    lock.readLock().lock();
+  }
+
+  public void unlockForUpdate() {
+    lock.readLock().unlock();
+  }
+
+  public void blockUpdates() {
+    lock.writeLock().lock();
+  }
+
+  public void unblockUpdates() {
+    lock.writeLock().unlock();
+  }
 
   // todo: initialize... use current time to start?
   // a clock that increments by 1 for every operation makes it easier to detect missing

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Nov 28 16:35:14 2011
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.commons.lang.NullArgumentException;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.HashPartitioner;
 import org.apache.solr.cloud.HashPartitioner.Range;
 import org.apache.solr.common.SolrException;
@@ -47,14 +48,7 @@ import org.apache.solr.request.SolrQuery
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.SchemaField;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.SolrCmdDistributor;
-import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionBucket;
-import org.apache.solr.update.VersionInfo;
+import org.apache.solr.update.*;
 import org.apache.zookeeper.KeeperException;
 
 // NOT mt-safe... create a new processor for each add thread
@@ -297,47 +291,60 @@ public class DistributedUpdateProcessor 
     }
 
     VersionBucket bucket = vinfo.bucket(hash);
-    synchronized (bucket) {
-      // we obtain the version when synchronized and then do the add so we can ensure that
-      // if version1 < version2 then version1 is actually added before version2.
-
-      // even if we don't store the version field, synchronizing on the bucket
-      // will enable us to know what version happened first, and thus enable
-      // realtime-get to work reliably.
-      // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
-      // there may be other reasons in the future for a version on the commands
-      if (versionsStored) {
-        long bucketVersion = bucket.highest;
-
-        if (isLeader) {
-          long version = vinfo.getNewClock();
-          cmd.setVersion(version);
-          cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
-          bucket.updateHighest(version);
-        } else {
-          // The leader forwarded us this update.
-          cmd.setVersion(versionOnUpdate);
-
-          // if we aren't the leader, then we need to check that updates were not re-ordered
-          if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
-            // we're OK... this update has a version higher than anything we've seen
-            // in this bucket so far, so we know that no reordering has yet occured.
-            bucket.updateHighest(versionOnUpdate);
+
+    vinfo.lockForUpdate();
+    try {
+      synchronized (bucket) {
+        // we obtain the version when synchronized and then do the add so we can ensure that
+        // if version1 < version2 then version1 is actually added before version2.
+
+        // even if we don't store the version field, synchronizing on the bucket
+        // will enable us to know what version happened first, and thus enable
+        // realtime-get to work reliably.
+        // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
+        // there may be other reasons in the future for a version on the commands
+        if (versionsStored) {
+
+          long bucketVersion = bucket.highest;
+
+          if (isLeader) {
+            long version = vinfo.getNewClock();
+            cmd.setVersion(version);
+            cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+            bucket.updateHighest(version);
           } else {
-            // there have been updates higher than the current update.  we need to check
-            // the specific version for this id.
-            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-            if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
-              // This update is a repeat, or was reordered.  We need to drop this update.
+            // The leader forwarded us this update.
+            cmd.setVersion(versionOnUpdate);
+
+            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+              // we're not in an active state, and this update isn't from a replay, so buffer it.
+              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+              ulog.add(cmd);
               return true;
             }
+
+            // if we aren't the leader, then we need to check that updates were not re-ordered
+            if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+              // we're OK... this update has a version higher than anything we've seen
+              // in this bucket so far, so we know that no reordering has yet occured.
+              bucket.updateHighest(versionOnUpdate);
+            } else {
+              // there have been updates higher than the current update.  we need to check
+              // the specific version for this id.
+              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+              if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+                // This update is a repeat, or was reordered.  We need to drop this update.
+                return true;
+              }
+            }
           }
         }
-      }
-
-      doLocalAdd(cmd);
-    }  // end synchronized (bucket)
 
+        doLocalAdd(cmd);
+      }  // end synchronized (bucket)
+    } finally {
+      vinfo.unlockForUpdate();
+    }
     return false;
   }
   
@@ -409,46 +416,68 @@ public class DistributedUpdateProcessor 
     Long versionOnUpdate = versionOnUpdateS == null ? null : Long.parseLong(versionOnUpdateS);
 
     VersionBucket bucket = vinfo.bucket(hash);
-    synchronized (bucket) {
-      if (versionsStored) {
-        long bucketVersion = bucket.highest;
-
-        if (isLeader) {
-          long version = vinfo.getNewClock();
-          cmd.setVersion(-version);
-          bucket.updateHighest(version);
-        } else {
-          // The leader forwarded us this update.
-          if (versionOnUpdate == null) {
-            throw new RuntimeException("we expected to find versionOnUpdate but did not");
-          }
-          
-          cmd.setVersion(versionOnUpdate);
-          // if we aren't the leader, then we need to check that updates were not re-ordered
-          if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
-            // we're OK... this update has a version higher than anything we've seen
-            // in this bucket so far, so we know that no reordering has yet occured.
-            bucket.updateHighest(versionOnUpdate);
+
+    vinfo.lockForUpdate();
+    try {
+
+      synchronized (bucket) {
+        if (versionsStored) {
+          long bucketVersion = bucket.highest;
+
+          if (isLeader) {
+            long version = vinfo.getNewClock();
+            cmd.setVersion(-version);
+            bucket.updateHighest(version);
           } else {
-            // there have been updates higher than the current update.  we need to check
-            // the specific version for this id.
-            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-            if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
-              // This update is a repeat, or was reordered.  We need to drop this update.   
+            // The leader forwarded us this update.
+            if (versionOnUpdate == null) {
+              throw new RuntimeException("we expected to find versionOnUpdate but did not");
+            }
+
+            cmd.setVersion(versionOnUpdate);
+
+            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+              // we're not in an active state, and this update isn't from a replay, so buffer it.
+              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+              ulog.delete(cmd);
               return true;
             }
+
+            // if we aren't the leader, then we need to check that updates were not re-ordered
+            if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+              // we're OK... this update has a version higher than anything we've seen
+              // in this bucket so far, so we know that no reordering has yet occured.
+              bucket.updateHighest(versionOnUpdate);
+            } else {
+              // there have been updates higher than the current update.  we need to check
+              // the specific version for this id.
+              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+              if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+                // This update is a repeat, or was reordered.  We need to drop this update.
+                return true;
+              }
+            }
           }
         }
-      }
 
-      doLocalDelete(cmd);
-      return false;
-    }  // end synchronized (bucket)
+        doLocalDelete(cmd);
+        return false;
+      }  // end synchronized (bucket)
 
+    } finally {
+      vinfo.unlockForUpdate();
+    }
   }
   
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
+
+    if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      log.info("Ignoring commit while not ACTIVE");
+      return;
+    }
+
+
     // nocommit: make everyone commit?
 //    if (shards != null) {
 //      cmdDistrib.distribCommit(cmd, shards);

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Mon Nov 28 16:35:14 2011
@@ -18,11 +18,16 @@ package org.apache.solr.search;
 
 
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.FSUpdateLog;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -116,4 +121,122 @@ public class TestRecovery extends SolrTe
 
   }
 
-}
\ No newline at end of file
+  // Updates logged while BUFFERING must stay invisible until applyBufferedUpdates() replays them.
+  @Test
+  public void testBuffering() throws Exception {
+    final boolean oldCommitOnClose = DirectUpdateHandler2.commitOnClose;
+    try {
+      DirectUpdateHandler2.commitOnClose = false;
+      final Semaphore logReplay = new Semaphore(0);
+      final Semaphore logReplayFinish = new Semaphore(0);
+
+      FSUpdateLog.testing_logReplayHook = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            logReplay.acquire();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        }
+      };
+      FSUpdateLog.testing_logReplayFinishHook = new Runnable() {
+        @Override
+        public void run() {
+          logReplayFinish.release();
+        }
+      };
+
+      clearIndex();
+      assertU(commit());
+
+      // Only the update log is needed; close the request so it does not outlive this setup.
+      SolrQueryRequest req = req();
+      UpdateHandler uhandler = req.getCore().getUpdateHandler();
+      UpdateLog ulog = uhandler.getUpdateLog();
+      req.close();
+
+      // Nothing buffered: applying returns no future and the log goes straight back to ACTIVE.
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+      ulog.bufferUpdates();
+      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+      Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
+      assertNull(rinfoFuture);
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+      ulog.bufferUpdates();
+      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+      assertU(adoc("id","1"));
+      assertU(adoc("id","2"));
+      assertU(adoc("id","3"));
+      assertU(delI("1"));
+      assertU(commit());
+
+      // updates should be buffered, so we should not see any results yet.
+      assertJQ(req("q", "*:*")
+          , "/response/numFound==0"
+      );
+
+      rinfoFuture = ulog.applyBufferedUpdates();
+      assertNotNull(rinfoFuture);
+      // The replay hook blocks on logReplay, so the log should still be applying.
+      assertEquals(UpdateLog.State.APPLYING_BUFFERED, ulog.getState());
+
+      logReplay.release(1000);
+      // Wait for the replay to complete: docs 2 and 3 remain, doc 1 was deleted.
+      rinfoFuture.get();
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+      assertJQ(req("q", "*:*")
+          , "/response/numFound==2"
+      );
+    } finally {
+      DirectUpdateHandler2.commitOnClose = oldCommitOnClose;
+      FSUpdateLog.testing_logReplayHook = null;
+      FSUpdateLog.testing_logReplayFinishHook = null;
+    }
+  }
+
+
+
+}
+
+/**
+
+ - should the update processor log directly, or pass updates through to the update handler?
+  - only updates with versions (from leader) get logged?
+    - could also log user updates and assign versions later - but then we couldn't tell the user if it failed!
+  - processor needs to know not to check versions
+    - this suggests processor should just directly log.
+
+ - we shouldn't be syncing when temporarily logging (this is called via run update processor currently, so we're ok)
+
+ Transition from "recovering" to "active" - need to ensure that no updates are lost.
+
+ - grab global write lock
+ - change state to active
+ - release global write lock
+
+ - UpdateHandler.setState(BUFFER_UPDATES); possible states: BUFFER_UPDATES, REPLAY_BUFFERED_UPDATES, ACTIVE
+
+   ulog.bufferUpdates
+   ulog.replayBufferedUpdates
+   ulog.makeActive
+
+   recoverFrom
+   bufferUpdates()
+   replayUpdates()
+   makeActive()
+
+   bufferUpdates()
+   replayBufferedUpdates()  // block or provide a callback function when active?
+     - TODO: what if there are failures while replaying buffered updates?
+       - perhaps just provide functions to return stats about the last recovery?
+           - number of updates buffered
+           - number of updates errored
+           - time?
+
+ -
+
+**/
\ No newline at end of file