You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/28 17:35:15 UTC
svn commit: r1207402 - in /lucene/dev/branches/solrcloud/solr/core/src:
java/org/apache/solr/update/ java/org/apache/solr/update/processor/
test/org/apache/solr/search/
Author: yonik
Date: Mon Nov 28 16:35:14 2011
New Revision: 1207402
URL: http://svn.apache.org/viewvc?rev=1207402&view=rev
Log:
SOLR-2808: log updates while recovering, then replay
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Mon Nov 28 16:35:14 2011
@@ -31,10 +31,7 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/** @lucene.experimental */
class NullUpdateLog extends UpdateLog {
@@ -102,6 +99,20 @@ class NullUpdateLog extends UpdateLog {
return false;
}
+ @Override
+ public void bufferUpdates() {
+ }
+
+ @Override
+ public Future<FSUpdateLog.RecoveryInfo> applyBufferedUpdates() {
+ return null;
+ }
+
+ @Override
+ public State getState() {
+ return State.ACTIVE;
+ }
+
}
/** @lucene.experimental */
@@ -110,6 +121,7 @@ public class FSUpdateLog extends UpdateL
public static String TLOG_NAME="tlog";
long id = -1;
+ private State state = State.ACTIVE;
private TransactionLog tlog;
private TransactionLog prevTlog;
@@ -447,10 +459,10 @@ public class FSUpdateLog extends UpdateL
@Override
public boolean recoverFromLog() {
if (tlogFiles.length == 0) return false;
- TransactionLog oldTlog = null; // todo - change name
+ TransactionLog oldTlog = null;
try {
oldTlog = new TransactionLog( new File(tlogDir, tlogFiles[tlogFiles.length-1]), null, true );
- recoveryExecutor.execute(new LogReplayer(oldTlog));
+ recoveryExecutor.execute(new LogReplayer(oldTlog, false));
return true;
} catch (Exception ex) {
@@ -484,131 +496,218 @@ public class FSUpdateLog extends UpdateL
}
}
- public static Runnable testing_logReplayHook; // aquires a permit before each log read
- public static Runnable testing_logReplayFinishHook; // releases a permit when log replay has finished
+ @Override
+ public void bufferUpdates() {
+ assert state == State.ACTIVE;
+ // block all updates to eliminate race conditions between
+ // reading the state and acting on it in the update processor
+ versionInfo.blockUpdates();
+ try {
+ state = State.BUFFERING;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+ recoveryInfo = new RecoveryInfo();
+ }
+
+ /** Returns the Future to wait on, or null if no replay was needed */
+ @Override
+ public Future<RecoveryInfo> applyBufferedUpdates() {
+ assert state == State.BUFFERING;
+
+ // block all updates to eliminate race conditions between
+ // reading the state and acting on it in the update processor
+ versionInfo.blockUpdates();
+ try {
+ if (state != State.BUFFERING) return null;
+ state = State.APPLYING_BUFFERED;
+
+ // handle case when no log was even created because no updates
+ // were received.
+ if (tlog == null) {
+ state = State.ACTIVE;
+ return null;
+ }
+
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+
+ tlog.incref();
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+ LogReplayer replayer = new LogReplayer(tlog, true);
+ return cs.submit(replayer, recoveryInfo);
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+
+ public static Runnable testing_logReplayHook; // called before each log read
+ public static Runnable testing_logReplayFinishHook; // called when log replay has finished
+
+
+
+ private RecoveryInfo recoveryInfo;
// TODO: do we let the log replayer run across core reloads?
class LogReplayer implements Runnable {
- TransactionLog tlog;
+ TransactionLog translog;
TransactionLog.LogReader tlogReader;
+ boolean activeLog;
+ boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
- public LogReplayer(TransactionLog tlog) {
- this.tlog = tlog;
+ public LogReplayer(TransactionLog translog, boolean activeLog) {
+ this.translog = translog;
+ this.activeLog = activeLog;
}
@Override
public void run() {
- uhandler.core.log.warn("Starting log replay " + tlogReader);
-
- tlogReader = tlog.getReader();
-
- SolrParams params = new ModifiableSolrParams();
- long commitVersion = 0;
+ try {
- for(;;) {
- Object o = null;
+ uhandler.core.log.warn("Starting log replay " + tlogReader);
- try {
- if (testing_logReplayHook != null) testing_logReplayHook.run();
- o = tlogReader.next(null);
- } catch (InterruptedException e) {
- SolrException.log(log,e);
- } catch (IOException e) {
- SolrException.log(log,e);
- }
+ tlogReader = translog.getReader();
- if (o == null) break;
+ SolrParams params = new ModifiableSolrParams();
+ long commitVersion = 0;
- // create a new request each time since the update handler and core could change
- SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
-
- // TODO: race? This core could close on us if it was reloaded
+ for(;;) {
+ Object o = null;
+
+ try {
+ if (testing_logReplayHook != null) testing_logReplayHook.run();
+ o = tlogReader.next();
+ if (o == null && activeLog) {
+ if (!finishing) {
+ // block to prevent new adds, but don't immediately unlock since
+ // we could be starved from ever completing recovery. Only unlock
+ // after we've finished this recovery.
+ // NOTE: our own updates won't be blocked since the thread holding a write lock can
+ // lock a read lock.
+ versionInfo.blockUpdates();
+ finishing = true;
+ o = tlogReader.next();
+ } else {
+ // we had previously blocked updates, so this "null" from the log is final.
+
+ // Wait until our final commit to change the state and unlock.
+ // This is only so no new updates are written to the current log file, and is
+ // only an issue if we crash before the commit (and we are paying attention
+ // to incomplete log files).
+ //
+ // versionInfo.lock.writeLock().unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ SolrException.log(log,e);
+ } catch (IOException e) {
+ SolrException.log(log,e);
+ }
- try {
+ if (o == null) break;
- // should currently be a List<Oper,Ver,Doc/Id>
- List entry = (List)o;
+ // create a new request each time since the update handler and core could change
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
- int oper = (Integer)entry.get(0);
- long version = (Long) entry.get(1);
+ // TODO: race? This core could close on us if it was reloaded
- switch (oper) {
- case UpdateLog.ADD:
- {
- // byte[] idBytes = (byte[]) entry.get(2);
- SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
- AddUpdateCommand cmd = new AddUpdateCommand(req);
- // cmd.setIndexedId(new BytesRef(idBytes));
- cmd.solrDoc = sdoc;
- cmd.setVersion(version);
- cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- uhandler.addDoc(cmd);
- break;
+ try {
- // TODO: updates need to go through versioning code for handing reorders? (for replicas at least,
- // depending on how they recover.
- }
- case UpdateLog.DELETE:
- {
- byte[] idBytes = (byte[]) entry.get(2);
- DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
- cmd.setIndexedId(new BytesRef(idBytes));
- cmd.setVersion(version);
- cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- uhandler.delete(cmd);
- break;
- }
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ {
+ // byte[] idBytes = (byte[]) entry.get(2);
+ SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ uhandler.addDoc(cmd);
+ break;
+
+ // TODO: updates need to go through versioning code for handling reorders? (for replicas at least,
+ // depending on how they recover.)
+ }
+ case UpdateLog.DELETE:
+ {
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ uhandler.delete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY:
+ {
+ String query = (String)entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ uhandler.delete(cmd);
+ break;
+ }
+
+ case UpdateLog.COMMIT:
+ {
+ // currently we don't log commits
+ commitVersion = version;
+ break;
+ }
- case UpdateLog.DELETE_BY_QUERY:
- {
- String query = (String)entry.get(2);
- DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
- cmd.query = query;
- cmd.setVersion(version);
- cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- uhandler.delete(cmd);
- break;
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
+ } catch (IOException ex) {
- case UpdateLog.COMMIT:
- {
- // currently we don't log commits
- commitVersion = version;
- break;
- }
+ } catch (ClassCastException cl) {
+ log.warn("Corrupt log", cl);
+ // would be caused by a corrupt transaction log
+ } catch (Exception ex) {
+ log.warn("Exception replaying log", ex);
- default:
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ // something wrong with the request?
}
+ }
+
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+ CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+ cmd.setVersion(commitVersion);
+ cmd.softCommit = false;
+ cmd.waitSearcher = true;
+ cmd.setFlags(UpdateCommand.REPLAY);
+ try {
+ uhandler.commit(cmd);
} catch (IOException ex) {
+ log.error("Replay exception: final commit.", ex);
+ }
- } catch (ClassCastException cl) {
- log.warn("Corrupt log", cl);
- // would be caused by a corrupt transaction log
- } catch (Exception ex) {
- log.warn("Exception replaying log", ex);
+ tlogReader.close();
+ translog.decref();
- // something wrong with the request?
+ } finally {
+ // change the state while updates are still blocked to prevent races
+ state = State.ACTIVE;
+ if (finishing) {
+ versionInfo.unblockUpdates();
}
}
- SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
- CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
- cmd.setVersion(commitVersion);
- cmd.softCommit = false;
- cmd.waitSearcher = true;
- cmd.setFlags(UpdateCommand.REPLAY);
- try {
- uhandler.commit(cmd);
- } catch (IOException ex) {
- log.error("Replay exception: final commit.", ex);
- }
-
- tlogReader.close();
- tlog.decref();
-
log.warn("Ending log replay " + tlogReader);
if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Mon Nov 28 16:35:14 2011
@@ -67,8 +67,6 @@ public class TransactionLog {
Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
List<String> globalStringList = new ArrayList<String>();
- CountDownLatch latch; // if set, used to signal that we just added another log record
-
// write a BytesRef as a byte array
JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@Override
@@ -238,7 +236,6 @@ public class TransactionLog {
// fos.flushBuffer(); // flush later
- endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -262,7 +259,6 @@ public class TransactionLog {
BytesRef br = cmd.getIndexedId();
codec.writeByteArray(br.bytes, br.offset, br.length);
// fos.flushBuffer(); // flush later
- endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -285,7 +281,6 @@ public class TransactionLog {
codec.writeLong(cmd.getVersion());
codec.writeStr(cmd.query);
// fos.flushBuffer(); // flush later
- endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -309,7 +304,6 @@ public class TransactionLog {
codec.writeLong(cmd.getVersion());
codec.writeStr(END_MESSAGE); // ensure these bytes are the last in the file
// fos.flushBuffer(); // flush later
- endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -317,12 +311,6 @@ public class TransactionLog {
}
}
- private void endWrite() {
- if (latch != null) {
- latch.countDown();
- }
- }
-
/* This method is thread safe */
public Object lookup(long pos) {
@@ -412,55 +400,22 @@ public class TransactionLog {
incref();
}
- /** Returns the next object from the log. If this log is being concurrently written
- * to, and you wish to block until a new record is available, then pass in a latch
- * to use.
+ /** Returns the next object from the log, or null if none available.
*
- * @param latch The latch to use to block, waiting for the next log record
* @return The log record, or null if EOF
* @throws IOException
*/
- public Object next(CountDownLatch latch) throws IOException, InterruptedException {
+ public Object next() throws IOException, InterruptedException {
long pos = fis.position();
synchronized (TransactionLog.this) {
if (pos >= fos.size()) {
- // we caught up.
- TransactionLog.this.latch = latch;
-
-
- // TODO: how to prevent a race between catching up and
- // switching to "active"... need higher level coordination?
- // Probably since updating this log is normally done after
- // updating the index. Perhaps more than one phase...
- // 1) next() returns null
- // 2) replayer sets flag to "almost active" and updates start going to the index
- // 3) replayer continues calling next() to get any records that were added inbetween.
- // This *still* doesn't work since a thread that skipped adding to the index could
- // be delayed, next() could return null, and *then* the thread would write to the
- // log.
- // TODO: may need to utilize a read-write lock around all the updates (this
- // may be needed for deleteByQuery anyway)
return null;
}
fos.flushBuffer();
}
- if (TransactionLog.this.latch != null) {
- TransactionLog.this.latch.await();
-
- synchronized (TransactionLog.this) {
- TransactionLog.this.latch = null;
- if (fis.position() >= fos.size()) {
- // still EOF... someone else must have tripped the latch.
- return null;
- }
- fos.flushBuffer();
- }
-
- }
-
if (pos == 0) {
readHeader(fis);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Mon Nov 28 16:35:14 2011
@@ -30,7 +30,8 @@ import org.apache.solr.request.SolrQuery
protected long version;
protected int flags;
- public static int REPLAY = 0x00000001; // update command is from replaying a log.
+ public static int BUFFERING = 0x00000001; // update command is being buffered.
+ public static int REPLAY = 0x00000002; // update command is from replaying a log.
public static int IGNORE_AUTOCOMMIT = 0x00000002; // this update should not count toward triggering of autocommits.
public UpdateCommand(String commandName, SolrQueryRequest req) {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Mon Nov 28 16:35:14 2011
@@ -23,11 +23,14 @@ import org.apache.solr.util.plugin.Plugi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Future;
+
/** @lucene.experimental */
public abstract class UpdateLog implements PluginInfoInitialized {
public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
public enum SyncLevel { NONE, FLUSH, FSYNC }
+ public enum State { REPLAYING, BUFFERING, APPLYING_BUFFERED, ACTIVE }
public static final int ADD = 0x01;
public static final int DELETE = 0x02;
@@ -48,4 +51,12 @@ public abstract class UpdateLog implemen
public abstract VersionInfo getVersionInfo();
public abstract void finish(SyncLevel syncLevel);
public abstract boolean recoverFromLog();
+
+ public abstract void bufferUpdates();
+ public abstract Future<FSUpdateLog.RecoveryInfo> applyBufferedUpdates();
+ public abstract State getState();
+
+
+ public static class RecoveryInfo {
+ }
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java Mon Nov 28 16:35:14 2011
@@ -20,6 +20,9 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.queries.function.DocValues;
import org.apache.lucene.queries.function.ValueSource;
@@ -39,6 +42,7 @@ public class VersionInfo {
private final VersionBucket[] buckets;
private SchemaField versionField;
private SchemaField idField;
+ final ReadWriteLock lock = new ReentrantReadWriteLock(true);
public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
this.updateHandler = updateHandler;
@@ -55,6 +59,21 @@ public class VersionInfo {
return versionField;
}
+ public void lockForUpdate() {
+ lock.readLock().lock();
+ }
+
+ public void unlockForUpdate() {
+ lock.readLock().unlock();
+ }
+
+ public void blockUpdates() {
+ lock.writeLock().lock();
+ }
+
+ public void unblockUpdates() {
+ lock.writeLock().unlock();
+ }
// todo: initialize... use current time to start?
// a clock that increments by 1 for every operation makes it easier to detect missing
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Nov 28 16:35:14 2011
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.lang.NullArgumentException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.HashPartitioner;
import org.apache.solr.cloud.HashPartitioner.Range;
import org.apache.solr.common.SolrException;
@@ -47,14 +48,7 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.SolrCmdDistributor;
-import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionBucket;
-import org.apache.solr.update.VersionInfo;
+import org.apache.solr.update.*;
import org.apache.zookeeper.KeeperException;
// NOT mt-safe... create a new processor for each add thread
@@ -297,47 +291,60 @@ public class DistributedUpdateProcessor
}
VersionBucket bucket = vinfo.bucket(hash);
- synchronized (bucket) {
- // we obtain the version when synchronized and then do the add so we can ensure that
- // if version1 < version2 then version1 is actually added before version2.
-
- // even if we don't store the version field, synchronizing on the bucket
- // will enable us to know what version happened first, and thus enable
- // realtime-get to work reliably.
- // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
- // there may be other reasons in the future for a version on the commands
- if (versionsStored) {
- long bucketVersion = bucket.highest;
-
- if (isLeader) {
- long version = vinfo.getNewClock();
- cmd.setVersion(version);
- cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
- bucket.updateHighest(version);
- } else {
- // The leader forwarded us this update.
- cmd.setVersion(versionOnUpdate);
-
- // if we aren't the leader, then we need to check that updates were not re-ordered
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- // we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occured.
- bucket.updateHighest(versionOnUpdate);
+
+ vinfo.lockForUpdate();
+ try {
+ synchronized (bucket) {
+ // we obtain the version when synchronized and then do the add so we can ensure that
+ // if version1 < version2 then version1 is actually added before version2.
+
+ // even if we don't store the version field, synchronizing on the bucket
+ // will enable us to know what version happened first, and thus enable
+ // realtime-get to work reliably.
+ // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
+ // there may be other reasons in the future for a version on the commands
+ if (versionsStored) {
+
+ long bucketVersion = bucket.highest;
+
+ if (isLeader) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(version);
+ cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+ bucket.updateHighest(version);
} else {
- // there have been updates higher than the current update. we need to check
- // the specific version for this id.
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
+ // The leader forwarded us this update.
+ cmd.setVersion(versionOnUpdate);
+
+ if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.add(cmd);
return true;
}
+
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ return true;
+ }
+ }
}
}
- }
-
- doLocalAdd(cmd);
- } // end synchronized (bucket)
+ doLocalAdd(cmd);
+ } // end synchronized (bucket)
+ } finally {
+ vinfo.unlockForUpdate();
+ }
return false;
}
@@ -409,46 +416,68 @@ public class DistributedUpdateProcessor
Long versionOnUpdate = versionOnUpdateS == null ? null : Long.parseLong(versionOnUpdateS);
VersionBucket bucket = vinfo.bucket(hash);
- synchronized (bucket) {
- if (versionsStored) {
- long bucketVersion = bucket.highest;
-
- if (isLeader) {
- long version = vinfo.getNewClock();
- cmd.setVersion(-version);
- bucket.updateHighest(version);
- } else {
- // The leader forwarded us this update.
- if (versionOnUpdate == null) {
- throw new RuntimeException("we expected to find versionOnUpdate but did not");
- }
-
- cmd.setVersion(versionOnUpdate);
- // if we aren't the leader, then we need to check that updates were not re-ordered
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- // we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occured.
- bucket.updateHighest(versionOnUpdate);
+
+ vinfo.lockForUpdate();
+ try {
+
+ synchronized (bucket) {
+ if (versionsStored) {
+ long bucketVersion = bucket.highest;
+
+ if (isLeader) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(-version);
+ bucket.updateHighest(version);
} else {
- // there have been updates higher than the current update. we need to check
- // the specific version for this id.
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
+ // The leader forwarded us this update.
+ if (versionOnUpdate == null) {
+ throw new RuntimeException("we expected to find versionOnUpdate but did not");
+ }
+
+ cmd.setVersion(versionOnUpdate);
+
+ if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.delete(cmd);
return true;
}
+
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ return true;
+ }
+ }
}
}
- }
- doLocalDelete(cmd);
- return false;
- } // end synchronized (bucket)
+ doLocalDelete(cmd);
+ return false;
+ } // end synchronized (bucket)
+ } finally {
+ vinfo.unlockForUpdate();
+ }
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
+
+ if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ log.info("Ignoring commit while not ACTIVE");
+ return;
+ }
+
+
// nocommit: make everyone commit?
// if (shards != null) {
// cmdDistrib.distribCommit(cmd, shards);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1207402&r1=1207401&r2=1207402&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRecovery.java Mon Nov 28 16:35:14 2011
@@ -18,11 +18,16 @@ package org.apache.solr.search;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.FSUpdateLog;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -116,4 +121,122 @@ public class TestRecovery extends SolrTe
}
-}
\ No newline at end of file
+ @Test
+ public void testBuffering() throws Exception {
+ try {
+
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ FSUpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logReplay.acquire();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ FSUpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ clearIndex();
+ assertU(commit());
+
+ SolrQueryRequest req = req();
+ UpdateHandler uhandler = req.getCore().getUpdateHandler();
+ UpdateLog ulog = uhandler.getUpdateLog();
+
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+ Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
+ assertTrue(rinfoFuture == null);
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+ assertU(adoc("id","1"));
+ assertU(adoc("id","2"));
+ assertU(adoc("id","3"));
+ assertU(delI("1"));
+ assertU(commit());
+
+ // updates should be buffered, so we should not see any results yet.
+ assertJQ(req("q", "*:*")
+ , "/response/numFound==0"
+ );
+
+ rinfoFuture = ulog.applyBufferedUpdates();
+ assertTrue(rinfoFuture != null);
+
+ assertEquals(UpdateLog.State.APPLYING_BUFFERED, ulog.getState());
+
+ logReplay.release(1000);
+
+ UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+ assertJQ(req("q", "*:*")
+ , "/response/numFound==2"
+ );
+
+
+ } finally {
+ FSUpdateLog.testing_logReplayHook = null;
+ FSUpdateLog.testing_logReplayFinishHook = null;
+ }
+
+ }
+
+
+
+}
+
+/**
+
+ - update processor directly log, or pass through to update handler?
+ - only updates with versions (from leader) get logged?
+ - could also log user updates and assign versions later - but couldn't tell them if it failed!
+ - processor needs to know not to check versions
+ - this suggests processor should just directly log.
+
+ - we shouldn't be syncing when temporarily logging (this is called via run update processor currently, so we're ok)
+
+ Transition from "recovering" to "active" - need to ensure that no updates are lost.
+
+ - grab global write lock
+ - change state to active
+ - release global write lock
+
+ - UpdateHandler.setState(BUFFER_UPDATES) BUFFER_UPDATES, REPLAY_BUFFERED_UPDATES, ACTIVE
+
+ ulog.bufferUpdates
+ ulog.replayBufferedUpdates
+ ulog.makeActive
+
+ recoverFrom
+ bufferUpdates()
+ replayUpdates()
+ makeActive()
+
+ bufferUpdates()
+ replayBufferedUpdates() // block or provide a callback function when active?
+ - TODO: what if there are failures while replaying buffered updates?
+ - perhaps just provide functions to return stats about the last recovery?
+ - number of updates buffered
+ - number of updates errored
+ - time?
+
+ -
+
+**/
\ No newline at end of file