You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/19 18:52:28 UTC
svn commit: r1204028 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/update/
solrj/src/java/org/apache/solr/common/util/
Author: yonik
Date: Sat Nov 19 17:52:27 2011
New Revision: 1204028
URL: http://svn.apache.org/viewvc?rev=1204028&view=rev
Log:
SOLR-2808: reimplement log recovery for concurrent reading
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Sat Nov 19 17:52:27 2011
@@ -55,6 +55,9 @@ public class DeleteUpdateCommand extends
return indexedId;
}
+ public void setIndexedId(BytesRef indexedId) {
+ this.indexedId = indexedId;
+ }
@Override
public String toString() {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Sat Nov 19 17:52:27 2011
@@ -446,21 +446,16 @@ public class FSUpdateLog extends UpdateL
@Override
public boolean recoverFromLog() {
if (tlogFiles.length == 0) return false;
- TransactionLogReader tlogReader = null;
+ TransactionLog oldTlog = null; // todo - change name
try {
- tlogReader = new TransactionLogReader( new File(tlogDir, tlogFiles[tlogFiles.length-1]) );
- boolean completed = tlogReader.completed();
- if (completed) {
- return true;
- }
-
- recoveryExecutor.execute(new LogReplayer(tlogReader));
+ oldTlog = new TransactionLog( new File(tlogDir, tlogFiles[tlogFiles.length-1]), null, true );
+ recoveryExecutor.execute(new LogReplayer(oldTlog));
return true;
} catch (Exception ex) {
// an error during recovery
- uhandler.log.warn("Exception during recovery", ex);
- if (tlogReader != null) tlogReader.close();
+ log.warn("Exception during recovery", ex);
+ if (oldTlog != null) oldTlog.decref();
}
return false;
@@ -486,22 +481,38 @@ public class FSUpdateLog extends UpdateL
}
}
+
+
// TODO: do we let the log replayer run across core reloads?
class LogReplayer implements Runnable {
- TransactionLogReader tlogReader;
- public LogReplayer(TransactionLogReader tlogReader) {
- this.tlogReader = tlogReader;
+ TransactionLog tlog;
+ TransactionLog.LogReader tlogReader;
+
+
+ public LogReplayer(TransactionLog tlog) {
+ this.tlog = tlog;
}
@Override
public void run() {
uhandler.core.log.warn("Starting log replay " + tlogReader);
+ tlogReader = tlog.getReader();
+
SolrParams params = new ModifiableSolrParams();
long commitVersion = 0;
for(;;) {
- Object o = tlogReader.readNext();
+ Object o = null;
+
+ try {
+ o = tlogReader.next(null);
+ } catch (InterruptedException e) {
+ SolrException.log(log,e);
+ } catch (IOException e) {
+ SolrException.log(log,e);
+ }
+
if (o == null) break;
// create a new request each time since the update handler and core could change
@@ -529,11 +540,15 @@ public class FSUpdateLog extends UpdateL
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
uhandler.addDoc(cmd);
break;
+
+ // TODO: updates need to go through versioning code for handling reorders? (for replicas at least,
+ // depending on how they recover.)
}
case UpdateLog.DELETE:
{
byte[] idBytes = (byte[]) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
uhandler.delete(cmd);
@@ -542,13 +557,18 @@ public class FSUpdateLog extends UpdateL
case UpdateLog.DELETE_BY_QUERY:
{
- // TODO
+ String query = (String)entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ uhandler.delete(cmd);
break;
}
case UpdateLog.COMMIT:
{
- // TODO
+ // currently we don't log commits
commitVersion = version;
break;
}
@@ -559,15 +579,16 @@ public class FSUpdateLog extends UpdateL
} catch (IOException ex) {
} catch (ClassCastException cl) {
- uhandler.log.warn("Corrupt log", cl);
+ log.warn("Corrupt log", cl);
// would be caused by a corrupt transaction log
} catch (Exception ex) {
- uhandler.log.warn("Exception replaying log", ex);
+ log.warn("Exception replaying log", ex);
// something wrong with the request?
}
}
tlogReader.close();
+ tlog.decref();
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
@@ -578,16 +599,14 @@ public class FSUpdateLog extends UpdateL
try {
uhandler.commit(cmd);
} catch (IOException ex) {
- uhandler.log.error("Replay exception: final commit.", ex);
+ log.error("Replay exception: final commit.", ex);
}
- tlogReader.delete();
- uhandler.core.log.warn("Ending log replay " + tlogReader);
+ log.warn("Ending log replay " + tlogReader);
}
}
-
static ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Sat Nov 19 17:52:27 2011
@@ -23,12 +23,14 @@ import org.apache.solr.common.SolrInputD
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.zookeeper.Transaction;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -57,8 +59,7 @@ public class TransactionLog {
RandomAccessFile raf;
FileChannel channel;
OutputStream os;
- FastOutputStream fos;
- InputStream is;
+ FastOutputStream fos; // all accesses to this stream should be synchronized on "this"
volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
@@ -66,6 +67,8 @@ public class TransactionLog {
Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
List<String> globalStringList = new ArrayList<String>();
+ CountDownLatch latch; // if set, used to signal that we just added another log record
+
// write a BytesRef as a byte array
JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@Override
@@ -130,24 +133,54 @@ public class TransactionLog {
}
TransactionLog(File tlogFile, Collection<String> globalStrings) {
+ this(tlogFile, globalStrings, false);
+ }
+
+ TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
try {
this.tlogFile = tlogFile;
raf = new RandomAccessFile(this.tlogFile, "rw");
long start = raf.length();
- assert start==0;
- if (start > 0) {
- raf.setLength(0);
- }
- // System.out.println("###start= "+start);
channel = raf.getChannel();
os = Channels.newOutputStream(channel);
fos = FastOutputStream.wrap(os);
- addGlobalStrings(globalStrings);
+
+ if (openExisting) {
+ if (start > 0) {
+ readHeader(null);
+ fos.setWritten(start); // reflect that we aren't starting at the beginning
+ } else {
+ addGlobalStrings(globalStrings);
+ }
+ } else {
+ assert start==0;
+ if (start > 0) {
+ raf.setLength(0);
+ }
+ addGlobalStrings(globalStrings);
+ }
+
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
+ private void readHeader(FastInputStream fis) throws IOException {
+ // read existing header
+ fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
+ LogCodec codec = new LogCodec();
+ Map header = (Map)codec.unmarshal(fis);
+ // needed to read other records
+
+ synchronized (this) {
+ globalStringList = (List<String>)header.get("strings");
+ globalStringMap = new HashMap<String, Integer>(globalStringList.size());
+ for (int i=0; i<globalStringList.size(); i++) {
+ globalStringMap.put( globalStringList.get(i), i+1);
+ }
+ }
+ }
+
private void addGlobalStrings(Collection<String> strings) {
if (strings == null) return;
int origSize = globalStringMap.size();
@@ -205,7 +238,7 @@ public class TransactionLog {
// fos.flushBuffer(); // flush later
-
+ endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -229,6 +262,7 @@ public class TransactionLog {
BytesRef br = cmd.getIndexedId();
codec.writeByteArray(br.bytes, br.offset, br.length);
// fos.flushBuffer(); // flush later
+ endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -251,6 +285,7 @@ public class TransactionLog {
codec.writeLong(cmd.getVersion());
codec.writeStr(cmd.query);
// fos.flushBuffer(); // flush later
+ endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -274,6 +309,7 @@ public class TransactionLog {
codec.writeLong(cmd.getVersion());
codec.writeStr(END_MESSAGE); // ensure these bytes are the last in the file
// fos.flushBuffer(); // flush later
+ endWrite();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -281,6 +317,12 @@ public class TransactionLog {
}
}
+ private void endWrite() {
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+
/* This method is thread safe */
public Object lookup(long pos) {
@@ -306,7 +348,10 @@ public class TransactionLog {
}
public void incref() {
- refcount.incrementAndGet();
+ int result = refcount.incrementAndGet();
+ if (result <= 1) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "incref on a closed log: " + this);
+ }
}
public void decref() {
@@ -350,11 +395,106 @@ public class TransactionLog {
return tlogFile.toString();
}
+ /** Returns a reader that can be used while a log is still in use.
+ * Currently only *one* reader may be outstanding, and that reader may only
+ * be used from a single thread. */
+ public LogReader getReader() {
+ return new LogReader();
+ }
+
+
+
+ public class LogReader {
+ ChannelFastInputStream fis = new ChannelFastInputStream(channel, 0);
+ private LogCodec codec = new LogCodec();
+
+ public LogReader() {
+ incref();
+ }
+
+ /** Returns the next object from the log. If this log is being concurrently written
+ * to, and you wish to block until a new record is available, then pass in a latch
+ * to use.
+ *
+ * @param latch The latch to use to block, waiting for the next log record
+ * @return The log record, or null if EOF
+ * @throws IOException
+ */
+ public Object next(CountDownLatch latch) throws IOException, InterruptedException {
+ long pos = fis.position();
+
+ synchronized (TransactionLog.this) {
+ if (pos >= fos.size()) {
+ // we caught up.
+ TransactionLog.this.latch = latch;
+
+
+ // TODO: how to prevent a race between catching up and
+ // switching to "active"... need higher level coordination?
+ // Probably since updating this log is normally done after
+ // updating the index. Perhaps more than one phase...
+ // 1) next() returns null
+ // 2) replayer sets flag to "almost active" and updates start going to the index
+ // 3) replayer continues calling next() to get any records that were added in between.
+ // This *still* doesn't work since a thread that skipped adding to the index could
+ // be delayed, next() could return null, and *then* the thread would write to the
+ // log.
+ // TODO: may need to utilize a read-write lock around all the updates (this
+ // may be needed for deleteByQuery anyway)
+ return null;
+ }
+
+ fos.flushBuffer();
+ }
+
+ if (TransactionLog.this.latch != null) {
+ TransactionLog.this.latch.await();
+
+ synchronized (TransactionLog.this) {
+ TransactionLog.this.latch = null;
+ if (fis.position() >= fos.size()) {
+ // still EOF... someone else must have tripped the latch.
+ return null;
+ }
+ fos.flushBuffer();
+ }
+
+ }
+
+ if (pos == 0) {
+ readHeader(fis);
+
+ // shouldn't currently happen - header and first record are currently written at the same time
+ synchronized (TransactionLog.this) {
+ if (fis.position() >= fos.size()) {
+ return null;
+ }
+ }
+ }
+
+ return codec.readVal(fis);
+ }
+
+ public void close() {
+ decref();
+ }
+
+ @Override
+ public String toString() {
+ synchronized (TransactionLog.this) {
+ return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+ }
+ }
+
+ }
+
}
+
+
class ChannelFastInputStream extends FastInputStream {
FileChannel ch;
- long chPosition;
+ private long chPosition;
public ChannelFastInputStream(FileChannel ch, long chPosition) {
super(null);
@@ -378,4 +518,3 @@ class ChannelFastInputStream extends Fas
}
}
-
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sat Nov 19 17:52:27 2011
@@ -20,9 +20,13 @@ package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** @lucene.experimental */
public abstract class UpdateLog implements PluginInfoInitialized {
+ public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
+
public enum SyncLevel { NONE, FLUSH, FSYNC }
public static final int ADD = 0x01;
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java Sat Nov 19 17:52:27 2011
@@ -27,6 +27,7 @@ public class FastInputStream extends Inp
private final byte[] buf;
private int pos;
private int end;
+ private long readFromStream; // number of bytes read from the underlying inputstream
public FastInputStream(InputStream in) {
// use default BUFSIZE of BufferedOutputStream so if we wrap that
@@ -73,7 +74,13 @@ public class FastInputStream extends Inp
}
public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
- return in.read(target, offset, len);
+ int ret = in.read(target, offset, len);
+ if (ret > 0) readFromStream += ret;
+ return ret;
+ }
+
+ public long position() {
+ return readFromStream - (end - pos);
}
public void refill() throws IOException {
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java Sat Nov 19 17:52:27 2011
@@ -25,7 +25,7 @@ import java.io.*;
public class FastOutputStream extends OutputStream implements DataOutput {
private final OutputStream out;
private final byte[] buf;
- private long written; // how many bytes written
+ private long written; // how many bytes written to the underlying stream
private int pos;
public FastOutputStream(OutputStream w) {
@@ -191,4 +191,17 @@ public class FastOutputStream extends Ou
public long size() {
return written + pos;
}
+
+ /** Returns the number of bytes actually written to the underlying OutputStream, not including
+ * anything currently buffered by this class itself.
+ */
+ public long written() {
+ return written;
+ }
+
+ /** Sets the count returned by written() to the given value */
+ public void setWritten(long written) {
+ this.written = written;
+ }
+
}