You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/01/05 18:31:33 UTC
svn commit: r1227715 - in /lucene/dev/branches/solrcloud/solr/core/src:
java/org/apache/solr/handler/component/RealTimeGetComponent.java
java/org/apache/solr/update/UpdateLog.java
test/org/apache/solr/search/TestRecovery.java
Author: yonik
Date: Thu Jan 5 17:31:32 2012
New Revision: 1227715
URL: http://svn.apache.org/viewvc?rev=1227715&view=rev
Log:
fix tlog ref counting
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1227715&r1=1227714&r2=1227715&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Thu Jan 5 17:31:32 2012
@@ -282,6 +282,8 @@ public class RealTimeGetComponent extend
UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
+
+ recentUpdates.close(); // cache this somehow?
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1227715&r1=1227714&r2=1227715&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Thu Jan 5 17:31:32 2012
@@ -156,7 +156,14 @@ public class UpdateLog implements Plugin
versionInfo = new VersionInfo(uhandler, 256);
}
+
+ public File getLogDir() {
+ return tlogDir;
+ }
+ /* Takes over ownership of the log, keeping it until no longer needed
+ and then decrementing its reference count and dropping it.
+ */
private void addOldLog(TransactionLog oldLog) {
if (oldLog == null) return;
@@ -183,7 +190,9 @@ public class UpdateLog implements Plugin
break;
}
- oldLog.incref(); // prevent this from being deleted
+ // shouldn't need to incref... we are taking ownership, not becoming
+ // an additional user.
+ // oldLog.incref();
logs.addFirst(oldLog);
}
@@ -344,7 +353,8 @@ public class UpdateLog implements Plugin
// if we made it through the commit, write a commit command to the log
// TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
prevTlog.writeCommit(cmd);
- prevTlog.decref();
+ // the old log list will decref when no longer needed
+ // prevTlog.decref();
prevTlog = null;
}
}
@@ -624,6 +634,12 @@ public class UpdateLog implements Plugin
}
}
+
+ public void close() {
+ for (TransactionLog log : logList) {
+ log.decref();
+ }
+ }
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1227715&r1=1227714&r2=1227715&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Thu Jan 5 17:31:32 2012
@@ -29,6 +29,7 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -75,7 +76,7 @@ public class TestRecovery extends SolrTe
assertU(commit());
Deque<Long> versions = new ArrayDeque<Long>();
- versions.addFirst( addAndGetVersion(sdoc("id","1") , null) );
+ versions.addFirst(addAndGetVersion(sdoc("id", "1"), null));
versions.addFirst( addAndGetVersion(sdoc("id", "11"), null));
assertJQ(req("q","*:*"),"/response/numFound==0");
@@ -139,6 +140,7 @@ public class TestRecovery extends SolrTe
assertEquals(UpdateLog.State.ACTIVE, h.getCore().getUpdateHandler().getUpdateLog().getState());
} finally {
+ DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;
UpdateLog.testing_logReplayFinishHook = null;
}
@@ -295,8 +297,10 @@ public class TestRecovery extends SolrTe
assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
} finally {
+ DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;
UpdateLog.testing_logReplayFinishHook = null;
+
req().close();
}
@@ -313,6 +317,7 @@ public class TestRecovery extends SolrTe
assertU(adoc("id","2", "val_i","1"));
assertU(commit());
long v1 = getVer(req("q","id:1"));
+ long v1a = getVer(req("q","id:2"));
h.close();
createCore();
@@ -322,6 +327,110 @@ public class TestRecovery extends SolrTe
long v2 = getVer(req("q","id:1"));
assert(v2 > v1);
+
+ assertJQ(req("qt","/get", "getVersions","2")
+ ,"/versions==[" + v2 + "," + v1a + "]"
+ );
+
+ }
+
+
+ private void addDocs(int nDocs, int start, LinkedList<Long> versions) throws Exception {
+ for (int i=0; i<nDocs; i++) {
+ versions.addFirst( addAndGetVersion( sdoc("id",Integer.toString(start + nDocs)) , null) );
+ }
+ }
+
+ @Test
+ public void testRemoveOldLogs() throws Exception {
+ try {
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logReplay.acquire();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ clearIndex();
+ assertU(commit());
+
+ File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();
+
+ h.close();
+
+ String[] files = UpdateLog.getLogList(logDir);
+ for (String file : files) {
+ new File(logDir, file).delete();
+ }
+
+ assertEquals(0, UpdateLog.getLogList(logDir).length);
+
+ createCore();
+
+ int start = 0;
+ int maxReq = 50;
+
+ LinkedList<Long> versions = new LinkedList<Long>();
+ addDocs(10, start, versions); start+=10;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ addDocs(10, start, versions); start+=10;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ assertEquals(2, UpdateLog.getLogList(logDir).length);
+
+ addDocs(105, start, versions); start+=105;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ // previous two logs should be gone now
+ assertEquals(1, UpdateLog.getLogList(logDir).length);
+
+ addDocs(1, start, versions); start+=1;
+ h.close();
+ createCore(); // trigger recovery, make sure that tlog reference handling is correct
+
+ // test we can get versions while replay is happening
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ logReplay.release(1000);
+ logReplayFinish.acquire(); // wait until replay has finished
+
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ addDocs(105, start, versions); start+=105;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ // previous logs should be gone now
+ assertEquals(1, UpdateLog.getLogList(logDir).length);
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+ }
}