You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/01/05 18:31:33 UTC

svn commit: r1227715 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/handler/component/RealTimeGetComponent.java java/org/apache/solr/update/UpdateLog.java test/org/apache/solr/search/TestRecovery.java

Author: yonik
Date: Thu Jan  5 17:31:32 2012
New Revision: 1227715

URL: http://svn.apache.org/viewvc?rev=1227715&view=rev
Log:
fix tlog ref counting

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1227715&r1=1227714&r2=1227715&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Thu Jan  5 17:31:32 2012
@@ -282,6 +282,8 @@ public class RealTimeGetComponent extend
     UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
     
     rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
+    
+    recentUpdates.close();  // cache this somehow?
   }
 
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1227715&r1=1227714&r2=1227715&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Thu Jan  5 17:31:32 2012
@@ -156,7 +156,14 @@ public class UpdateLog implements Plugin
 
     versionInfo = new VersionInfo(uhandler, 256);
   }
+  
+  public File getLogDir() {  // returns tlogDir; TestRecovery feeds it to UpdateLog.getLogList() to count log files
+    return tlogDir;
+  }
 
+  /* Takes over ownership of the log, keeping it until no longer needed
+     and then decrementing its reference and dropping it.
+   */
   private void addOldLog(TransactionLog oldLog) {
     if (oldLog == null) return;
 
@@ -183,7 +190,9 @@ public class UpdateLog implements Plugin
       break;
     }
 
-    oldLog.incref();  // prevent this from being deleted
+    // shouldn't need to incref... we are taking ownership, not becoming
+    // an additional user.
+    // oldLog.incref();
     logs.addFirst(oldLog);
   }
 
@@ -344,7 +353,8 @@ public class UpdateLog implements Plugin
         // if we made it through the commit, write a commit command to the log
         // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
         prevTlog.writeCommit(cmd);
-        prevTlog.decref();
+        // the old log list will decref when no longer needed
+        // prevTlog.decref();
         prevTlog = null;
       }
     }
@@ -624,6 +634,12 @@ public class UpdateLog implements Plugin
       }
 
     }
+    
+    public void close() {  // callers (e.g. RealTimeGetComponent) invoke this once done with the recent updates
+      for (TransactionLog log : logList) {
+        log.decref();  // drop one reference on each log in logList; presumably balances an earlier incref - confirm
+      }
+    }
   }
 
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1227715&r1=1227714&r2=1227715&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Thu Jan  5 17:31:32 2012
@@ -29,6 +29,7 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -75,7 +76,7 @@ public class TestRecovery extends SolrTe
       assertU(commit());
 
       Deque<Long> versions = new ArrayDeque<Long>();
-      versions.addFirst( addAndGetVersion(sdoc("id","1") , null) );
+      versions.addFirst(addAndGetVersion(sdoc("id", "1"), null));
       versions.addFirst( addAndGetVersion(sdoc("id", "11"), null));
 
       assertJQ(req("q","*:*"),"/response/numFound==0");
@@ -139,6 +140,7 @@ public class TestRecovery extends SolrTe
       assertEquals(UpdateLog.State.ACTIVE, h.getCore().getUpdateHandler().getUpdateLog().getState());
 
     } finally {
+      DirectUpdateHandler2.commitOnClose = true;
       UpdateLog.testing_logReplayHook = null;
       UpdateLog.testing_logReplayFinishHook = null;
     }
@@ -295,8 +297,10 @@ public class TestRecovery extends SolrTe
 
       assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
     } finally {
+      DirectUpdateHandler2.commitOnClose = true;
       UpdateLog.testing_logReplayHook = null;
       UpdateLog.testing_logReplayFinishHook = null;
+
       req().close();
     }
 
@@ -313,6 +317,7 @@ public class TestRecovery extends SolrTe
     assertU(adoc("id","2", "val_i","1"));
     assertU(commit());
     long v1 = getVer(req("q","id:1"));
+    long v1a = getVer(req("q","id:2"));
 
     h.close();
     createCore();
@@ -322,6 +327,110 @@ public class TestRecovery extends SolrTe
     long v2 = getVer(req("q","id:1"));
 
     assert(v2 > v1);
+
+    assertJQ(req("qt","/get", "getVersions","2")
+        ,"/versions==[" + v2 + "," + v1a + "]"
+    );
+
+  }
+  
+  
+  private void addDocs(int nDocs, int start, LinkedList<Long> versions) throws Exception {  // adds nDocs docs, newest version first
+    for (int i=0; i<nDocs; i++) {  // ids run start .. start+nDocs-1 so that each add is a distinct document
+      versions.addFirst( addAndGetVersion( sdoc("id",Integer.toString(start + i)) , null) );
+    }
+  }
+
+  @Test
+  public void testRemoveOldLogs() throws Exception {
+    try {
+      DirectUpdateHandler2.commitOnClose = false;
+      final Semaphore logReplay = new Semaphore(0);
+      final Semaphore logReplayFinish = new Semaphore(0);
+
+      UpdateLog.testing_logReplayHook = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            logReplay.acquire();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      UpdateLog.testing_logReplayFinishHook = new Runnable() {
+        @Override
+        public void run() {
+          logReplayFinish.release();
+        }
+      };
+
+
+      clearIndex();
+      assertU(commit());
+
+      File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();
+
+      h.close();
+
+      String[] files = UpdateLog.getLogList(logDir);
+      for (String file : files) {
+        new File(logDir, file).delete();
+      }
+
+      assertEquals(0, UpdateLog.getLogList(logDir).length);
+
+      createCore();
+
+      int start = 0;
+      int maxReq = 50;
+
+      LinkedList<Long> versions = new LinkedList<Long>();
+      addDocs(10, start, versions); start+=10;
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      assertU(commit());
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+      addDocs(10, start, versions);  start+=10;
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      assertU(commit());
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+      assertEquals(2, UpdateLog.getLogList(logDir).length);
+
+      addDocs(105, start, versions);  start+=105;
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      assertU(commit());
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+      // previous two logs should be gone now
+      assertEquals(1, UpdateLog.getLogList(logDir).length);
+
+      addDocs(1, start, versions);  start+=1;
+      h.close();
+      createCore();      // trigger recovery, make sure that tlog reference handling is correct
+
+      // test we can get versions while replay is happening
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+      logReplay.release(1000);
+      logReplayFinish.acquire();  // wait until replay has finished
+
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+      addDocs(105, start, versions);  start+=105;
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+      assertU(commit());
+      assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+      // previous logs should be gone now
+      assertEquals(1, UpdateLog.getLogList(logDir).length);
+    } finally {
+      DirectUpdateHandler2.commitOnClose = true;
+      UpdateLog.testing_logReplayHook = null;
+      UpdateLog.testing_logReplayFinishHook = null;
+    }
   }