You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/01/13 18:21:27 UTC
svn commit: r1231196 - in /lucene/dev/branches/solrcloud/solr/core/src:
java/org/apache/solr/update/TransactionLog.java
java/org/apache/solr/update/UpdateLog.java
test/org/apache/solr/search/TestRecovery.java
Author: yonik
Date: Fri Jan 13 17:21:27 2012
New Revision: 1231196
URL: http://svn.apache.org/viewvc?rev=1231196&view=rev
Log:
implement dropping of buffered updates
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1231196&r1=1231195&r2=1231196&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Fri Jan 13 17:21:27 2012
@@ -72,6 +72,9 @@ public class TransactionLog {
List<String> globalStringList = new ArrayList<String>();
final boolean debug = log.isDebugEnabled();
+ long snapshot_size; // value of fos.size() stored by the most recent snapshot() call
+ int snapshot_numRecords; // value of numRecords stored by the most recent snapshot() call
+
// write a BytesRef as a byte array
JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@Override
@@ -192,7 +195,30 @@ public class TransactionLog {
}
return true;
}
+
+ /**
+  * Records the size currently reported by {@code fos} together with the current record count,
+  * so that a later {@link #rollback(long)} can restore both.
+  *
+  * @return the size that was just recorded; callers hand it back to {@code rollback}
+  */
+ public long snapshot() {
+   synchronized (this) {
+     final long currentSize = fos.size();
+     snapshot_numRecords = numRecords;
+     snapshot_size = currentSize;
+     return currentSize;
+   }
+ }
+ /**
+  * Shortens the log file to {@code pos} and restores the record count captured by the last
+  * {@link #snapshot()}.
+  *
+  * <p>This could mess with any readers or reverse readers that are open, or anything that might
+  * try to do a log lookup. This should only be used to roll back buffered updates, not actually
+  * applied updates.
+  *
+  * @param pos position to truncate to; asserted to equal the size stored by the last snapshot
+  * @throws IOException propagated from the flush / set-length calls on the underlying file
+  */
+ public void rollback(long pos) throws IOException {
+   synchronized (this) {
+     assert pos == snapshot_size;
+     // flush first so nothing pending is written after the file has been shortened
+     fos.flush();
+     raf.setLength(pos);
+     // bring the stream's own written-byte counter in line with the new file length
+     fos.setWritten(pos);
+     assert pos == fos.size();
+     numRecords = snapshot_numRecords;
+   }
+ }
+
public long writeData(Object o) {
LogCodec codec = new LogCodec();
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1231196&r1=1231195&r2=1231196&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri Jan 13 17:21:27 2012
@@ -153,7 +153,7 @@ public class UpdateLog implements Plugin
oldLog = new TransactionLog( f, null, true );
addOldLog(oldLog);
} catch (Exception e) {
- SolrException.log(log, "Failure to open existing log file (non fatal) "+f, e);
+ SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
f.delete();
}
}
@@ -717,7 +717,7 @@ public class UpdateLog implements Plugin
// since we blocked updates, this synchronization shouldn't strictly be necessary.
synchronized (this) {
- recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.position();
+ recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
}
state = State.BUFFERING;
@@ -726,6 +726,35 @@ public class UpdateLog implements Plugin
}
}
+ /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */
+ public boolean dropBufferedUpdates() {
+ versionInfo.blockUpdates();
+ try {
+ if (state != State.BUFFERING) return false;
+
+ if (log.isInfoEnabled()) {
+ log.info("Dropping buffered updates " + this);
+ }
+
+ // since we blocked updates, this synchronization shouldn't strictly be necessary.
+ synchronized (this) {
+ if (tlog != null) {
+ tlog.rollback(recoveryInfo.positionOfStart);
+ }
+ }
+
+ state = State.ACTIVE;
+ } catch (IOException e) {
+ SolrException.log(log,"Error attempting to roll back log", e);
+ return false;
+ }
+ finally {
+ versionInfo.unblockUpdates();
+ }
+ return true;
+ }
+
+
/** Returns the Future to wait on, or null if no replay was needed */
public Future<RecoveryInfo> applyBufferedUpdates() {
// recovery trips this assert under some race - even when
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1231196&r1=1231195&r2=1231196&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Fri Jan 13 17:21:27 2012
@@ -309,6 +309,114 @@ public class TestRecovery extends SolrTe
}
+ /**
+  * Verifies that buffered updates can be dropped: documents added while BUFFERING and then
+  * discarded via dropBufferedUpdates() must not be replayed or become visible, while documents
+  * buffered afterwards are applied normally. The scenario runs twice, the second time with
+  * documents already written to the log before buffering starts.
+  */
+ @Test
+ public void testDropBuffered() throws Exception {
+
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ // log replay waits on this hook until the test releases permits
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ SolrQueryRequest req = req();
+ UpdateHandler uhandler = req.getCore().getUpdateHandler();
+ UpdateLog ulog = uhandler.getUpdateLog();
+
+ try {
+ clearIndex();
+ assertU(commit());
+
+ // buffering with nothing buffered: applying returns no Future and goes straight back to ACTIVE
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+ Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
+ assertTrue(rinfoFuture == null);
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+ // simulate updates from a leader
+ updateJ(jsonAdd(sdoc("id","1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // docs 1-3 are discarded here; only 4 and 5 should be replayed below
+ assertTrue(ulog.dropBufferedUpdates());
+ ulog.bufferUpdates();
+ updateJ(jsonAdd(sdoc("id", "4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id", "5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // hand out plenty of permits so the replay hook never blocks for the rest of the test
+ logReplay.release(1000);
+ rinfoFuture = ulog.applyBufferedUpdates();
+ UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
+ assertEquals(2, rinfo.adds);
+
+ assertJQ(req("qt","/get", "getVersions","2")
+ ,"=={'versions':[105,104]}"
+ );
+
+ // this time add some docs first before buffering starts (so tlog won't be at pos 0)
+ updateJ(jsonAdd(sdoc("id","100", "_version_","200")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","101", "_version_","201")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ ulog.bufferUpdates();
+ updateJ(jsonAdd(sdoc("id","103", "_version_","203")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","104", "_version_","204")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // docs 103 and 104 are discarded; 100 and 101 were written before buffering and must survive
+ assertTrue(ulog.dropBufferedUpdates());
+ ulog.bufferUpdates();
+ updateJ(jsonAdd(sdoc("id","105", "_version_","205")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","106", "_version_","206")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ rinfoFuture = ulog.applyBufferedUpdates();
+ rinfo = rinfoFuture.get();
+ assertEquals(2, rinfo.adds);
+
+ assertJQ(req("q", "*:*", "sort","_version_ asc", "fl","id,_version_")
+ , "/response/docs==["
+ + "{'id':'4','_version_':104}"
+ + ",{'id':'5','_version_':105}"
+ + ",{'id':'100','_version_':200}"
+ + ",{'id':'101','_version_':201}"
+ + ",{'id':'105','_version_':205}"
+ + ",{'id':'106','_version_':206}"
+ +"]"
+ );
+
+ assertJQ(req("qt","/get", "getVersions","6")
+ ,"=={'versions':[206,205,201,200,105,104]}"
+ );
+
+
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+
+ // close the request obtained above; req().close() would only close a fresh, unrelated request
+ req.close();
+ }
+
+ }
+
+
// make sure that on a restart, versions don't start too low
@Test
public void testVersionsOnRestart() throws Exception {