Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC
svn commit: r1235888 [5/12] - in /lucene/dev/trunk: dev-tools/eclipse/
dev-tools/maven/ solr/ solr/cloud-dev/
solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Log Format: List{Operation, Version, ...}
+ * ADD, VERSION, DOC
+ * DELETE, VERSION, ID_BYTES
+ * DELETE_BY_QUERY, VERSION, String
+ *
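+ * For example (illustrative values only), an add of a document with id "42" at
+ * version 1234 is logged as the list [ADD, 1234, SolrInputDocument{id=42, ...}],
+ * and each record is followed by a 4-byte length written by endRecord().
+ *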
+ * TODO: keep two files, one for [operation, version, id] and the other for the actual
+ * document data. That way we could throw away document log files more readily
+ * while retaining the smaller operation log files longer (and we can retrieve
+ * the stored fields from the latest documents from the index).
+ *
+ * This would of course require keeping all source fields stored.
+ *
+ * This would also allow us to skip logging document data for requests with
+ * commit=true (since we know that if the request succeeds, all docs will be committed).
+ *
+ */
+public class TransactionLog {
+ public static Logger log = LoggerFactory.getLogger(TransactionLog.class);
+
+ public final static String END_MESSAGE="SOLR_TLOG_END";
+
+ long id;
+ File tlogFile;
+ RandomAccessFile raf;
+ FileChannel channel;
+ OutputStream os;
+ FastOutputStream fos; // all accesses to this stream should be synchronized on "this" (The TransactionLog)
+ int numRecords;
+
+ volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
+
+ AtomicInteger refcount = new AtomicInteger(1);
+ Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
+ List<String> globalStringList = new ArrayList<String>();
+ final boolean debug = log.isDebugEnabled();
+
+ long snapshot_size;
+ int snapshot_numRecords;
+
+ // write a BytesRef as a byte array
+ JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
+ @Override
+ public Object resolve(Object o, JavaBinCodec codec) throws IOException {
+ if (o instanceof BytesRef) {
+ BytesRef br = (BytesRef)o;
+ codec.writeByteArray(br.bytes, br.offset, br.length);
+ return null;
+ }
+ return o;
+ }
+ };
+
+ public class LogCodec extends JavaBinCodec {
+ public LogCodec() {
+ super(resolver);
+ }
+
+ @Override
+ public void writeExternString(String s) throws IOException {
+ if (s == null) {
+ writeTag(NULL);
+ return;
+ }
+
+ // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
+ Integer idx = globalStringMap.get(s);
+ if (idx == null) {
+ // write a normal string
+ writeStr(s);
+ } else {
+ // write the extern string
+ writeTag(EXTERN_STRING, idx);
+ }
+ }
+
+ @Override
+ public String readExternString(FastInputStream fis) throws IOException {
+ int idx = readSize(fis);
+      if (idx != 0) { // idx != 0 is the index of the extern string
+        // no need to synchronize globalStringList - it's only updated before the first record is written to the log
+        return globalStringList.get(idx - 1);
+      } else { // idx == 0 means it has a string value
+ // this shouldn't happen with this codec subclass.
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+ }
+ }
+
+
+ }
+
+
+ TransactionLog(File tlogFile, Collection<String> globalStrings) throws IOException {
+ this(tlogFile, globalStrings, false);
+ }
+
+ TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) throws IOException {
+ try {
+ if (debug) {
+ log.debug("New TransactionLog file=" + tlogFile + ", exists=" + tlogFile.exists() + ", size=" + tlogFile.length() + ", openExisting=" + openExisting);
+ }
+
+ this.tlogFile = tlogFile;
+ raf = new RandomAccessFile(this.tlogFile, "rw");
+ long start = raf.length();
+ channel = raf.getChannel();
+ os = Channels.newOutputStream(channel);
+ fos = FastOutputStream.wrap(os);
+
+ if (openExisting) {
+ if (start > 0) {
+ readHeader(null);
+ raf.seek(start);
+ assert channel.position() == start;
+ fos.setWritten(start); // reflect that we aren't starting at the beginning
+ assert fos.size() == channel.size();
+ } else {
+ addGlobalStrings(globalStrings);
+ }
+ } else {
+ assert start==0;
+ if (start > 0) {
+ raf.setLength(0);
+ }
+ addGlobalStrings(globalStrings);
+ }
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ /** Returns the number of records in the log (currently includes the header and an optional commit).
+ * Note: currently returns 0 for reopened existing log files.
+ */
+ public int numRecords() {
+ synchronized (this) {
+ return this.numRecords;
+ }
+ }
+
+ public boolean endsWithCommit() throws IOException {
+ long size;
+ synchronized (this) {
+ fos.flush();
+ size = fos.size();
+ }
+
+
+ // the end of the file should have the end message (added during a commit) plus a 4 byte size
+ byte[] buf = new byte[ END_MESSAGE.length() ];
+ long pos = size - END_MESSAGE.length() - 4;
+ if (pos < 0) return false;
+ ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
+    is.readFully(buf); // a plain read() could return fewer bytes than requested
+ for (int i=0; i<buf.length; i++) {
+ if (buf[i] != END_MESSAGE.charAt(i)) return false;
+ }
+ return true;
+ }
+
+ /** takes a snapshot of the current position and number of records
+ * for later possible rollback, and returns the position */
+ public long snapshot() {
+ synchronized (this) {
+ snapshot_size = fos.size();
+ snapshot_numRecords = numRecords;
+ return snapshot_size;
+ }
+ }
+
+ // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
+ // This should only be used to roll back buffered updates, not actually applied updates.
+ public void rollback(long pos) throws IOException {
+ synchronized (this) {
+ assert snapshot_size == pos;
+ fos.flush();
+ raf.setLength(pos);
+ fos.setWritten(pos);
+ assert fos.size() == pos;
+ numRecords = snapshot_numRecords;
+ }
+ }
+
+
+ public long writeData(Object o) {
+ LogCodec codec = new LogCodec();
+ try {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ codec.init(fos);
+ codec.writeVal(o);
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+
+ private void readHeader(FastInputStream fis) throws IOException {
+ // read existing header
+ fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
+ LogCodec codec = new LogCodec();
+ Map header = (Map)codec.unmarshal(fis);
+
+ fis.readInt(); // skip size
+
+ // needed to read other records
+
+ synchronized (this) {
+ globalStringList = (List<String>)header.get("strings");
+ globalStringMap = new HashMap<String, Integer>(globalStringList.size());
+ for (int i=0; i<globalStringList.size(); i++) {
+ globalStringMap.put( globalStringList.get(i), i+1);
+ }
+ }
+ }
+
+ private void addGlobalStrings(Collection<String> strings) {
+ if (strings == null) return;
+ int origSize = globalStringMap.size();
+ for (String s : strings) {
+ Integer idx = null;
+ if (origSize > 0) {
+ idx = globalStringMap.get(s);
+ }
+ if (idx != null) continue; // already in list
+ globalStringList.add(s);
+ globalStringMap.put(s, globalStringList.size());
+ }
+ assert globalStringMap.size() == globalStringList.size();
+ }
+
+ Collection<String> getGlobalStrings() {
+ synchronized (this) {
+ return new ArrayList<String>(globalStringList);
+ }
+ }
+
+ private void writeLogHeader(LogCodec codec) throws IOException {
+ long pos = fos.size();
+ assert pos == 0;
+
+ Map header = new LinkedHashMap<String,Object>();
+ header.put("SOLR_TLOG",1); // a magic string + version number
+ header.put("strings",globalStringList);
+ codec.marshal(header, fos);
+
+ endRecord(pos);
+ }
+
+ private void endRecord(long startRecordPosition) throws IOException {
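+    // The record length is written *after* the record body so that ReverseReader
+    // can walk the file backwards by reading a trailing 4-byte length per record.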
+ fos.writeInt((int)(fos.size() - startRecordPosition));
+ numRecords++;
+ }
+
+
+ public long write(AddUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ long pos = 0;
+ synchronized (this) {
+ try {
+ pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ SolrInputDocument sdoc = cmd.getSolrInputDocument();
+
+ if (pos == 0) { // TODO: needs to be changed if we start writing a header first
+ addGlobalStrings(sdoc.getFieldNames());
+ writeLogHeader(codec);
+ pos = fos.size();
+ }
+
+ /***
+ System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+ if (pos != fos.size()) {
+ throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+ }
+ ***/
+
+ codec.init(fos);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.ADD); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+
+ endRecord(pos);
+ // fos.flushBuffer(); // flush later
+ return pos;
+ } catch (IOException e) {
+ // TODO: reset our file pointer back to "pos", the start of this record.
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
+ }
+ }
+ }
+
+ public long writeDelete(DeleteUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (this) {
+ try {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ if (pos == 0) {
+ writeLogHeader(codec);
+ pos = fos.size();
+ }
+ codec.init(fos);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.DELETE); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ BytesRef br = cmd.getIndexedId();
+ codec.writeByteArray(br.bytes, br.offset, br.length);
+
+ endRecord(pos);
+ // fos.flushBuffer(); // flush later
+
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (this) {
+ try {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ if (pos == 0) {
+ writeLogHeader(codec);
+ pos = fos.size();
+ }
+ codec.init(fos);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeStr(cmd.query);
+
+ endRecord(pos);
+ // fos.flushBuffer(); // flush later
+
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+
+ public long writeCommit(CommitUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (this) {
+ try {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+
+ if (pos == 0) {
+ writeLogHeader(codec);
+ pos = fos.size();
+ }
+ codec.init(fos);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.COMMIT); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
+
+ endRecord(pos);
+
+        fos.flush(); // flush since this will be the last record in a log file
+ assert fos.size() == channel.size();
+
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+
+ /* This method is thread safe */
+ public Object lookup(long pos) {
+    // A negative position can result from a log replay (which does not re-log, but does
+    // update the version map). This is OK since the node won't be ACTIVE when this happens.
+ if (pos < 0) return null;
+
+ try {
+ // make sure any unflushed buffer has been flushed
+ synchronized (this) {
+ // TODO: optimize this by keeping track of what we have flushed up to
+ fos.flushBuffer();
+ /***
+ System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+ if (fos.size() != raf.length() || pos >= fos.size() ) {
+ throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+ }
+ ***/
+ }
+
+ ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+ LogCodec codec = new LogCodec();
+ return codec.readVal(fis);
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ public void incref() {
+ int result = refcount.incrementAndGet();
+ if (result <= 1) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "incref on a closed log: " + this);
+ }
+ }
+
+ public boolean try_incref() {
+ return refcount.incrementAndGet() > 1;
+ }
+
+ public void decref() {
+ if (refcount.decrementAndGet() == 0) {
+ close();
+ }
+ }
+
+ /** returns the current position in the log file */
+ public long position() {
+ synchronized (this) {
+ return fos.size();
+ }
+ }
+
+ public void finish(UpdateLog.SyncLevel syncLevel) {
+ if (syncLevel == UpdateLog.SyncLevel.NONE) return;
+ try {
+ synchronized (this) {
+ fos.flushBuffer();
+ }
+
+ if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
+ // Since fsync is outside of synchronized block, we can end up with a partial
+ // last record on power failure (which is OK, and does not represent an error...
+ // we just need to be aware of it when reading).
+ raf.getFD().sync();
+ }
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ private void close() {
+ try {
+ if (debug) {
+ log.debug("Closing tlog" + this);
+ }
+
+ synchronized (this) {
+ fos.flush();
+ fos.close();
+ }
+
+ if (deleteOnClose) {
+ tlogFile.delete();
+ }
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ public void forceClose() {
+ if (refcount.get() > 0) {
+ log.error("Error: Forcing close of " + this);
+ refcount.set(0);
+ close();
+ }
+ }
+
+ public String toString() {
+ return "tlog{file=" + tlogFile.toString() + " refcount=" + refcount.get() + "}";
+ }
+
+ /** Returns a reader that can be used while a log is still in use.
+   * Currently only *one* LogReader may be outstanding, and that reader may only
+   * be used from a single thread. */
+ public LogReader getReader(long startingPos) {
+ return new LogReader(startingPos);
+ }
+
+ /** Returns a single threaded reverse reader */
+ public ReverseReader getReverseReader() throws IOException {
+ return new ReverseReader();
+ }
+
+
+ public class LogReader {
+ ChannelFastInputStream fis;
+ private LogCodec codec = new LogCodec();
+
+ public LogReader(long startingPos) {
+ incref();
+ fis = new ChannelFastInputStream(channel, startingPos);
+ }
+
+ /** Returns the next object from the log, or null if none available.
+ *
+ * @return The log record, or null if EOF
+ * @throws IOException
+ */
+ public Object next() throws IOException, InterruptedException {
+ long pos = fis.position();
+
+
+ synchronized (TransactionLog.this) {
+ if (debug) {
+ log.debug("Reading log record. pos="+pos+" currentSize="+fos.size());
+ }
+
+ if (pos >= fos.size()) {
+ return null;
+ }
+
+ fos.flushBuffer();
+ }
+
+ if (pos == 0) {
+ readHeader(fis);
+
+ // shouldn't currently happen - header and first record are currently written at the same time
+ synchronized (TransactionLog.this) {
+ if (fis.position() >= fos.size()) {
+ return null;
+ }
+ pos = fis.position();
+ }
+ }
+
+ Object o = codec.readVal(fis);
+
+ // skip over record size
+ int size = fis.readInt();
+ assert size == fis.position() - pos - 4;
+
+ return o;
+ }
+
+ public void close() {
+ decref();
+ }
+
+ @Override
+ public String toString() {
+ synchronized (TransactionLog.this) {
+ return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+ }
+ }
+
+ }
+
+ public class ReverseReader {
+ ChannelFastInputStream fis;
+ private LogCodec codec = new LogCodec() {
+ @Override
+ public SolrInputDocument readSolrInputDocument(FastInputStream dis) throws IOException {
+ // Given that the SolrInputDocument is last in an add record, it's OK to just skip
+ // reading it completely.
+ return null;
+ }
+ };
+
+ int nextLength; // length of the next record (the next one closer to the start of the log file)
+ long prevPos; // where we started reading from last time (so prevPos - nextLength == start of next record)
+
+ public ReverseReader() throws IOException {
+ incref();
+
+ long sz;
+ synchronized (TransactionLog.this) {
+ fos.flushBuffer();
+ sz = fos.size();
+ assert sz == channel.size();
+ }
+
+ fis = new ChannelFastInputStream(channel, 0);
+    if (sz >= 4) {
+ // readHeader(fis); // should not be needed
+ prevPos = sz - 4;
+ fis.seek(prevPos);
+ nextLength = fis.readInt();
+ }
+ }
+
+
+ /** Returns the next object from the log, or null if none available.
+ *
+ * @return The log record, or null if EOF
+ * @throws IOException
+ */
+ public Object next() throws IOException {
+ if (prevPos <= 0) return null;
+
+ long endOfThisRecord = prevPos;
+
+ int thisLength = nextLength;
+
+ long recordStart = prevPos - thisLength; // back up to the beginning of the next record
+ prevPos = recordStart - 4; // back up 4 more to read the length of the next record
+
+ if (prevPos <= 0) return null; // this record is the header
+
+ long bufferPos = fis.getBufferPos();
+ if (prevPos >= bufferPos) {
+ // nothing to do... we're within the current buffer
+ } else {
+ // Position buffer so that this record is at the end.
+ // For small records, this will cause subsequent calls to next() to be within the buffer.
+ long seekPos = endOfThisRecord - fis.getBufferSize();
+        seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger than the block size.
+ seekPos = Math.max(seekPos, 0);
+ fis.seek(seekPos);
+ fis.peek(); // cause buffer to be filled
+ }
+
+ fis.seek(prevPos);
+ nextLength = fis.readInt(); // this is the length of the *next* record (i.e. closer to the beginning)
+
+ // TODO: optionally skip document data
+ Object o = codec.readVal(fis);
+
+      // assert fis.position() == prevPos + 4 + thisLength;   // this is only true if we read all the data (and we currently skip reading the SolrInputDocument)
+
+ return o;
+ }
+
+ /* returns the position in the log file of the last record returned by next() */
+ public long position() {
+ return prevPos + 4; // skip the length
+ }
+
+ public void close() {
+ decref();
+ }
+
+ @Override
+ public String toString() {
+ synchronized (TransactionLog.this) {
+ return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+ }
+ }
+
+
+ }
+
+}
+
+
+
+class ChannelFastInputStream extends FastInputStream {
+ private FileChannel ch;
+
+ public ChannelFastInputStream(FileChannel ch, long chPosition) {
+ // super(null, new byte[10],0,0); // a small buffer size for testing purposes
+ super(null);
+ this.ch = ch;
+ super.readFromStream = chPosition;
+ }
+
+ @Override
+ public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
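+    // Positional read: ch.read(bb, pos) does not move the channel's own position,
+    // so readers can share the channel with the writer's sequential output.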
+ ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
+ int ret = ch.read(bb, readFromStream);
+ return ret;
+ }
+
+ public void seek(long position) throws IOException {
+ if (position <= readFromStream && position >= getBufferPos()) {
+ // seek within buffer
+ pos = (int)(position - getBufferPos());
+ } else {
+ // long currSize = ch.size(); // not needed - underlying read should handle (unless read never done)
+ // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
+ readFromStream = position;
+ end = pos = 0;
+ }
+ assert position() == position;
+ }
+
+ /** where is the start of the buffer relative to the whole file */
+ public long getBufferPos() {
+ return readFromStream - end;
+ }
+
+ public int getBufferSize() {
+ return buf.length;
+ }
+
+ @Override
+ public void close() throws IOException {
+ ch.close();
+ }
+
+ @Override
+ public String toString() {
+ return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
+ }
+}
+
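A minimal usage sketch of the class above (illustrative only: the constructor is
package-private and normally driven by UpdateLog; "tlogDir" and "cmd" are assumed
to be an existing directory and a populated AddUpdateCommand, and exception
handling is elided):

    // write one add record, flush, and read it back by file position
    File f = new File(tlogDir, "tlog.0000000000000000001");
    TransactionLog tlog = new TransactionLog(f, null);
    long pos = tlog.write(cmd);              // returns the record's position in the file
    tlog.finish(UpdateLog.SyncLevel.FLUSH);  // flush buffered output to the channel
    Object record = tlog.lookup(pos);        // a List of [ADD, version, SolrInputDocument]
    tlog.decref();                           // last reference: closes (and may delete) the file
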
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -17,6 +17,7 @@
package org.apache.solr.update;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.request.SolrQueryRequest;
@@ -24,17 +25,56 @@ import org.apache.solr.request.SolrQuery
*
*
*/
- public class UpdateCommand {
- protected final SolrQueryRequest req;
- protected final String commandName;
-
- public UpdateCommand(String commandName, SolrQueryRequest req) {
- this.req = req;
- this.commandName = commandName;
- }
+public abstract class UpdateCommand implements Cloneable {
+ protected SolrQueryRequest req;
+ protected long version;
+ protected int flags;
+
+  public static final int BUFFERING = 0x00000001; // update command is being buffered.
+  public static final int REPLAY = 0x00000002; // update command is from replaying a log.
+  public static final int PEER_SYNC = 0x00000004; // update command is a missing update being provided by a peer.
+  public static final int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
+
+ public UpdateCommand(SolrQueryRequest req) {
+ this.req = req;
+ }
+
+ public abstract String name();
+
+ @Override
+ public String toString() {
+ return name() + "{flags="+flags+",version="+version;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+ public void setVersion(long version) {
+ this.version = version;
+ }
+
+ public void setFlags(int flags) {
+ this.flags = flags;
+ }
+
+ public int getFlags() {
+ return flags;
+ }
+
+ public SolrQueryRequest getReq() {
+ return req;
+ }
+
+ public void setReq(SolrQueryRequest req) {
+ this.req = req;
+ }
- @Override
- public String toString() {
- return commandName;
+ @Override
+ public UpdateCommand clone() {
+ try {
+ return (UpdateCommand) super.clone();
+ } catch (CloneNotSupportedException e) {
+ return null;
}
}
+}
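
The flag constants above are bit masks meant to be OR'ed together; a small sketch
of how they combine and are tested (mirroring their use later in this commit;
"req" is assumed to be an existing SolrQueryRequest):

    AddUpdateCommand cmd = new AddUpdateCommand(req);
    cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
    if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) {
      // this update came from replaying a transaction log; don't re-log it
    }
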
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Wed Jan 25 19:49:26 2012
@@ -88,12 +88,11 @@ public abstract class UpdateHandler impl
private void initLog() {
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
if (ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
- ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
- } else {
- ulog = new NullUpdateLog();
- ulog.init(null);
+ ulog = new UpdateLog();
+ ulog.init(ulogPluginInfo);
+ // ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
+ ulog.init(this, core);
}
- ulog.init(this, core);
}
@@ -123,16 +122,7 @@ public abstract class UpdateHandler impl
parseEventListeners();
initLog();
}
-
- /**
- * Allows the UpdateHandler to create the SolrIndexSearcher after it
- * has issued a 'softCommit'.
- *
- * @param previousSearcher
- * @throws IOException
- */
- public abstract SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException;
-
+
/**
* Called when the Writer should be opened again - eg when replication replaces
* all of the index files.
@@ -141,7 +131,7 @@ public abstract class UpdateHandler impl
*/
public abstract void newIndexWriter() throws IOException;
- public abstract SolrCoreState getIndexWriterProvider();
+ public abstract SolrCoreState getSolrCoreState();
public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
public abstract void delete(DeleteUpdateCommand cmd) throws IOException;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Wed Jan 25 19:49:26 2012
@@ -18,23 +18,1051 @@
package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
+import org.apache.solr.update.processor.RunUpdateProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
/** @lucene.experimental */
-public abstract class UpdateLog implements PluginInfoInitialized {
- public static final int ADD = 0x00;
- public static final int DELETE = 0x01;
- public static final int DELETE_BY_QUERY = 0x02;
-
- public abstract void init(UpdateHandler uhandler, SolrCore core);
- public abstract void add(AddUpdateCommand cmd);
- public abstract void delete(DeleteUpdateCommand cmd);
- public abstract void deleteByQuery(DeleteUpdateCommand cmd);
- public abstract void preCommit(CommitUpdateCommand cmd);
- public abstract void postCommit(CommitUpdateCommand cmd);
- public abstract void preSoftCommit(CommitUpdateCommand cmd);
- public abstract void postSoftCommit(CommitUpdateCommand cmd);
- public abstract Object lookup(BytesRef indexedId);
- public abstract void close();
+public class UpdateLog implements PluginInfoInitialized {
+ public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
+ public boolean debug = log.isDebugEnabled();
+
+
+ public enum SyncLevel { NONE, FLUSH, FSYNC }
+ public enum State { REPLAYING, BUFFERING, APPLYING_BUFFERED, ACTIVE }
+
+ public static final int ADD = 0x01;
+ public static final int DELETE = 0x02;
+ public static final int DELETE_BY_QUERY = 0x03;
+ public static final int COMMIT = 0x04;
+
+ public static class RecoveryInfo {
+ public long positionOfStart;
+
+ public int adds;
+ public int deletes;
+ public int deleteByQuery;
+ public int errors;
+ }
+
+
+
+ public static String TLOG_NAME="tlog";
+
+ long id = -1;
+ private State state = State.ACTIVE;
+
+ private TransactionLog tlog;
+ private TransactionLog prevTlog;
+ private Deque<TransactionLog> logs = new LinkedList<TransactionLog>(); // list of recent logs, newest first
+ private TransactionLog newestLogOnStartup;
+ private int numOldRecords; // number of records in the recent logs
+
+ private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>();
+ private Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening
+ private Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening
+ private TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
+  private TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap2
+
+ private final int numDeletesToKeep = 1000;
+ private final int numRecordsToKeep = 100;
+ // keep track of deletes only... this is not updated on an add
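+  // LinkedHashMap (insertion order) plus removeEldestEntry gives a bounded cache of recent deletes.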
+ private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > numDeletesToKeep;
+ }
+ };
+
+ private String[] tlogFiles;
+ private File tlogDir;
+ private Collection<String> globalStrings;
+
+ private String dataDir;
+ private String lastDataDir;
+
+ private VersionInfo versionInfo;
+
+ private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
+
+ private volatile UpdateHandler uhandler; // a core reload can change this reference!
+ private volatile boolean cancelApplyBufferUpdate;
+
+
+ public static class LogPtr {
+ final long pointer;
+ final long version;
+
+ public LogPtr(long pointer, long version) {
+ this.pointer = pointer;
+ this.version = version;
+ }
+
+ public String toString() {
+ return "LogPtr(" + pointer + ")";
+ }
+ }
+
+
+ public VersionInfo getVersionInfo() {
+ return versionInfo;
+ }
+
+ public void init(PluginInfo info) {
+ dataDir = (String)info.initArgs.get("dir");
+ }
+
+ public void init(UpdateHandler uhandler, SolrCore core) {
+ if (dataDir == null || dataDir.length()==0) {
+ dataDir = core.getDataDir();
+ }
+
+ this.uhandler = uhandler;
+
+ if (dataDir.equals(lastDataDir)) {
+ // on a normal reopen, we currently shouldn't have to do anything
+ return;
+ }
+ lastDataDir = dataDir;
+ tlogDir = new File(dataDir, TLOG_NAME);
+ tlogDir.mkdirs();
+ tlogFiles = getLogList(tlogDir);
+ id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
+
+ TransactionLog oldLog = null;
+ for (String oldLogName : tlogFiles) {
+ File f = new File(tlogDir, oldLogName);
+ try {
+ oldLog = new TransactionLog( f, null, true );
+ addOldLog(oldLog);
+ } catch (Exception e) {
+ SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
+ f.delete();
+ }
+ }
+ newestLogOnStartup = oldLog;
+
+ versionInfo = new VersionInfo(uhandler, 256);
+ }
+
+ public File getLogDir() {
+ return tlogDir;
+ }
+
+  /* Takes over ownership of the log, keeping it until no longer needed
+     and then decrementing its reference and dropping it.
+   */
+ private void addOldLog(TransactionLog oldLog) {
+ if (oldLog == null) return;
+
+ numOldRecords += oldLog.numRecords();
+
+ int currRecords = numOldRecords;
+
+ if (oldLog != tlog && tlog != null) {
+ currRecords += tlog.numRecords();
+ }
+
+ while (logs.size() > 0) {
+ TransactionLog log = logs.peekLast();
+ int nrec = log.numRecords();
+ // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
+ // we already have the limit of 10 log files.
+ if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
+ currRecords -= nrec;
+ numOldRecords -= nrec;
+ logs.removeLast().decref(); // dereference so it will be deleted when no longer in use
+ continue;
+ }
+
+ break;
+ }
+
+ // don't incref... we are taking ownership from the caller.
+ logs.addFirst(oldLog);
+ }
+
+
+ public static String[] getLogList(File directory) {
+ final String prefix = TLOG_NAME+'.';
+ String[] names = directory.list(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith(prefix);
+ }
+ });
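+    // Lexicographic sort works because log ids are zero-padded to 19 digits (see ensureLog).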
+ Arrays.sort(names);
+ return names;
+ }
+
+
+ public long getLastLogId() {
+ if (id != -1) return id;
+ if (tlogFiles.length == 0) return -1;
+ String last = tlogFiles[tlogFiles.length-1];
+ return Long.parseLong(last.substring(TLOG_NAME.length()+1));
+ }
+
+
+ public void add(AddUpdateCommand cmd) {
+ // don't log if we are replaying from another log
+ // TODO: we currently need to log to maintain correct versioning, rtg, etc
+ // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
+
+ synchronized (this) {
+ long pos = -1;
+
+ // don't log if we are replaying from another log
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog();
+ pos = tlog.write(cmd);
+ }
+
+ // TODO: in the future we could support a real position for a REPLAY update.
+ // Only currently would be useful for RTG while in recovery mode though.
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+ // only update our map if we're not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ map.put(cmd.getIndexedId(), ptr);
+ }
+
+ if (debug) {
+ log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+ public void delete(DeleteUpdateCommand cmd) {
+ BytesRef br = cmd.getIndexedId();
+
+ synchronized (this) {
+ long pos = -1;
+
+ // don't log if we are replaying from another log
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog();
+ pos = tlog.writeDelete(cmd);
+ }
+
+ LogPtr ptr = new LogPtr(pos, cmd.version);
+
+ // only update our map if we're not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ map.put(br, ptr);
+
+ oldDeletes.put(br, ptr);
+ }
+
+ if (debug) {
+ log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+ public void deleteByQuery(DeleteUpdateCommand cmd) {
+ synchronized (this) {
+ long pos = -1;
+ // don't log if we are replaying from another log
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog();
+ pos = tlog.writeDeleteByQuery(cmd);
+ }
+
+ // only change our caches if we are not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ // given that we just did a delete-by-query, we don't know what documents were
+ // affected and hence we must purge our caches.
+ map.clear();
+
+ // oldDeletes.clear();
+
+ // We must cause a new IndexReader to be opened before anything looks at these caches again
+ // so that a cache miss will read fresh data.
+ //
+ // TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands
+ try {
+ RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+ holder.decref();
+ } catch (Throwable e) {
+ SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+ }
+
+ }
+
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+ if (debug) {
+ log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+
+ private void newMap() {
+ prevMap2 = prevMap;
+ prevMapLog2 = prevMapLog;
+
+ prevMap = map;
+ prevMapLog = tlog;
+
+ map = new HashMap<BytesRef, LogPtr>();
+ }
+
+ private void clearOldMaps() {
+ prevMap = null;
+ prevMap2 = null;
+ }
+
+ public void preCommit(CommitUpdateCommand cmd) {
+ synchronized (this) {
+ if (debug) {
+ log.debug("TLOG: preCommit");
+ }
+
+ if (getState() != State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // if we aren't in the active state, and this isn't a replay
+ // from the recovery process, then we shouldn't mess with
+ // the current transaction log. This normally shouldn't happen
+ // as DistributedUpdateProcessor will prevent this. Commits
+ // that don't use the processor are possible though.
+ return;
+ }
+
+ // since we're changing the log, we must change the map.
+ newMap();
+
+ // since document additions can happen concurrently with commit, create
+ // a new transaction log first so that we know the old one is definitely
+ // in the index.
+ prevTlog = tlog;
+ tlog = null;
+ id++;
+
+ if (prevTlog != null) {
+ globalStrings = prevTlog.getGlobalStrings();
+ }
+
+ addOldLog(prevTlog);
+ }
+ }
+
+ public void postCommit(CommitUpdateCommand cmd) {
+ synchronized (this) {
+ if (debug) {
+ log.debug("TLOG: postCommit");
+ }
+ if (prevTlog != null) {
+ // if we made it through the commit, write a commit command to the log
+ // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
+ prevTlog.writeCommit(cmd);
+ // the old log list will decref when no longer needed
+ // prevTlog.decref();
+ prevTlog = null;
+ }
+ }
+ }
+
+ public void preSoftCommit(CommitUpdateCommand cmd) {
+ debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+
+ synchronized (this) {
+
+ if (!cmd.softCommit) return; // already handled this at the start of the hard commit
+ newMap();
+
+ // start adding documents to a new map since we won't know if
+ // any added documents will make it into this commit or not.
+ // But we do know that any updates already added will definitely
+ // show up in the latest reader after the commit succeeds.
+ map = new HashMap<BytesRef, LogPtr>();
+
+ if (debug) {
+ log.debug("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+ public void postSoftCommit(CommitUpdateCommand cmd) {
+ synchronized (this) {
+ // We can clear out all old maps now that a new searcher has been opened.
+ // This currently only works since DUH2 synchronizes around preCommit to avoid
+ // it being called in the middle of a preSoftCommit, postSoftCommit sequence.
+ // If this DUH2 synchronization were to be removed, preSoftCommit should
+ // record what old maps were created and only remove those.
+
+ if (debug) {
+ SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2));
+ }
+ clearOldMaps();
+ }
+ }
+
+ public Object lookup(BytesRef indexedId) {
+ LogPtr entry;
+ TransactionLog lookupLog;
+
+ synchronized (this) {
+ entry = map.get(indexedId);
+ lookupLog = tlog; // something found in "map" will always be in "tlog"
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ if (entry == null && prevMap != null) {
+ entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+ lookupLog = prevMapLog;
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ if (entry == null && prevMap2 != null) {
+ entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+ lookupLog = prevMapLog2;
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+
+ if (entry == null) {
+ return null;
+ }
+ lookupLog.incref();
+ }
+
+ try {
+ // now do the lookup outside of the sync block for concurrency
+ return lookupLog.lookup(entry.pointer);
+ } finally {
+ lookupLog.decref();
+ }
+
+ }
+
+ // This method works like realtime-get... it only guarantees to return the latest
+ // version of the *completed* update. There can be updates in progress concurrently
+ // that have already grabbed higher version numbers. Higher level coordination or
+ // synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
+ public Long lookupVersion(BytesRef indexedId) {
+ LogPtr entry;
+ TransactionLog lookupLog;
+
+ synchronized (this) {
+ entry = map.get(indexedId);
+ lookupLog = tlog; // something found in "map" will always be in "tlog"
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ if (entry == null && prevMap != null) {
+ entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+ lookupLog = prevMapLog;
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ if (entry == null && prevMap2 != null) {
+ entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+ lookupLog = prevMapLog2;
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ }
+
+ if (entry != null) {
+ return entry.version;
+ }
+
+ // Now check real index
+ Long version = versionInfo.getVersionFromIndex(indexedId);
+
+ if (version != null) {
+ return version;
+ }
+
+ // We can't get any version info for deletes from the index, so if the doc
+ // wasn't found, check a cache of recent deletes.
+
+ synchronized (this) {
+ entry = oldDeletes.get(indexedId);
+ }
+
+ if (entry != null) {
+ return entry.version;
+ }
+
+ return null;
+ }
+
+ public void finish(SyncLevel syncLevel) {
+ if (syncLevel == null) {
+ syncLevel = defaultSyncLevel;
+ }
+ if (syncLevel == SyncLevel.NONE) {
+ return;
+ }
+
+ TransactionLog currLog;
+ synchronized (this) {
+ currLog = tlog;
+ if (currLog == null) return;
+ currLog.incref();
+ }
+
+ try {
+ currLog.finish(syncLevel);
+ } finally {
+ currLog.decref();
+ }
+ }
+
+ public Future<RecoveryInfo> recoverFromLog() {
+ recoveryInfo = new RecoveryInfo();
+ if (newestLogOnStartup == null) return null;
+
+ if (!newestLogOnStartup.try_incref()) return null; // log file was already closed
+
+ // now that we've incremented the reference, the log shouldn't go away.
+ try {
+ if (newestLogOnStartup.endsWithCommit()) {
+ newestLogOnStartup.decref();
+ return null;
+ }
+ } catch (IOException e) {
+ log.error("Error inspecting tlog " + newestLogOnStartup);
+ newestLogOnStartup.decref();
+ return null;
+ }
+
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+ LogReplayer replayer = new LogReplayer(newestLogOnStartup, false);
+
+ versionInfo.blockUpdates();
+ try {
+ state = State.REPLAYING;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+
+ return cs.submit(replayer, recoveryInfo);
+
+ }
+
+
+ private void ensureLog() {
+ if (tlog == null) {
+ String newLogName = String.format("%s.%019d", TLOG_NAME, id);
+ try {
+ tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't open new tlog!", e);
+ }
+ }
+ }
+
+ public void close() {
+ synchronized (this) {
+ try {
+ recoveryExecutor.shutdownNow();
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ }
+
+ // Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions
+
+ if (prevTlog != null) {
+ prevTlog.deleteOnClose = false;
+ prevTlog.decref();
+ prevTlog.forceClose();
+ }
+ if (tlog != null) {
+ tlog.deleteOnClose = false;
+ tlog.decref();
+ tlog.forceClose();
+ }
+
+ for (TransactionLog log : logs) {
+ log.deleteOnClose = false;
+ log.decref();
+ log.forceClose();
+ }
+
+ }
+ }
+
+
+ static class Update {
+ TransactionLog log;
+ long version;
+ long pointer;
+ }
+
+ public class RecentUpdates {
+ Deque<TransactionLog> logList; // newest first
+ List<List<Update>> updateList;
+ HashMap<Long, Update> updates;
+ List<Update> deleteByQueryList;
+
+
+ public List<Long> getVersions(int n) {
+      List<Long> ret = new ArrayList<Long>(n);
+
+ for (List<Update> singleList : updateList) {
+ for (Update ptr : singleList) {
+ ret.add(ptr.version);
+ if (--n <= 0) return ret;
+ }
+ }
+
+ return ret;
+ }
+
+ public Object lookup(long version) {
+ Update update = updates.get(version);
+ if (update == null) return null;
+
+ return update.log.lookup(update.pointer);
+ }
+
+ /** Returns the list of deleteByQueries that happened after the given version */
+ public List<Object> getDeleteByQuery(long afterVersion) {
+ List<Object> result = new ArrayList<Object>(deleteByQueryList.size());
+ for (Update update : deleteByQueryList) {
+ if (Math.abs(update.version) > afterVersion) {
+ Object dbq = update.log.lookup(update.pointer);
+ result.add(dbq);
+ }
+ }
+ return result;
+ }
+
+ private void update() {
+ int numUpdates = 0;
+ updateList = new ArrayList<List<Update>>(logList.size());
+ deleteByQueryList = new ArrayList<Update>();
+ updates = new HashMap<Long,Update>(numRecordsToKeep);
+
+ for (TransactionLog oldLog : logList) {
+ List<Update> updatesForLog = new ArrayList<Update>();
+
+ TransactionLog.ReverseReader reader = null;
+ try {
+ reader = oldLog.getReverseReader();
+
+ while (numUpdates < numRecordsToKeep) {
+ Object o = reader.next();
+            if (o==null) break;
+            numUpdates++;
+ try {
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ // TODO: refactor this out so we get common error handling
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ case UpdateLog.DELETE:
+ case UpdateLog.DELETE_BY_QUERY:
+ Update update = new Update();
+ update.log = oldLog;
+ update.pointer = reader.position();
+ update.version = version;
+
+ updatesForLog.add(update);
+ updates.put(version, update);
+
+ if (oper == UpdateLog.DELETE_BY_QUERY) {
+ deleteByQueryList.add(update);
+ }
+
+ break;
+
+ case UpdateLog.COMMIT:
+ break;
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+ } catch (ClassCastException cl) {
+ log.warn("Unexpected log entry or corrupt log. Entry=" + o, cl);
+ // would be caused by a corrupt transaction log
+ } catch (Exception ex) {
+ log.warn("Exception reverse reading log", ex);
+ break;
+ }
+ }
+
+ } catch (IOException e) {
+ // failure to read a log record isn't fatal
+ log.error("Exception reading versions from log",e);
+ } finally {
+ if (reader != null) reader.close();
+ }
+
+ updateList.add(updatesForLog);
+ }
+
+ }
+
+ public void close() {
+ for (TransactionLog log : logList) {
+ log.decref();
+ }
+ }
+ }
+
+
+ public RecentUpdates getRecentUpdates() {
+ Deque<TransactionLog> logList;
+ synchronized (this) {
+ logList = new LinkedList<TransactionLog>(logs);
+ for (TransactionLog log : logList) {
+ log.incref();
+ }
+ if (prevTlog != null) {
+ prevTlog.incref();
+ logList.addFirst(prevTlog);
+ }
+ if (tlog != null) {
+ tlog.incref();
+ logList.addFirst(tlog);
+ }
+ }
+
+ // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
+ // one of the updates I originally handed out fell off the list). Over-request?
+ RecentUpdates recentUpdates = new RecentUpdates();
+ recentUpdates.logList = logList;
+ recentUpdates.update();
+
+ return recentUpdates;
+ }
+
+ public void bufferUpdates() {
+ // recovery trips this assert under some race - even when
+ // it checks the state first
+ // assert state == State.ACTIVE;
+
+ recoveryInfo = new RecoveryInfo();
+
+ // block all updates to eliminate race conditions
+ // reading state and acting on it in the update processor
+ versionInfo.blockUpdates();
+ try {
+ if (state != State.ACTIVE) return;
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting to buffer updates. " + this);
+ }
+
+ // since we blocked updates, this synchronization shouldn't strictly be necessary.
+ synchronized (this) {
+ recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+ }
+
+ state = State.BUFFERING;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+ }
+
+ /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */
+ public boolean dropBufferedUpdates() {
+ versionInfo.blockUpdates();
+ try {
+ if (state != State.BUFFERING) return false;
+
+ if (log.isInfoEnabled()) {
+ log.info("Dropping buffered updates " + this);
+ }
+
+ // since we blocked updates, this synchronization shouldn't strictly be necessary.
+ synchronized (this) {
+ if (tlog != null) {
+ tlog.rollback(recoveryInfo.positionOfStart);
+ }
+ }
+
+ state = State.ACTIVE;
+ } catch (IOException e) {
+ SolrException.log(log,"Error attempting to roll back log", e);
+ return false;
+ }
+ finally {
+ versionInfo.unblockUpdates();
+ }
+ return true;
+ }
+
+
+ /** Returns the Future to wait on, or null if no replay was needed */
+ public Future<RecoveryInfo> applyBufferedUpdates() {
+ // recovery trips this assert under some race - even when
+ // it checks the state first
+ // assert state == State.BUFFERING;
+
+ // block all updates to eliminate race conditions
+ // reading state and acting on it in the update processor
+ versionInfo.blockUpdates();
+ try {
+ cancelApplyBufferUpdate = false;
+ if (state != State.BUFFERING) return null;
+
+ // handle case when no log was even created because no updates
+ // were received.
+ if (tlog == null) {
+ state = State.ACTIVE;
+ return null;
+ }
+ tlog.incref();
+ state = State.APPLYING_BUFFERED;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+
+ if (recoveryExecutor.isShutdown()) {
+ tlog.decref();
+ throw new RuntimeException("executor is not running...");
+ }
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+ LogReplayer replayer = new LogReplayer(tlog, true);
+ return cs.submit(replayer, recoveryInfo);
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public String toString() {
+ return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}";
+ }
+
+
+ public static Runnable testing_logReplayHook; // called before each log read
+ public static Runnable testing_logReplayFinishHook; // called when log replay has finished
+
+
+
+ private RecoveryInfo recoveryInfo;
+
+ // TODO: do we let the log replayer run across core reloads?
+ class LogReplayer implements Runnable {
+ TransactionLog translog;
+ TransactionLog.LogReader tlogReader;
+ boolean activeLog;
+ boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
+
+
+ public LogReplayer(TransactionLog translog, boolean activeLog) {
+ this.translog = translog;
+ this.activeLog = activeLog;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ uhandler.core.log.warn("Starting log replay " + translog + " active="+activeLog + "starting pos=" + recoveryInfo.positionOfStart);
+
+ tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+ SolrQueryResponse rsp = new SolrQueryResponse();
+
+ // NOTE: we don't currently handle a core reload during recovery. This would cause the core
+ // to change underneath us.
+
+ // TODO: use the standard request factory? We won't get any custom configuration instantiating this way.
+ RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
+ DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
+ runFac.init(new NamedList());
+ magicFac.init(new NamedList());
+
+ UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
+
+ long commitVersion = 0;
+
+ for(;;) {
+ Object o = null;
+ if (cancelApplyBufferUpdate) break;
+ try {
+ if (testing_logReplayHook != null) testing_logReplayHook.run();
+          o = tlogReader.next();
+ if (o == null && activeLog) {
+ if (!finishing) {
+ // block to prevent new adds, but don't immediately unlock since
+ // we could be starved from ever completing recovery. Only unlock
+ // after we've finished this recovery.
+ // NOTE: our own updates won't be blocked since the thread holding a write lock can
+ // lock a read lock.
+ versionInfo.blockUpdates();
+ finishing = true;
+ o = tlogReader.next();
+ } else {
+ // we had previously blocked updates, so this "null" from the log is final.
+
+ // Wait until our final commit to change the state and unlock.
+ // This is only so no new updates are written to the current log file, and is
+ // only an issue if we crash before the commit (and we are paying attention
+ // to incomplete log files).
+ //
+ // versionInfo.unblockUpdates();
+ }
+ }
+ } catch (InterruptedException e) {
+ SolrException.log(log,e);
+ } catch (IOException e) {
+ SolrException.log(log,e);
+ } catch (Throwable e) {
+ SolrException.log(log,e);
+ }
+
+ if (o == null) break;
+
+ try {
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ {
+ recoveryInfo.adds++;
+ // byte[] idBytes = (byte[]) entry.get(2);
+ SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processAdd(cmd);
+ break;
+ }
+ case UpdateLog.DELETE:
+ {
+ recoveryInfo.deletes++;
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY:
+ {
+ recoveryInfo.deleteByQuery++;
+ String query = (String)entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ case UpdateLog.COMMIT:
+ {
+ commitVersion = version;
+ break;
+ }
+
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+
+ if (rsp.getException() != null) {
+ log.error("Exception replaying log", rsp.getException());
+ throw rsp.getException();
+ }
+ } catch (IOException ex) {
+ recoveryInfo.errors++;
+ log.warn("IOException reading log", ex);
+ // could be caused by an incomplete flush if recovering from log
+ } catch (ClassCastException cl) {
+ recoveryInfo.errors++;
+ log.warn("Unexpected log entry or corrupt log. Entry=" + o, cl);
+ // would be caused by a corrupt transaction log
+ } catch (Throwable ex) {
+ recoveryInfo.errors++;
+ log.warn("Exception replaying log", ex);
+ // something wrong with the request?
+ }
+ }
+
+ CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+ cmd.setVersion(commitVersion);
+ cmd.softCommit = false;
+ cmd.waitSearcher = true;
+ cmd.setFlags(UpdateCommand.REPLAY);
+ try {
+ uhandler.commit(cmd); // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart.
+ } catch (IOException ex) {
+ recoveryInfo.errors++;
+ log.error("Replay exception: final commit.", ex);
+ }
+
+ if (!activeLog) {
+ // if we are replaying an old tlog file, we need to add a commit to the end
+ // so we don't replay it again if we restart right after.
+ translog.writeCommit(cmd);
+ }
+
+ try {
+ proc.finish();
+ } catch (IOException ex) {
+ recoveryInfo.errors++;
+ log.error("Replay exception: finish()", ex);
+ }
+
+ tlogReader.close();
+ translog.decref();
+
+ } catch (Throwable e) {
+ recoveryInfo.errors++;
+ SolrException.log(log,e);
+ } finally {
+ // change the state while updates are still blocked to prevent races
+ state = State.ACTIVE;
+ if (finishing) {
+ versionInfo.unblockUpdates();
+ }
+ }
+
+ log.warn("Ending log replay " + tlogReader);
+
+ if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
+ }
+ }
+
+ public void cancelApplyBufferedUpdates() {
+ this.cancelApplyBufferUpdate = true;
+ }
+
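+ // zero core threads and a SynchronousQueue: replay threads are created on
+ // demand and exit after one second of idleness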
+ ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0,
+ Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("recoveryExecutor"));
+
+}
+
+
+
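For reference, each record the replay loop above pulls from the log reader is
a plain java.util.List: operation tag first, version second, payload last,
exactly as the Log Format comment in TransactionLog describes. Below is a
minimal stand-alone sketch of that dispatch, with illustrative tag values and
a simplified payload (hypothetical class, not code from this commit):

    import java.util.Arrays;
    import java.util.List;

    public class TLogEntryDemo {
      // operation tags matching what the replay switch dispatches on
      // (numeric values here are illustrative, not UpdateLog's constants)
      static final int ADD = 1, DELETE = 2, DELETE_BY_QUERY = 3, COMMIT = 4;

      static void apply(List<?> entry) {
        int oper = (Integer) entry.get(0);    // operation tag is always first
        long version = (Long) entry.get(1);   // version is always second
        switch (oper) {
          case ADD:
            // the document payload is the last element
            System.out.println("add v=" + version
                + " doc=" + entry.get(entry.size() - 1));
            break;
          case DELETE:
            System.out.println("delete v=" + version + " id=" + entry.get(2));
            break;
          case DELETE_BY_QUERY:
            System.out.println("deleteByQuery v=" + version
                + " q=" + entry.get(2));
            break;
          case COMMIT:
            System.out.println("commit v=" + version);
            break;
          default:
            throw new IllegalStateException("Unknown Operation! " + oper);
        }
      }

      public static void main(String[] args) {
        apply(Arrays.asList(ADD, 42L, "{id:1, title:hello}"));
        apply(Arrays.asList(DELETE_BY_QUERY, 43L, "title:stale"));
        apply(Arrays.asList(COMMIT, 43L));
      }
    }
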
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+// TODO: make inner?
+// TODO: store the highest possible in the index on a commit (but how to not block adds?)
+// TODO: could also store highest possible in the transaction log after a commit.
+// Or on a new index, just scan "version" for the max?
+/** @lucene.internal */
+public class VersionBucket {
+ public long highest;
+
+ public void updateHighest(long val) {
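+ // deletes are assigned negative versions elsewhere in the update path,
+ // so track the absolute value here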
+ if (highest != 0) {
+ highest = Math.max(highest, Math.abs(val));
+ }
+ }
+}
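
A note on the bucket math used by VersionInfo below: nBuckets is rounded up
to a power of two so that a bucket can be chosen with a single bitwise mask
rather than a modulo. A small sketch of why the mask is equivalent
(hypothetical class, not part of the commit):

    public class BucketMaskDemo {
      public static void main(String[] args) {
        // VersionInfo sizes the array with BitUtil.nextHighestPowerOfTwo;
        // e.g. a request for 100 buckets yields 128.
        int n = 128;
        int hash = "doc-17".hashCode();
        // Because n is a power of two, (n - 1) is an all-ones mask and the
        // AND picks a slot in [0, n) even for negative hashes...
        int slot = hash & (n - 1);
        // ...matching the (more expensive) non-negative modulo:
        System.out.println(slot == ((hash % n) + n) % n);  // true
      }
    }
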
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
+
+public class VersionInfo {
+ public static final String VERSION_FIELD="_version_";
+
+ private SolrCore core;
+ private UpdateHandler updateHandler;
+ private final VersionBucket[] buckets;
+ private SchemaField versionField;
+ private SchemaField idField;
+ final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
+ this.updateHandler = updateHandler;
+ this.core = updateHandler.core;
+ versionField = core.getSchema().getFieldOrNull(VERSION_FIELD);
+ idField = core.getSchema().getUniqueKeyField();
+ buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
+ for (int i=0; i<buckets.length; i++) {
+ buckets[i] = new VersionBucket();
+ }
+ }
+
+ public SchemaField getVersionField() {
+ return versionField;
+ }
+
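+ // Updates acquire the shared read lock, so many updates may run concurrently;
+ // code that must stop all updates (e.g. finishing a log replay) takes the
+ // write lock. A thread holding the write lock can still acquire the read
+ // lock, so its own updates are never blocked.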
+ public void lockForUpdate() {
+ lock.readLock().lock();
+ }
+
+ public void unlockForUpdate() {
+ lock.readLock().unlock();
+ }
+
+ public void blockUpdates() {
+ lock.writeLock().lock();
+ }
+
+ public void unblockUpdates() {
+ lock.writeLock().unlock();
+ }
+
+ /***
+ // todo: initialize... use current time to start?
+ // a clock that increments by 1 for every operation makes it easier to detect missing
+ // messages, but raises other issues:
+ // - need to initialize to largest thing in index or tlog
+ // - when becoming leader, need to make sure it's greater than the last leader's clock
+ // - using it to detect missing messages means we need to keep track per-leader, or make
+ // sure a new leader starts off with 1 greater than the last leader.
+ private final AtomicLong clock = new AtomicLong();
+
+ public long getNewClock() {
+ return clock.incrementAndGet();
+ }
+
+ // Named *old* to prevent accidental calling getClock and expecting a new updated clock.
+ public long getOldClock() {
+ return clock.get();
+ }
+ ***/
+
+ /** We are currently using this time-based clock to avoid going back in time on a
+ * server restart (i.e. we don't want version numbers to start at 1 again).
+ */
+
+ // Time-based Lamport clock. Good for introducing some reality into clocks (to the degree
+ // that times are somewhat synchronized in the cluster).
+ // Good if we want to relax some constraints to scale down to where only one node may be
+ // up at a time. Possibly harder to detect missing messages (because versions are not contiguous).
+ long vclock;
+ long time;
+ private final Object clockSync = new Object();
+
+
+ public long getNewClock() {
+ synchronized (clockSync) {
+ time = System.currentTimeMillis();
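+ // shift the millisecond timestamp into the high bits; the low 20 bits are
+ // left free so multiple versions can be handed out within the same millisecond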
+ long result = time << 20;
+ if (result <= vclock) {
+ result = vclock + 1;
+ }
+ vclock = result;
+ return vclock;
+ }
+ }
+
+ public long getOldClock() {
+ synchronized (clockSync) {
+ return vclock;
+ }
+ }
+
+ public void updateClock(long clock) {
+ synchronized (clockSync) {
+ vclock = Math.max(vclock, clock);
+ }
+ }
+
+
+ public VersionBucket bucket(int hash) {
+ // If this is a user-provided hash, it may be poor in the low-order bits.
+ // Make sure the high bits are mixed down, since only the low bits will matter.
+ // int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
+ // Assume good hash codes for now.
+
+ int slot = hash & (buckets.length-1);
+ return buckets[slot];
+ }
+
+ public Long lookupVersion(BytesRef idBytes) {
+ return updateHandler.ulog.lookupVersion(idBytes);
+ }
+
+ public Long getVersionFromIndex(BytesRef idBytes) {
+ // TODO: we could cache much of this and invalidate during a commit.
+ // TODO: most DocValues classes are threadsafe - expose which.
+
+ RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
+ try {
+ SolrIndexSearcher searcher = newestSearcher.get();
+ long lookup = searcher.lookupId(idBytes);
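+ // lookupId packs the leaf-reader ordinal into the high 32 bits and the
+ // per-segment docid into the low 32 bits; a negative result means "not found"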
+ if (lookup < 0) return null;
+
+ ValueSource vs = versionField.getType().getValueSource(versionField, null);
+ Map context = ValueSource.newContext(searcher);
+ vs.createWeight(context, searcher);
+ FunctionValues fv = vs.getValues(context, searcher.getTopReaderContext().leaves()[(int)(lookup>>32)]);
+ long ver = fv.longVal((int)lookup);
+ return ver;
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
+ } finally {
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+ }
+
+}
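
A stand-alone sketch of the version clock defined above: the millisecond
timestamp fills the high bits and the low 20 bits absorb updates that land in
the same millisecond, so versions stay strictly monotonic and roughly
wall-clock ordered (hypothetical class, not part of the commit):

    public class VersionClockDemo {
      private long vclock;

      public synchronized long newVersion() {
        // high bits: wall-clock milliseconds; low 20 bits: room for ~1M
        // versions handed out within the same millisecond
        long candidate = System.currentTimeMillis() << 20;
        vclock = (candidate > vclock) ? candidate : vclock + 1;
        return vclock;
      }

      public static void main(String[] args) {
        VersionClockDemo clock = new VersionClockDemo();
        long v1 = clock.newVersion();
        long v2 = clock.newVersion();
        System.out.println(v2 > v1);                        // always monotonic
        System.out.println(new java.util.Date(v1 >>> 20));  // approximate wall time
      }
    }

One consequence, as the comments above note, is that versions are not
contiguous, so gaps alone cannot be used to detect missing messages.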