You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/01/13 18:21:27 UTC

svn commit: r1231196 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/update/TransactionLog.java java/org/apache/solr/update/UpdateLog.java test/org/apache/solr/search/TestRecovery.java

Author: yonik
Date: Fri Jan 13 17:21:27 2012
New Revision: 1231196

URL: http://svn.apache.org/viewvc?rev=1231196&view=rev
Log:
implement dropping of buffered updates

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1231196&r1=1231195&r2=1231196&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Fri Jan 13 17:21:27 2012
@@ -72,6 +72,9 @@ public class TransactionLog {
   List<String> globalStringList = new ArrayList<String>();
   final boolean debug = log.isDebugEnabled();
 
+  long snapshot_size;
+  int snapshot_numRecords;
+  
   // write a BytesRef as a byte array
   JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -192,7 +195,30 @@ public class TransactionLog {
     }
     return true;
   }
+
+  /** takes a snapshot of the current position and number of records
+   * for later possible rollback, and returns the position */
+  public long snapshot() {
+    synchronized (this) {
+      snapshot_size = fos.size();
+      snapshot_numRecords = numRecords;
+      return snapshot_size;
+    }    
+  }
   
+  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
+  // This should only be used to roll back buffered updates, not actually applied updates.
+  public void rollback(long pos) throws IOException {
+    synchronized (this) {
+      assert snapshot_size == pos;
+      fos.flush();
+      raf.setLength(pos);
+      fos.setWritten(pos);
+      assert fos.size() == pos;
+      numRecords = snapshot_numRecords;
+    }
+  }
+
 
   public long writeData(Object o) {
     LogCodec codec = new LogCodec();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1231196&r1=1231195&r2=1231196&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri Jan 13 17:21:27 2012
@@ -153,7 +153,7 @@ public class UpdateLog implements Plugin
         oldLog = new TransactionLog( f, null, true );
         addOldLog(oldLog);
       } catch (Exception e) {
-        SolrException.log(log, "Failure to open existing log file (non fatal) "+f, e);
+        SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
         f.delete();
       }
     }
@@ -717,7 +717,7 @@ public class UpdateLog implements Plugin
 
       // since we blocked updates, this synchronization shouldn't strictly be necessary.
       synchronized (this) {
-        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.position();
+        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
       }
 
       state = State.BUFFERING;
@@ -726,6 +726,35 @@ public class UpdateLog implements Plugin
     }
   }
 
+  /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */
+  public boolean dropBufferedUpdates() {
+    versionInfo.blockUpdates();
+    try {
+      if (state != State.BUFFERING) return false;
+
+      if (log.isInfoEnabled()) {
+        log.info("Dropping buffered updates " + this);
+      }
+
+      // since we blocked updates, this synchronization shouldn't strictly be necessary.
+      synchronized (this) {
+        if (tlog != null) {
+          tlog.rollback(recoveryInfo.positionOfStart);
+        }
+      }
+
+      state = State.ACTIVE;
+    } catch (IOException e) {
+      SolrException.log(log,"Error attempting to roll back log", e);
+      return false;
+    }
+    finally {
+      versionInfo.unblockUpdates();
+    }
+    return true;
+  }
+
+
   /** Returns the Future to wait on, or null if no replay was needed */
   public Future<RecoveryInfo> applyBufferedUpdates() {
     // recovery trips this assert under some race - even when

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1231196&r1=1231195&r2=1231196&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Fri Jan 13 17:21:27 2012
@@ -309,6 +309,114 @@ public class TestRecovery extends SolrTe
   }
 
 
+  @Test
+  public void testDropBuffered() throws Exception {
+
+    DirectUpdateHandler2.commitOnClose = false;
+    final Semaphore logReplay = new Semaphore(0);
+    final Semaphore logReplayFinish = new Semaphore(0);
+
+    UpdateLog.testing_logReplayHook = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    UpdateLog.testing_logReplayFinishHook = new Runnable() {
+      @Override
+      public void run() {
+        logReplayFinish.release();
+      }
+    };
+
+
+    SolrQueryRequest req = req();
+    UpdateHandler uhandler = req.getCore().getUpdateHandler();
+    UpdateLog ulog = uhandler.getUpdateLog();
+
+    try {
+      clearIndex();
+      assertU(commit());
+
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+      ulog.bufferUpdates();
+      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+      Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
+      assertTrue(rinfoFuture == null);
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+      ulog.bufferUpdates();
+      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+      // simulate updates from a leader
+      updateJ(jsonAdd(sdoc("id","1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+      assertTrue(ulog.dropBufferedUpdates());
+      ulog.bufferUpdates();
+      updateJ(jsonAdd(sdoc("id", "4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id", "5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+      logReplay.release(1000);
+      rinfoFuture = ulog.applyBufferedUpdates();
+      UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
+      assertEquals(2, rinfo.adds);
+
+      assertJQ(req("qt","/get", "getVersions","2")
+          ,"=={'versions':[105,104]}"
+      );
+
+      // this time add some docs first before buffering starts (so tlog won't be at pos 0)
+      updateJ(jsonAdd(sdoc("id","100", "_version_","200")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","101", "_version_","201")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+      ulog.bufferUpdates();
+      updateJ(jsonAdd(sdoc("id","103", "_version_","203")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","104", "_version_","204")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+      assertTrue(ulog.dropBufferedUpdates());
+      ulog.bufferUpdates();
+      updateJ(jsonAdd(sdoc("id","105", "_version_","205")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","106", "_version_","206")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+      rinfoFuture = ulog.applyBufferedUpdates();
+      rinfo = rinfoFuture.get();
+      assertEquals(2, rinfo.adds);
+
+      assertJQ(req("q", "*:*", "sort","_version_ asc", "fl","id,_version_")
+          , "/response/docs==["
+          + "{'id':'4','_version_':104}"
+          + ",{'id':'5','_version_':105}"
+          + ",{'id':'100','_version_':200}"
+          + ",{'id':'101','_version_':201}"
+          + ",{'id':'105','_version_':205}"
+          + ",{'id':'106','_version_':206}"
+          +"]"
+      );
+
+      assertJQ(req("qt","/get", "getVersions","6")
+          ,"=={'versions':[206,205,201,200,105,104]}"
+      );
+
+
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+    } finally {
+      DirectUpdateHandler2.commitOnClose = true;
+      UpdateLog.testing_logReplayHook = null;
+      UpdateLog.testing_logReplayFinishHook = null;
+
+      req().close();
+    }
+
+  }
+
+
   // make sure that on a restart, versions don't start too low
   @Test
   public void testVersionsOnRestart() throws Exception {