You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC

svn commit: r1235888 [5/12] - in /lucene/dev/trunk: dev-tools/eclipse/ dev-tools/maven/ solr/ solr/cloud-dev/ solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.rmi.registry.LocateRegistry;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *  Log Format: List{Operation, Version, ...}
+ *  ADD, VERSION, DOC
+ *  DELETE, VERSION, ID_BYTES
+ *  DELETE_BY_QUERY, VERSION, String
+ *
+ *  TODO: keep two files, one for [operation, version, id] and the other for the actual
+ *  document data.  That way we could throw away document log files more readily
+ *  while retaining the smaller operation log files longer (and we can retrieve
+ *  the stored fields from the latest documents from the index).
+ *
+ *  This would require keeping all source fields stored of course.
+ *
+ *  This would also allow to not log document data for requests with commit=true
+ *  in them (since we know that if the request succeeds, all docs will be committed)
+ *
+ */
+public class TransactionLog {
+  public static Logger log = LoggerFactory.getLogger(TransactionLog.class);
+
+  public final static String END_MESSAGE="SOLR_TLOG_END";
+
+  long id;
+  File tlogFile;
+  RandomAccessFile raf;
+  FileChannel channel;
+  OutputStream os;
+  FastOutputStream fos;    // all accesses to this stream should be synchronized on "this" (The TransactionLog)
+  int numRecords;
+  
+  volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
+
+  AtomicInteger refcount = new AtomicInteger(1);
+  Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
+  List<String> globalStringList = new ArrayList<String>();
+  final boolean debug = log.isDebugEnabled();
+
+  long snapshot_size;
+  int snapshot_numRecords;
+  
+  // write a BytesRef as a byte array
+  JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
+    @Override
+    public Object resolve(Object o, JavaBinCodec codec) throws IOException {
+      if (o instanceof BytesRef) {
+        BytesRef br = (BytesRef)o;
+        codec.writeByteArray(br.bytes, br.offset, br.length);
+        return null;
+      }
+      return o;
+    }
+  };
+
+  public class LogCodec extends JavaBinCodec {
+    public LogCodec() {
+      super(resolver);
+    }
+
+    @Override
+    public void writeExternString(String s) throws IOException {
+      if (s == null) {
+        writeTag(NULL);
+        return;
+      }
+
+      // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
+      Integer idx = globalStringMap.get(s);
+      if (idx == null) {
+        // write a normal string
+        writeStr(s);
+      } else {
+        // write the extern string
+        writeTag(EXTERN_STRING, idx);
+      }
+    }
+
+    @Override
+    public String readExternString(FastInputStream fis) throws IOException {
+      int idx = readSize(fis);
+      if (idx != 0) {// idx != 0 is the index of the extern string
+      // no need to synchronize globalStringList - it's only updated before the first record is written to the log
+        return globalStringList.get(idx - 1);
+      } else {// idx == 0 means it has a string value
+        // this shouldn't happen with this codec subclass.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+      }
+    }
+
+
+  }
+
+
+  TransactionLog(File tlogFile, Collection<String> globalStrings) throws IOException {
+    this(tlogFile, globalStrings, false);
+  }
+
+  TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) throws IOException {
+    try {
+      if (debug) {
+        log.debug("New TransactionLog file=" + tlogFile + ", exists=" + tlogFile.exists() + ", size=" + tlogFile.length() + ", openExisting=" + openExisting);
+      }
+
+      this.tlogFile = tlogFile;
+      raf = new RandomAccessFile(this.tlogFile, "rw");
+      long start = raf.length();
+      channel = raf.getChannel();
+      os = Channels.newOutputStream(channel);
+      fos = FastOutputStream.wrap(os);
+
+      if (openExisting) {
+        if (start > 0) {
+          readHeader(null);
+          raf.seek(start);
+          assert channel.position() == start;
+          fos.setWritten(start);    // reflect that we aren't starting at the beginning
+          assert fos.size() == channel.size();
+        } else {
+          addGlobalStrings(globalStrings);
+        }
+      } else {
+        assert start==0;
+        if (start > 0) {
+          raf.setLength(0);
+        }
+        addGlobalStrings(globalStrings);
+      }
+
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  /** Returns the number of records in the log (currently includes the header and an optional commit).
+   * Note: currently returns 0 for reopened existing log files.
+   */
+  public int numRecords() {
+    synchronized (this) {
+      return this.numRecords;
+    }
+  }
+
+  public boolean endsWithCommit() throws IOException {
+    long size;
+    synchronized (this) {
+      fos.flush();
+      size = fos.size();
+    }
+
+    
+    // the end of the file should have the end message (added during a commit) plus a 4 byte size
+    byte[] buf = new byte[ END_MESSAGE.length() ];
+    long pos = size - END_MESSAGE.length() - 4;
+    if (pos < 0) return false;
+    ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
+    is.read(buf);
+    for (int i=0; i<buf.length; i++) {
+      if (buf[i] != END_MESSAGE.charAt(i)) return false;
+    }
+    return true;
+  }
+
+  /** takes a snapshot of the current position and number of records
+   * for later possible rollback, and returns the position */
+  public long snapshot() {
+    synchronized (this) {
+      snapshot_size = fos.size();
+      snapshot_numRecords = numRecords;
+      return snapshot_size;
+    }    
+  }
+  
+  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
+  // This should only be used to roll back buffered updates, not actually applied updates.
+  public void rollback(long pos) throws IOException {
+    synchronized (this) {
+      assert snapshot_size == pos;
+      fos.flush();
+      raf.setLength(pos);
+      fos.setWritten(pos);
+      assert fos.size() == pos;
+      numRecords = snapshot_numRecords;
+    }
+  }
+
+
+  public long writeData(Object o) {
+    LogCodec codec = new LogCodec();
+    try {
+      long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+      codec.init(fos);
+      codec.writeVal(o);
+      return pos;
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+
+  private void readHeader(FastInputStream fis) throws IOException {
+    // read existing header
+    fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
+    LogCodec codec = new LogCodec();
+    Map header = (Map)codec.unmarshal(fis);
+
+    fis.readInt(); // skip size
+
+    // needed to read other records
+
+    synchronized (this) {
+      globalStringList = (List<String>)header.get("strings");
+      globalStringMap = new HashMap<String, Integer>(globalStringList.size());
+      for (int i=0; i<globalStringList.size(); i++) {
+        globalStringMap.put( globalStringList.get(i), i+1);
+      }
+    }
+  }
+
+  private void addGlobalStrings(Collection<String> strings) {
+    if (strings == null) return;
+    int origSize = globalStringMap.size();
+    for (String s : strings) {
+      Integer idx = null;
+      if (origSize > 0) {
+        idx = globalStringMap.get(s);
+      }
+      if (idx != null) continue;  // already in list
+      globalStringList.add(s);
+      globalStringMap.put(s, globalStringList.size());
+    }
+    assert globalStringMap.size() == globalStringList.size();
+  }
+
+  Collection<String> getGlobalStrings() {
+    synchronized (this) {
+      return new ArrayList<String>(globalStringList);
+    }
+  }
+
+  private void writeLogHeader(LogCodec codec) throws IOException {
+    long pos = fos.size();
+    assert pos == 0;
+
+    Map header = new LinkedHashMap<String,Object>();
+    header.put("SOLR_TLOG",1); // a magic string + version number
+    header.put("strings",globalStringList);
+    codec.marshal(header, fos);
+
+    endRecord(pos);
+  }
+
+  private void endRecord(long startRecordPosition) throws IOException {
+    fos.writeInt((int)(fos.size() - startRecordPosition));
+    numRecords++;
+  }
+
+
+  public long write(AddUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    long pos = 0;
+    synchronized (this) {
+      try {
+        pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        SolrInputDocument sdoc = cmd.getSolrInputDocument();
+
+        if (pos == 0) { // TODO: needs to be changed if we start writing a header first
+          addGlobalStrings(sdoc.getFieldNames());
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+
+        /***
+        System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+         if (pos != fos.size()) {
+          throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+        }
+         ***/
+
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.ADD);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+
+        endRecord(pos);
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        // TODO: reset our file pointer back to "pos", the start of this record.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
+      }
+    }
+  }
+
+  public long writeDelete(DeleteUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (this) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.DELETE);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        BytesRef br = cmd.getIndexedId();
+        codec.writeByteArray(br.bytes, br.offset, br.length);
+
+        endRecord(pos);
+        // fos.flushBuffer();  // flush later
+
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (this) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeStr(cmd.query);
+
+        endRecord(pos);
+        // fos.flushBuffer();  // flush later
+
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+
+  public long writeCommit(CommitUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (this) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
+
+        endRecord(pos);
+        
+        fos.flush();  // flush since this will be the last record in a log fill
+        assert fos.size() == channel.size();
+
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+
+  /* This method is thread safe */
+  public Object lookup(long pos) {
+    // A negative position can result from a log replay (which does not re-log, but does
+    // update the version map.  This is OK since the node won't be ACTIVE when this happens.
+    if (pos < 0) return null;
+
+    try {
+      // make sure any unflushed buffer has been flushed
+      synchronized (this) {
+        // TODO: optimize this by keeping track of what we have flushed up to
+        fos.flushBuffer();
+        /***
+         System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+        if (fos.size() != raf.length() || pos >= fos.size() ) {
+          throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+        }
+        ***/
+      }
+
+      ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+      LogCodec codec = new LogCodec();
+      return codec.readVal(fis);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public void incref() {
+    int result = refcount.incrementAndGet();
+    if (result <= 1) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "incref on a closed log: " + this);
+    }
+  }
+
+  public boolean try_incref() {
+    return refcount.incrementAndGet() > 1;
+  }
+
+  public void decref() {
+    if (refcount.decrementAndGet() == 0) {
+      close();
+    }
+  }
+
+  /** returns the current position in the log file */
+  public long position() {
+    synchronized (this) {
+      return fos.size();
+    }
+  }
+
+  public void finish(UpdateLog.SyncLevel syncLevel) {
+    if (syncLevel == UpdateLog.SyncLevel.NONE) return;
+    try {
+      synchronized (this) {
+        fos.flushBuffer();
+      }
+
+      if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
+        // Since fsync is outside of synchronized block, we can end up with a partial
+        // last record on power failure (which is OK, and does not represent an error...
+        // we just need to be aware of it when reading).
+        raf.getFD().sync();
+      }
+
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void close() {
+    try {
+      if (debug) {
+        log.debug("Closing tlog" + this);
+      }
+
+      synchronized (this) {
+        fos.flush();
+        fos.close();
+      }
+
+      if (deleteOnClose) {
+        tlogFile.delete();
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+  
+  public void forceClose() {
+    if (refcount.get() > 0) {
+      log.error("Error: Forcing close of " + this);
+      refcount.set(0);
+      close();
+    }
+  }
+
+  public String toString() {
+    return "tlog{file=" + tlogFile.toString() + " refcount=" + refcount.get() + "}";
+  }
+
+  /** Returns a reader that can be used while a log is still in use.
+   * Currently only *one* LogReader may be outstanding, and that log may only
+   * be used from a single thread. */
+  public LogReader getReader(long startingPos) {
+    return new LogReader(startingPos);
+  }
+
+  /** Returns a single threaded reverse reader */
+  public ReverseReader getReverseReader() throws IOException {
+    return new ReverseReader();
+  }
+
+
+  public class LogReader {
+    ChannelFastInputStream fis;
+    private LogCodec codec = new LogCodec();
+
+    public LogReader(long startingPos) {
+      incref();
+      fis = new ChannelFastInputStream(channel, startingPos);
+    }
+
+    /** Returns the next object from the log, or null if none available.
+     *
+     * @return The log record, or null if EOF
+     * @throws IOException
+     */
+    public Object next() throws IOException, InterruptedException {
+      long pos = fis.position();
+
+
+      synchronized (TransactionLog.this) {
+        if (debug) {
+          log.debug("Reading log record.  pos="+pos+" currentSize="+fos.size());
+        }
+
+        if (pos >= fos.size()) {
+          return null;
+        }
+
+        fos.flushBuffer();
+      }
+
+      if (pos == 0) {
+        readHeader(fis);
+
+        // shouldn't currently happen - header and first record are currently written at the same time
+        synchronized (TransactionLog.this) {
+          if (fis.position() >= fos.size()) {
+            return null;
+          }
+          pos = fis.position();
+        }
+      }
+
+      Object o = codec.readVal(fis);
+
+      // skip over record size
+      int size = fis.readInt();
+      assert size == fis.position() - pos - 4;
+
+      return o;
+    }
+
+    public void close() {
+      decref();
+    }
+
+    @Override
+    public String toString() {
+      synchronized (TransactionLog.this) {
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+      }
+    }
+
+  }
+
+  public class ReverseReader {
+    ChannelFastInputStream fis;
+    private LogCodec codec = new LogCodec() {
+      @Override
+      public SolrInputDocument readSolrInputDocument(FastInputStream dis) throws IOException {
+        // Given that the SolrInputDocument is last in an add record, it's OK to just skip
+        // reading it completely.
+        return null;
+      }
+    };
+
+    int nextLength;  // length of the next record (the next one closer to the start of the log file)
+    long prevPos;    // where we started reading from last time (so prevPos - nextLength == start of next record)
+
+    public ReverseReader() throws IOException {
+      incref();
+
+      long sz;
+      synchronized (TransactionLog.this) {
+        fos.flushBuffer();
+        sz = fos.size();
+        assert sz == channel.size();
+      }
+
+      fis = new ChannelFastInputStream(channel, 0);
+      if (sz >=4) {
+        // readHeader(fis);  // should not be needed
+        prevPos = sz - 4;
+        fis.seek(prevPos);
+        nextLength = fis.readInt();
+      }
+    }
+
+
+    /** Returns the next object from the log, or null if none available.
+     *
+     * @return The log record, or null if EOF
+     * @throws IOException
+     */
+    public Object next() throws IOException {
+      if (prevPos <= 0) return null;
+
+      long endOfThisRecord = prevPos;
+
+      int thisLength = nextLength;
+
+      long recordStart = prevPos - thisLength;  // back up to the beginning of the next record
+      prevPos = recordStart - 4;  // back up 4 more to read the length of the next record
+
+      if (prevPos <= 0) return null;  // this record is the header
+
+      long bufferPos = fis.getBufferPos();
+      if (prevPos >= bufferPos) {
+        // nothing to do... we're within the current buffer
+      } else {
+        // Position buffer so that this record is at the end.
+        // For small records, this will cause subsequent calls to next() to be within the buffer.
+        long seekPos =  endOfThisRecord - fis.getBufferSize();
+        seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger then the block size.
+        seekPos = Math.max(seekPos, 0);
+        fis.seek(seekPos);
+        fis.peek();  // cause buffer to be filled
+      }
+
+      fis.seek(prevPos);
+      nextLength = fis.readInt();     // this is the length of the *next* record (i.e. closer to the beginning)
+
+      // TODO: optionally skip document data
+      Object o = codec.readVal(fis);
+
+      // assert fis.position() == prevPos + 4 + thisLength;  // this is only true if we read all the data (and we currently skip reading SolrInputDocument
+
+      return o;
+    }
+
+    /* returns the position in the log file of the last record returned by next() */
+    public long position() {
+      return prevPos + 4;  // skip the length
+    }
+
+    public void close() {
+      decref();
+    }
+
+    @Override
+    public String toString() {
+      synchronized (TransactionLog.this) {
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+      }
+    }
+
+
+  }
+
+}
+
+
+
+class ChannelFastInputStream extends FastInputStream {
+  private FileChannel ch;
+
+  public ChannelFastInputStream(FileChannel ch, long chPosition) {
+    // super(null, new byte[10],0,0);    // a small buffer size for testing purposes
+    super(null);
+    this.ch = ch;
+    super.readFromStream = chPosition;
+  }
+
+  @Override
+  public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
+    ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
+    int ret = ch.read(bb, readFromStream);
+    return ret;
+  }
+
+  public void seek(long position) throws IOException {
+    if (position <= readFromStream && position >= getBufferPos()) {
+      // seek within buffer
+      pos = (int)(position - getBufferPos());
+    } else {
+      // long currSize = ch.size();   // not needed - underlying read should handle (unless read never done)
+      // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
+      readFromStream = position;
+      end = pos = 0;
+    }
+    assert position() == position;
+  }
+
+  /** where is the start of the buffer relative to the whole file */
+  public long getBufferPos() {
+    return readFromStream - end;
+  }
+
+  public int getBufferSize() {
+    return buf.length;
+  }
+
+  @Override
+  public void close() throws IOException {
+    ch.close();
+  }
+  
+  @Override
+  public String toString() {
+    return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
+  }
+}
+

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Wed Jan 25 19:49:26 2012
@@ -17,6 +17,7 @@
 
 package org.apache.solr.update;
 
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.request.SolrQueryRequest;
 
 
@@ -24,17 +25,56 @@ import org.apache.solr.request.SolrQuery
  *
  *
  */
-  public class UpdateCommand {
-    protected final SolrQueryRequest req;
-    protected final String commandName;
-
-    public UpdateCommand(String commandName, SolrQueryRequest req) {
-      this.req = req;
-      this.commandName = commandName;
-    }
+public abstract class UpdateCommand implements Cloneable {
+  protected SolrQueryRequest req;
+  protected long version;
+  protected int flags;
+
+  public static int BUFFERING = 0x00000001;    // update command is being buffered.
+  public static int REPLAY    = 0x00000002;    // update command is from replaying a log.
+  public static int PEER_SYNC    = 0x00000004; // update command is a missing update being provided by a peer.
+  public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
+
+  public UpdateCommand(SolrQueryRequest req) {
+    this.req = req;
+  }
+
+  public abstract String name();
+
+  @Override
+  public String toString() {
+    return name() + "{flags="+flags+",version="+version;
+  }
+
+  public long getVersion() {
+    return version;
+  }
+  public void setVersion(long version) {
+    this.version = version;
+  }
+
+  public void setFlags(int flags) {
+    this.flags = flags;
+  }
+
+  public int getFlags() {
+    return flags;
+  }
+
+  public SolrQueryRequest getReq() {
+    return req;
+  }
+
+  public void setReq(SolrQueryRequest req) {
+    this.req = req;
+  }
 
-    @Override
-    public String toString() {
-      return commandName;
+  @Override
+  public UpdateCommand clone() {
+    try {
+      return (UpdateCommand) super.clone();
+    } catch (CloneNotSupportedException e) {
+      return null;
     }
   }
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Wed Jan 25 19:49:26 2012
@@ -88,12 +88,11 @@ public abstract class UpdateHandler impl
   private void initLog() {
     PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
     if (ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
-      ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
-    } else {
-      ulog = new NullUpdateLog();
-      ulog.init(null);
+      ulog = new UpdateLog();
+      ulog.init(ulogPluginInfo);
+      // ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
+      ulog.init(this, core);
     }
-    ulog.init(this, core);
   }
 
 
@@ -123,16 +122,7 @@ public abstract class UpdateHandler impl
     parseEventListeners();
     initLog();
   }
-  
-  /**
-   * Allows the UpdateHandler to create the SolrIndexSearcher after it
-   * has issued a 'softCommit'. 
-   * 
-   * @param previousSearcher
-   * @throws IOException
-   */
-  public abstract SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException;
-  
+
   /**
    * Called when the Writer should be opened again - eg when replication replaces
    * all of the index files.
@@ -141,7 +131,7 @@ public abstract class UpdateHandler impl
    */
   public abstract void newIndexWriter() throws IOException;
 
-  public abstract SolrCoreState getIndexWriterProvider();
+  public abstract SolrCoreState getSolrCoreState();
 
   public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
   public abstract void delete(DeleteUpdateCommand cmd) throws IOException;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Wed Jan 25 19:49:26 2012
@@ -18,23 +18,1051 @@
 package org.apache.solr.update;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
+import org.apache.solr.update.processor.RunUpdateProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
 
 /** @lucene.experimental */
-public abstract class UpdateLog implements PluginInfoInitialized {
-  public static final int ADD = 0x00;
-  public static final int DELETE = 0x01;
-  public static final int DELETE_BY_QUERY = 0x02;
-
-  public abstract void init(UpdateHandler uhandler, SolrCore core);
-  public abstract void add(AddUpdateCommand cmd);
-  public abstract void delete(DeleteUpdateCommand cmd);
-  public abstract void deleteByQuery(DeleteUpdateCommand cmd);
-  public abstract void preCommit(CommitUpdateCommand cmd);
-  public abstract void postCommit(CommitUpdateCommand cmd);
-  public abstract void preSoftCommit(CommitUpdateCommand cmd);
-  public abstract void postSoftCommit(CommitUpdateCommand cmd);
-  public abstract Object lookup(BytesRef indexedId);
-  public abstract void close();
+public class UpdateLog implements PluginInfoInitialized {
+  public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
+  public boolean debug = log.isDebugEnabled();
+
+
+  public enum SyncLevel { NONE, FLUSH, FSYNC }
+  public enum State { REPLAYING, BUFFERING, APPLYING_BUFFERED, ACTIVE }
+
+  public static final int ADD = 0x01;
+  public static final int DELETE = 0x02;
+  public static final int DELETE_BY_QUERY = 0x03;
+  public static final int COMMIT = 0x04;
+
+  public static class RecoveryInfo {
+    public long positionOfStart;
+
+    public int adds;
+    public int deletes;
+    public int deleteByQuery;
+    public int errors;
+  }
+
+
+
+  public static String TLOG_NAME="tlog";
+
+  long id = -1;
+  private State state = State.ACTIVE;
+
+  private TransactionLog tlog;
+  private TransactionLog prevTlog;
+  private Deque<TransactionLog> logs = new LinkedList<TransactionLog>();  // list of recent logs, newest first
+  private TransactionLog newestLogOnStartup;
+  private int numOldRecords;  // number of records in the recent logs
+
+  private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>();
+  private Map<BytesRef,LogPtr> prevMap;  // used while committing/reopening is happening
+  private Map<BytesRef,LogPtr> prevMap2;  // used while committing/reopening is happening
+  private TransactionLog prevMapLog;  // the transaction log used to look up entries found in prevMap
+  private TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap
+
+  private final int numDeletesToKeep = 1000;
+  private final int numRecordsToKeep = 100;
+  // keep track of deletes only... this is not updated on an add
+  private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
+    protected boolean removeEldestEntry(Map.Entry eldest) {
+      return size() > numDeletesToKeep;
+    }
+  };
+
+  private String[] tlogFiles;
+  private File tlogDir;
+  private Collection<String> globalStrings;
+
+  private String dataDir;
+  private String lastDataDir;
+
+  private VersionInfo versionInfo;
+
+  private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
+
+  private volatile UpdateHandler uhandler;    // a core reload can change this reference!
+  private volatile boolean cancelApplyBufferUpdate;
+
+
+  public static class LogPtr {
+    final long pointer;
+    final long version;
+
+    public LogPtr(long pointer, long version) {
+      this.pointer = pointer;
+      this.version = version;
+    }
+
+    public String toString() {
+      return "LogPtr(" + pointer + ")";
+    }
+  }
+
+
+  public VersionInfo getVersionInfo() {
+    return versionInfo;
+  }
+
+  public void init(PluginInfo info) {
+    dataDir = (String)info.initArgs.get("dir");
+  }
+
+  public void init(UpdateHandler uhandler, SolrCore core) {
+    if (dataDir == null || dataDir.length()==0) {
+      dataDir = core.getDataDir();
+    }
+
+    this.uhandler = uhandler;
+
+    if (dataDir.equals(lastDataDir)) {
+      // on a normal reopen, we currently shouldn't have to do anything
+      return;
+    }
+    lastDataDir = dataDir;
+    tlogDir = new File(dataDir, TLOG_NAME);
+    tlogDir.mkdirs();
+    tlogFiles = getLogList(tlogDir);
+    id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
+
+    TransactionLog oldLog = null;
+    for (String oldLogName : tlogFiles) {
+      File f = new File(tlogDir, oldLogName);
+      try {
+        oldLog = new TransactionLog( f, null, true );
+        addOldLog(oldLog);
+      } catch (Exception e) {
+        SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
+        f.delete();
+      }
+    }
+    newestLogOnStartup = oldLog;
+
+    versionInfo = new VersionInfo(uhandler, 256);
+  }
+  
+  public File getLogDir() {
+    return tlogDir;
+  }
+
+  /* Takes over ownership of the log, keeping it until no longer needed
+     and then decrementing it's reference and dropping it.
+   */
+  private void addOldLog(TransactionLog oldLog) {
+    if (oldLog == null) return;
+
+    numOldRecords += oldLog.numRecords();
+
+    int currRecords = numOldRecords;
+
+    if (oldLog != tlog &&  tlog != null) {
+      currRecords += tlog.numRecords();
+    }
+
+    while (logs.size() > 0) {
+      TransactionLog log = logs.peekLast();
+      int nrec = log.numRecords();
+      // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
+      // we already have the limit of 10 log files.
+      if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
+        currRecords -= nrec;
+        numOldRecords -= nrec;
+        logs.removeLast().decref();  // dereference so it will be deleted when no longer in use
+        continue;
+      }
+
+      break;
+    }
+
+    // don't incref... we are taking ownership from the caller.
+    logs.addFirst(oldLog);
+  }
+
+
+  public static String[] getLogList(File directory) {
+    final String prefix = TLOG_NAME+'.';
+    String[] names = directory.list(new FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        return name.startsWith(prefix);
+      }
+    });
+    Arrays.sort(names);
+    return names;
+  }
+
+
+  public long getLastLogId() {
+    if (id != -1) return id;
+    if (tlogFiles.length == 0) return -1;
+    String last = tlogFiles[tlogFiles.length-1];
+    return Long.parseLong(last.substring(TLOG_NAME.length()+1));
+  }
+
+
+  public void add(AddUpdateCommand cmd) {
+    // don't log if we are replaying from another log
+    // TODO: we currently need to log to maintain correct versioning, rtg, etc
+    // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
+
+    synchronized (this) {
+      long pos = -1;
+
+      // don't log if we are replaying from another log
+      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        ensureLog();
+        pos = tlog.write(cmd);
+      }
+
+      // TODO: in the future we could support a real position for a REPLAY update.
+      // Only currently would be useful for RTG while in recovery mode though.
+      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+      // only update our map if we're not buffering
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+        map.put(cmd.getIndexedId(), ptr);
+      }
+
+      if (debug) {
+        log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      }
+    }
+  }
+
+  public void delete(DeleteUpdateCommand cmd) {
+    BytesRef br = cmd.getIndexedId();
+
+    synchronized (this) {
+      long pos = -1;
+
+      // don't log if we are replaying from another log
+      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        ensureLog();
+        pos = tlog.writeDelete(cmd);
+      }
+
+      LogPtr ptr = new LogPtr(pos, cmd.version);
+
+      // only update our map if we're not buffering
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+        map.put(br, ptr);
+
+        oldDeletes.put(br, ptr);
+      }
+
+      if (debug) {
+        log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      }
+    }
+  }
+
+  public void deleteByQuery(DeleteUpdateCommand cmd) {
+    synchronized (this) {
+      long pos = -1;
+      // don't log if we are replaying from another log
+      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        ensureLog();
+        pos = tlog.writeDeleteByQuery(cmd);
+      }
+
+      // only change our caches if we are not buffering
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+        // given that we just did a delete-by-query, we don't know what documents were
+        // affected and hence we must purge our caches.
+        map.clear();
+
+        // oldDeletes.clear();
+
+        // We must cause a new IndexReader to be opened before anything looks at these caches again
+        // so that a cache miss will read fresh data.
+        //
+        // TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands
+        try {
+          RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+          holder.decref();
+        } catch (Throwable e) {
+          SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+        }
+
+      }
+
+      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+      if (debug) {
+        log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      }
+    }
+  }
+
+
+  private void newMap() {
+    prevMap2 = prevMap;
+    prevMapLog2 = prevMapLog;
+
+    prevMap = map;
+    prevMapLog = tlog;
+
+    map = new HashMap<BytesRef, LogPtr>();
+  }
+
+  private void clearOldMaps() {
+    prevMap = null;
+    prevMap2 = null;
+  }
+
+  public void preCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      if (debug) {
+        log.debug("TLOG: preCommit");
+      }
+
+      if (getState() != State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        // if we aren't in the active state, and this isn't a replay
+        // from the recovery process, then we shouldn't mess with
+        // the current transaction log.  This normally shouldn't happen
+        // as DistributedUpdateProcessor will prevent this.  Commits
+        // that don't use the processor are possible though.
+        return;
+      }
+
+      // since we're changing the log, we must change the map.
+      newMap();
+
+      // since document additions can happen concurrently with commit, create
+      // a new transaction log first so that we know the old one is definitely
+      // in the index.
+      prevTlog = tlog;
+      tlog = null;
+      id++;
+
+      if (prevTlog != null) {
+        globalStrings = prevTlog.getGlobalStrings();
+      }
+
+      addOldLog(prevTlog);
+    }
+  }
+
+  public void postCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      if (debug) {
+        log.debug("TLOG: postCommit");
+      }
+      if (prevTlog != null) {
+        // if we made it through the commit, write a commit command to the log
+        // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
+        prevTlog.writeCommit(cmd);
+        // the old log list will decref when no longer needed
+        // prevTlog.decref();
+        prevTlog = null;
+      }
+    }
+  }
+
+  public void preSoftCommit(CommitUpdateCommand cmd) {
+    debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+
+    synchronized (this) {
+
+      if (!cmd.softCommit) return;  // already handled this at the start of the hard commit
+      newMap();
+
+      // start adding documents to a new map since we won't know if
+      // any added documents will make it into this commit or not.
+      // But we do know that any updates already added will definitely
+      // show up in the latest reader after the commit succeeds.
+      map = new HashMap<BytesRef, LogPtr>();
+
+      if (debug) {
+        log.debug("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+      }
+    }
+  }
+
+  public void postSoftCommit(CommitUpdateCommand cmd) {
+    synchronized (this) {
+      // We can clear out all old maps now that a new searcher has been opened.
+      // This currently only works since DUH2 synchronizes around preCommit to avoid
+      // it being called in the middle of a preSoftCommit, postSoftCommit sequence.
+      // If this DUH2 synchronization were to be removed, preSoftCommit should
+      // record what old maps were created and only remove those.
+
+      if (debug) {
+        SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2));
+      }
+      clearOldMaps();
+    }
+  }
+
+  public Object lookup(BytesRef indexedId) {
+    LogPtr entry;
+    TransactionLog lookupLog;
+
+    synchronized (this) {
+      entry = map.get(indexedId);
+      lookupLog = tlog;  // something found in "map" will always be in "tlog"
+      // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      if (entry == null && prevMap != null) {
+        entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
+        lookupLog = prevMapLog;
+        // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+      if (entry == null && prevMap2 != null) {
+        entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
+        lookupLog = prevMapLog2;
+        // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+
+      if (entry == null) {
+        return null;
+      }
+      lookupLog.incref();
+    }
+
+    try {
+      // now do the lookup outside of the sync block for concurrency
+      return lookupLog.lookup(entry.pointer);
+    } finally {
+      lookupLog.decref();
+    }
+
+  }
+
+  // This method works like realtime-get... it only guarantees to return the latest
+  // version of the *completed* update.  There can be updates in progress concurrently
+  // that have already grabbed higher version numbers.  Higher level coordination or
+  // synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
+  public Long lookupVersion(BytesRef indexedId) {
+    LogPtr entry;
+    TransactionLog lookupLog;
+
+    synchronized (this) {
+      entry = map.get(indexedId);
+      lookupLog = tlog;  // something found in "map" will always be in "tlog"
+      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      if (entry == null && prevMap != null) {
+        entry = prevMap.get(indexedId);
+        // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
+        lookupLog = prevMapLog;
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+      if (entry == null && prevMap2 != null) {
+        entry = prevMap2.get(indexedId);
+        // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
+        lookupLog = prevMapLog2;
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+      }
+    }
+
+    if (entry != null) {
+      return entry.version;
+    }
+
+    // Now check real index
+    Long version = versionInfo.getVersionFromIndex(indexedId);
+
+    if (version != null) {
+      return version;
+    }
+
+    // We can't get any version info for deletes from the index, so if the doc
+    // wasn't found, check a cache of recent deletes.
+
+    synchronized (this) {
+      entry = oldDeletes.get(indexedId);
+    }
+
+    if (entry != null) {
+      return entry.version;
+    }
+
+    return null;
+  }
+
+  public void finish(SyncLevel syncLevel) {
+    if (syncLevel == null) {
+      syncLevel = defaultSyncLevel;
+    }
+    if (syncLevel == SyncLevel.NONE) {
+      return;
+    }
+
+    TransactionLog currLog;
+    synchronized (this) {
+      currLog = tlog;
+      if (currLog == null) return;
+      currLog.incref();
+    }
+
+    try {
+      currLog.finish(syncLevel);
+    } finally {
+      currLog.decref();
+    }
+  }
+
+  public Future<RecoveryInfo> recoverFromLog() {
+    recoveryInfo = new RecoveryInfo();
+    if (newestLogOnStartup == null) return null;
+
+    if (!newestLogOnStartup.try_incref()) return null;   // log file was already closed
+
+    // now that we've incremented the reference, the log shouldn't go away.
+    try {
+      if (newestLogOnStartup.endsWithCommit()) {
+        newestLogOnStartup.decref();
+        return null;
+      }
+    } catch (IOException e) {
+      log.error("Error inspecting tlog " + newestLogOnStartup);
+      newestLogOnStartup.decref();
+      return null;
+    }
+
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+    LogReplayer replayer = new LogReplayer(newestLogOnStartup, false);
+
+    versionInfo.blockUpdates();
+    try {
+      state = State.REPLAYING;
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+
+    return cs.submit(replayer, recoveryInfo);
+
+  }
+
+
+  private void ensureLog() {
+    if (tlog == null) {
+      String newLogName = String.format("%s.%019d", TLOG_NAME, id);
+      try {
+        tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't open new tlog!", e);
+      }
+    }
+  }
+
+  public void close() {
+    synchronized (this) {
+      try {
+        recoveryExecutor.shutdownNow();
+      } catch (Exception e) {
+        SolrException.log(log, e);
+      }
+
+      // Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions
+
+      if (prevTlog != null) {
+        prevTlog.deleteOnClose = false;
+        prevTlog.decref();
+        prevTlog.forceClose();
+      }
+      if (tlog != null) {
+        tlog.deleteOnClose = false;
+        tlog.decref();
+        tlog.forceClose();
+      }
+
+      for (TransactionLog log : logs) {
+        log.deleteOnClose = false;
+        log.decref();
+        log.forceClose();
+      }
+
+    }
+  }
+
+
+  static class Update {
+    TransactionLog log;
+    long version;
+    long pointer;
+  } 
+  
+  public class RecentUpdates {
+    Deque<TransactionLog> logList;    // newest first
+    List<List<Update>> updateList;
+    HashMap<Long, Update> updates;
+    List<Update> deleteByQueryList;
+
+
+    public List<Long> getVersions(int n) {
+      List<Long> ret = new ArrayList(n);
+      
+      for (List<Update> singleList : updateList) {
+        for (Update ptr : singleList) {
+          ret.add(ptr.version);
+          if (--n <= 0) return ret;
+        }
+      }
+      
+      return ret;
+    }
+    
+    public Object lookup(long version) {
+      Update update = updates.get(version);
+      if (update == null) return null;
+
+      return update.log.lookup(update.pointer);
+    }
+
+    /** Returns the list of deleteByQueries that happened after the given version */
+    public List<Object> getDeleteByQuery(long afterVersion) {
+      List<Object> result = new ArrayList<Object>(deleteByQueryList.size());
+      for (Update update : deleteByQueryList) {
+        if (Math.abs(update.version) > afterVersion) {
+          Object dbq = update.log.lookup(update.pointer);
+          result.add(dbq);
+        }
+      }
+      return result;
+    }
+
+    private void update() {
+      int numUpdates = 0;
+      updateList = new ArrayList<List<Update>>(logList.size());
+      deleteByQueryList = new ArrayList<Update>();
+      updates = new HashMap<Long,Update>(numRecordsToKeep);
+
+      for (TransactionLog oldLog : logList) {
+        List<Update> updatesForLog = new ArrayList<Update>();
+
+        TransactionLog.ReverseReader reader = null;
+        try {
+          reader = oldLog.getReverseReader();
+
+          while (numUpdates < numRecordsToKeep) {
+            Object o = reader.next();
+            if (o==null) break;
+            try {
+
+              // should currently be a List<Oper,Ver,Doc/Id>
+              List entry = (List)o;
+
+              // TODO: refactor this out so we get common error handling
+              int oper = (Integer)entry.get(0);
+              long version = (Long) entry.get(1);
+
+              switch (oper) {
+                case UpdateLog.ADD:
+                case UpdateLog.DELETE:
+                case UpdateLog.DELETE_BY_QUERY:
+                  Update update = new Update();
+                  update.log = oldLog;
+                  update.pointer = reader.position();
+                  update.version = version;
+
+                  updatesForLog.add(update);
+                  updates.put(version, update);
+                  
+                  if (oper == UpdateLog.DELETE_BY_QUERY) {
+                    deleteByQueryList.add(update);
+                  }
+                  
+                  break;
+
+                case UpdateLog.COMMIT:
+                  break;
+                default:
+                  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
+              }
+            } catch (ClassCastException cl) {
+              log.warn("Unexpected log entry or corrupt log.  Entry=" + o, cl);
+              // would be caused by a corrupt transaction log
+            } catch (Exception ex) {
+              log.warn("Exception reverse reading log", ex);
+              break;
+            }
+          }
+
+        } catch (IOException e) {
+          // failure to read a log record isn't fatal
+          log.error("Exception reading versions from log",e);
+        } finally {
+          if (reader != null) reader.close();
+        }
+
+        updateList.add(updatesForLog);
+      }
+
+    }
+    
+    public void close() {
+      for (TransactionLog log : logList) {
+        log.decref();
+      }
+    }
+  }
+
+
+  public RecentUpdates getRecentUpdates() {
+    Deque<TransactionLog> logList;
+    synchronized (this) {
+      logList = new LinkedList<TransactionLog>(logs);
+      for (TransactionLog log : logList) {
+        log.incref();
+      }
+      if (prevTlog != null) {
+        prevTlog.incref();
+        logList.addFirst(prevTlog);
+      }
+      if (tlog != null) {
+        tlog.incref();
+        logList.addFirst(tlog);
+      }
+    }
+
+    // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
+    // one of the updates I originally handed out fell off the list).  Over-request?
+    RecentUpdates recentUpdates = new RecentUpdates();
+    recentUpdates.logList = logList;
+    recentUpdates.update();
+
+    return recentUpdates;
+  }
+
+  public void bufferUpdates() {
+    // recovery trips this assert under some race - even when
+    // it checks the state first
+    // assert state == State.ACTIVE;
+
+    recoveryInfo = new RecoveryInfo();
+
+    // block all updates to eliminate race conditions
+    // reading state and acting on it in the update processor
+    versionInfo.blockUpdates();
+    try {
+      if (state != State.ACTIVE) return;
+
+      if (log.isInfoEnabled()) {
+        log.info("Starting to buffer updates. " + this);
+      }
+
+      // since we blocked updates, this synchronization shouldn't strictly be necessary.
+      synchronized (this) {
+        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+      }
+
+      state = State.BUFFERING;
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+  }
+
+  /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */
+  public boolean dropBufferedUpdates() {
+    versionInfo.blockUpdates();
+    try {
+      if (state != State.BUFFERING) return false;
+
+      if (log.isInfoEnabled()) {
+        log.info("Dropping buffered updates " + this);
+      }
+
+      // since we blocked updates, this synchronization shouldn't strictly be necessary.
+      synchronized (this) {
+        if (tlog != null) {
+          tlog.rollback(recoveryInfo.positionOfStart);
+        }
+      }
+
+      state = State.ACTIVE;
+    } catch (IOException e) {
+      SolrException.log(log,"Error attempting to roll back log", e);
+      return false;
+    }
+    finally {
+      versionInfo.unblockUpdates();
+    }
+    return true;
+  }
+
+
+  /** Returns the Future to wait on, or null if no replay was needed */
+  public Future<RecoveryInfo> applyBufferedUpdates() {
+    // recovery trips this assert under some race - even when
+    // it checks the state first
+    // assert state == State.BUFFERING;
+
+    // block all updates to eliminate race conditions
+    // reading state and acting on it in the update processor
+    versionInfo.blockUpdates();
+    try {
+      cancelApplyBufferUpdate = false;
+      if (state != State.BUFFERING) return null;
+
+      // handle case when no log was even created because no updates
+      // were received.
+      if (tlog == null) {
+        state = State.ACTIVE;
+        return null;
+      }
+      tlog.incref();
+      state = State.APPLYING_BUFFERED;
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+
+    if (recoveryExecutor.isShutdown()) {
+      tlog.decref();
+      throw new RuntimeException("executor is not running...");
+    }
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+    LogReplayer replayer = new LogReplayer(tlog, true);
+    return cs.submit(replayer, recoveryInfo);
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public String toString() {
+    return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}";
+  }
+
+
+  public static Runnable testing_logReplayHook;  // called before each log read
+  public static Runnable testing_logReplayFinishHook;  // called when log replay has finished
+
+
+
+  private RecoveryInfo recoveryInfo;
+
+  // TODO: do we let the log replayer run across core reloads?
+  class LogReplayer implements Runnable {
+    TransactionLog translog;
+    TransactionLog.LogReader tlogReader;
+    boolean activeLog;
+    boolean finishing = false;  // state where we lock out other updates and finish those updates that snuck in before we locked
+
+
+    public LogReplayer(TransactionLog translog, boolean activeLog) {
+      this.translog = translog;
+      this.activeLog = activeLog;
+    }
+
+    @Override
+    public void run() {
+      try {
+
+        uhandler.core.log.warn("Starting log replay " + translog + " active="+activeLog + "starting pos=" + recoveryInfo.positionOfStart);
+
+        tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
+        SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+        SolrQueryResponse rsp = new SolrQueryResponse();
+
+        // NOTE: we don't currently handle a core reload during recovery.  This would cause the core
+        // to change underneath us.
+
+        // TODO: use the standard request factory?  We won't get any custom configuration instantiating this way.
+        RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
+        DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
+        runFac.init(new NamedList());
+        magicFac.init(new NamedList());
+
+        UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
+
+        long commitVersion = 0;
+
+        for(;;) {
+          Object o = null;
+          if (cancelApplyBufferUpdate) break;
+          try {
+            if (testing_logReplayHook != null) testing_logReplayHook.run();
+            o = null;
+            o = tlogReader.next();
+            if (o == null && activeLog) {
+              if (!finishing) {
+                // block to prevent new adds, but don't immediately unlock since
+                // we could be starved from ever completing recovery.  Only unlock
+                // after we've finished this recovery.
+                // NOTE: our own updates won't be blocked since the thread holding a write lock can
+                // lock a read lock.
+                versionInfo.blockUpdates();
+                finishing = true;
+                o = tlogReader.next();
+              } else {
+                // we had previously blocked updates, so this "null" from the log is final.
+
+                // Wait until our final commit to change the state and unlock.
+                // This is only so no new updates are written to the current log file, and is
+                // only an issue if we crash before the commit (and we are paying attention
+                // to incomplete log files).
+                //
+                // versionInfo.unblockUpdates();
+              }
+            }
+          } catch (InterruptedException e) {
+            SolrException.log(log,e);
+          } catch (IOException e) {
+            SolrException.log(log,e);
+          } catch (Throwable e) {
+            SolrException.log(log,e);
+          }
+
+          if (o == null) break;
+
+          try {
+
+            // should currently be a List<Oper,Ver,Doc/Id>
+            List entry = (List)o;
+
+            int oper = (Integer)entry.get(0);
+            long version = (Long) entry.get(1);
+
+            switch (oper) {
+              case UpdateLog.ADD:
+              {
+                recoveryInfo.adds++;
+                // byte[] idBytes = (byte[]) entry.get(2);
+                SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+                AddUpdateCommand cmd = new AddUpdateCommand(req);
+                // cmd.setIndexedId(new BytesRef(idBytes));
+                cmd.solrDoc = sdoc;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+                proc.processAdd(cmd);
+                break;
+              }
+              case UpdateLog.DELETE:
+              {
+                recoveryInfo.deletes++;
+                byte[] idBytes = (byte[]) entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.setIndexedId(new BytesRef(idBytes));
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+                proc.processDelete(cmd);
+                break;
+              }
+
+              case UpdateLog.DELETE_BY_QUERY:
+              {
+                recoveryInfo.deleteByQuery++;
+                String query = (String)entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.query = query;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+                proc.processDelete(cmd);
+                break;
+              }
+
+              case UpdateLog.COMMIT:
+              {
+                commitVersion = version;
+                break;
+              }
+
+              default:
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
+            }
+
+            if (rsp.getException() != null) {
+              log.error("Exception replaying log", rsp.getException());
+              throw rsp.getException();
+            }
+          } catch (IOException ex) {
+            recoveryInfo.errors++;
+            log.warn("IOException reading log", ex);
+            // could be caused by an incomplete flush if recovering from log
+          } catch (ClassCastException cl) {
+            recoveryInfo.errors++;
+            log.warn("Unexpected log entry or corrupt log.  Entry=" + o, cl);
+            // would be caused by a corrupt transaction log
+          } catch (Throwable ex) {
+            recoveryInfo.errors++;
+            log.warn("Exception replaying log", ex);
+            // something wrong with the request?
+          }
+        }
+
+        CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+        cmd.setVersion(commitVersion);
+        cmd.softCommit = false;
+        cmd.waitSearcher = true;
+        cmd.setFlags(UpdateCommand.REPLAY);
+        try {
+          uhandler.commit(cmd);          // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart.
+        } catch (IOException ex) {
+          recoveryInfo.errors++;
+          log.error("Replay exception: final commit.", ex);
+        }
+        
+        if (!activeLog) {
+          // if we are replaying an old tlog file, we need to add a commit to the end
+          // so we don't replay it again if we restart right after.
+          translog.writeCommit(cmd);
+        }
+
+        try {
+          proc.finish();
+        } catch (IOException ex) {
+          recoveryInfo.errors++;
+          log.error("Replay exception: finish()", ex);
+        }
+
+        tlogReader.close();
+        translog.decref();
+
+      } catch (Throwable e) {
+        recoveryInfo.errors++;
+        SolrException.log(log,e);
+      } finally {
+        // change the state while updates are still blocked to prevent races
+        state = State.ACTIVE;
+        if (finishing) {
+          versionInfo.unblockUpdates();
+        }
+      }
+
+      log.warn("Ending log replay " + tlogReader);
+
+      if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
+    }
+  }
+  
+  public void cancelApplyBufferedUpdates() {
+    this.cancelApplyBufferUpdate = true;
+  }
+
+  ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0,
+      Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new DefaultSolrThreadFactory("recoveryExecutor"));
+
 }
+
+
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionBucket.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+// TODO: make inner?
+// TODO: store the highest possible in the index on a commit (but how to not block adds?)
+// TODO: could also store highest possible in the transaction log after a commit.
+// Or on a new index, just scan "version" for the max?
+/** @lucene.internal */
+public class VersionBucket {
+  public long highest;
+
+  public void updateHighest(long val) {
+    if (highest != 0) {
+      highest = Math.max(highest, Math.abs(val));
+    }
+  }
+}

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/VersionInfo.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
+
+public class VersionInfo {
+  public static final String VERSION_FIELD="_version_";
+
+  private SolrCore core;
+  private UpdateHandler updateHandler;
+  private final VersionBucket[] buckets;
+  private SchemaField versionField;
+  private SchemaField idField;
+  final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
+    this.updateHandler = updateHandler;
+    this.core = updateHandler.core;
+    versionField = core.getSchema().getFieldOrNull(VERSION_FIELD);
+    idField = core.getSchema().getUniqueKeyField();
+    buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
+    for (int i=0; i<buckets.length; i++) {
+      buckets[i] = new VersionBucket();
+    }
+  }
+
+  public SchemaField getVersionField() {
+    return versionField;
+  }
+
+  public void lockForUpdate() {
+    lock.readLock().lock();
+  }
+
+  public void unlockForUpdate() {
+    lock.readLock().unlock();
+  }
+
+  public void blockUpdates() {
+    lock.writeLock().lock();
+  }
+
+  public void unblockUpdates() {
+    lock.writeLock().unlock();
+  }
+
+  /***
+  // todo: initialize... use current time to start?
+  // a clock that increments by 1 for every operation makes it easier to detect missing
+  // messages, but raises other issues:
+  // - need to initialize to largest thing in index or tlog
+  // - when becoming leader, need to make sure it's greater than
+  // - using to detect missing messages means we need to keep track per-leader, or make
+  //   sure a new leader starts off with 1 greater than the last leader.
+  private final AtomicLong clock = new AtomicLong();
+
+  public long getNewClock() {
+    return clock.incrementAndGet();
+  }
+
+  // Named *old* to prevent accidental calling getClock and expecting a new updated clock.
+  public long getOldClock() {
+    return clock.get();
+  }
+  ***/
+
+  /** We are currently using this time-based clock to avoid going back in time on a
+   * server restart (i.e. we don't want version numbers to start at 1 again).
+   */
+
+  // Time-based lamport clock.  Good for introducing some reality into clocks (to the degree
+  // that times are somewhat synchronized in the cluster).
+  // Good if we want to relax some constraints to scale down to where only one node may be
+  // up at a time.  Possibly harder to detect missing messages (because versions are not contiguous.
+  long vclock;
+  long time;
+  private final Object clockSync = new Object();
+
+
+  public long getNewClock() {
+    synchronized (clockSync) {
+      time = System.currentTimeMillis();
+      long result = time << 20;
+      if (result <= vclock) {
+        result = vclock + 1;
+      }
+      vclock = result;
+      return vclock;
+    }
+  }
+
+  public long getOldClock() {
+    synchronized (clockSync) {
+      return vclock;
+    }
+  }
+
+  public void updateClock(long clock) {
+    synchronized (clockSync) {
+      vclock = Math.max(vclock, clock);
+    }
+  }
+
+
+  public VersionBucket bucket(int hash) {
+    // If this is a user provided hash, it may be poor in the right-hand bits.
+    // Make sure high bits are moved down, since only the low bits will matter.
+    // int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
+    // Assume good hash codes for now.
+
+    int slot = hash & (buckets.length-1);
+    return buckets[slot];
+  }
+
+  public Long lookupVersion(BytesRef idBytes) {
+    return updateHandler.ulog.lookupVersion(idBytes);
+  }
+
+  public Long getVersionFromIndex(BytesRef idBytes) {
+    // TODO: we could cache much of this and invalidate during a commit.
+    // TODO: most DocValues classes are threadsafe - expose which.
+
+    RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
+    try {
+      SolrIndexSearcher searcher = newestSearcher.get();
+      long lookup = searcher.lookupId(idBytes);
+      if (lookup < 0) return null;
+
+      ValueSource vs = versionField.getType().getValueSource(versionField, null);
+      Map context = ValueSource.newContext(searcher);
+      vs.createWeight(context, searcher);
+      FunctionValues fv = vs.getValues(context, searcher.getTopReaderContext().leaves()[(int)(lookup>>32)]);
+      long ver = fv.longVal((int)lookup);
+      return ver;
+
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
+    } finally {
+      if (newestSearcher != null) {
+        newestSearcher.decref();
+      }
+    }
+  }
+
+}