Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/19 18:52:28 UTC

svn commit: r1204028 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/update/ solrj/src/java/org/apache/solr/common/util/

Author: yonik
Date: Sat Nov 19 17:52:27 2011
New Revision: 1204028

URL: http://svn.apache.org/viewvc?rev=1204028&view=rev
Log:
SOLR-2808: reimplement log recovery for concurrent reading
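
(For orientation: this change replaces the old TransactionLogReader, which could
only read a log that was no longer being written, with a LogReader obtained from
a live TransactionLog, so replay can proceed while writers keep appending. A
minimal sketch of the intended consumption pattern, using only names that appear
in the diffs below, with error handling elided:

    TransactionLog tlog = new TransactionLog(tlogFile, null, true); // openExisting=true
    TransactionLog.LogReader reader = tlog.getReader();             // takes a reference
    try {
      for (;;) {
        Object o = reader.next(null);  // null latch: return null at EOF rather than wait
        if (o == null) break;          // caught up with the writer
        // ... dispatch on UpdateLog.ADD / DELETE / DELETE_BY_QUERY / COMMIT ...
      }
    } finally {
      reader.close();                  // releases the reader's reference
      tlog.decref();                   // releases the reference taken at open
    }

next() declares IOException and InterruptedException; the LogReplayer below logs
both and treats them as EOF.)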

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Sat Nov 19 17:52:27 2011
@@ -55,6 +55,9 @@ public class DeleteUpdateCommand extends
     return indexedId;
   }
 
+  public void setIndexedId(BytesRef indexedId) {
+    this.indexedId = indexedId;
+  }
 
   @Override
   public String toString() {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Sat Nov 19 17:52:27 2011
@@ -446,21 +446,16 @@ public class FSUpdateLog extends UpdateL
   @Override
   public boolean recoverFromLog() {
     if (tlogFiles.length == 0) return false;
-    TransactionLogReader tlogReader = null;
+    TransactionLog oldTlog = null;     // todo - change name
     try {
-      tlogReader = new TransactionLogReader( new File(tlogDir, tlogFiles[tlogFiles.length-1]) );
-      boolean completed = tlogReader.completed();
-      if (completed) {
-        return true;
-      }
-
-      recoveryExecutor.execute(new LogReplayer(tlogReader));
+      oldTlog = new TransactionLog( new File(tlogDir, tlogFiles[tlogFiles.length-1]), null, true );
+      recoveryExecutor.execute(new LogReplayer(oldTlog));
       return true;
 
     } catch (Exception ex) {
       // an error during recovery
-      uhandler.log.warn("Exception during recovery", ex);
-      if (tlogReader != null) tlogReader.close();
+      log.warn("Exception during recovery", ex);
+      if (oldTlog != null) oldTlog.decref();
     }
 
     return false;
@@ -486,22 +481,38 @@ public class FSUpdateLog extends UpdateL
     }
   }
 
+
+
   // TODO: do we let the log replayer run across core reloads?
   class LogReplayer implements Runnable {
-    TransactionLogReader tlogReader;
-    public LogReplayer(TransactionLogReader tlogReader) {
-      this.tlogReader = tlogReader;
+    TransactionLog tlog;
+    TransactionLog.LogReader tlogReader;
+
+
+    public LogReplayer(TransactionLog tlog) {
+      this.tlog = tlog;
     }
 
     @Override
     public void run() {
+      tlogReader = tlog.getReader();
+
       uhandler.core.log.warn("Starting log replay " + tlogReader);
 
       SolrParams params = new ModifiableSolrParams();
       long commitVersion = 0;
 
       for(;;) {
-        Object o = tlogReader.readNext();
+        Object o = null;
+
+        try {
+          o = tlogReader.next(null);
+        } catch (InterruptedException e) {
+          SolrException.log(log,e);
+        } catch (IOException e) {
+          SolrException.log(log,e);
+        }
+
         if (o == null) break;
 
         // create a new request each time since the update handler and core could change
@@ -529,11 +540,15 @@ public class FSUpdateLog extends UpdateL
               cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
               uhandler.addDoc(cmd);
               break;
+
+              // TODO: updates need to go through versioning code for handling reorders? (for replicas at least,
+              // depending on how they recover.)
             }
             case UpdateLog.DELETE:
             {
               byte[] idBytes = (byte[]) entry.get(2);
               DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+              cmd.setIndexedId(new BytesRef(idBytes));
               cmd.setVersion(version);
               cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
               uhandler.delete(cmd);
@@ -542,13 +557,18 @@ public class FSUpdateLog extends UpdateL
 
             case UpdateLog.DELETE_BY_QUERY:
             {
-              // TODO
+              String query = (String)entry.get(2);
+              DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+              cmd.query = query;
+              cmd.setVersion(version);
+              cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+              uhandler.delete(cmd);
               break;
             }
 
             case UpdateLog.COMMIT:
             {
-              // TODO
+              // currently we don't log commits
               commitVersion = version;
               break;
             }
@@ -559,15 +579,16 @@ public class FSUpdateLog extends UpdateL
         } catch (IOException ex) {
 
         } catch (ClassCastException cl) {
-          uhandler.log.warn("Corrupt log", cl);
+          log.warn("Corrupt log", cl);
           // would be caused by a corrupt transaction log
         } catch (Exception ex) {
-          uhandler.log.warn("Exception replaying log", ex);
+          log.warn("Exception replaying log", ex);
 
           // something wrong with the request?
         }
       }
       tlogReader.close();
+      tlog.decref();
 
       SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
       CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
@@ -578,16 +599,14 @@ public class FSUpdateLog extends UpdateL
       try {
         uhandler.commit(cmd);
       } catch (IOException ex) {
-        uhandler.log.error("Replay exception: final commit.", ex);
+        log.error("Replay exception: final commit.", ex);
       }
-      tlogReader.delete();
 
-      uhandler.core.log.warn("Ending log replay " + tlogReader);
+      log.warn("Ending log replay " + tlogReader);
 
     }
   }
 
-
   static ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
       1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
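
(A note on cleanup: the old tlogReader.delete() call is gone. Lifetime is now
managed by TransactionLog's reference counting - each party holds a reference,
and the file can only be closed (and deleted, if deleteOnClose is set) once
every holder has called decref(). A hedged sketch of the protocol implied by
this diff, assuming an opened log starts with one reference:

    TransactionLog tlog = new TransactionLog(file, null, true);  // refcount: 1 (assumed)
    TransactionLog.LogReader reader = tlog.getReader();          // incref() -> 2
    // ... replay the records ...
    reader.close();                                              // decref() -> 1
    tlog.decref();                                               // -> 0: log may be closed

Per the TransactionLog hunk below, incref() on a log whose count already reached
zero now throws SERVER_ERROR, so a stale reference fails fast instead of quietly
resurrecting a closed log.)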
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Sat Nov 19 17:52:27 2011
@@ -23,12 +23,14 @@ import org.apache.solr.common.SolrInputD
 import org.apache.solr.common.util.FastInputStream;
 import org.apache.solr.common.util.FastOutputStream;
 import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.zookeeper.Transaction;
 
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -57,8 +59,7 @@ public class TransactionLog {
   RandomAccessFile raf;
   FileChannel channel;
   OutputStream os;
-  FastOutputStream fos;
-  InputStream is;
+  FastOutputStream fos;    // all accesses to this stream should be synchronized on "this"
 
   volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
 
@@ -66,6 +67,8 @@ public class TransactionLog {
   Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
   List<String> globalStringList = new ArrayList<String>();
 
+  CountDownLatch latch;  // if set, used to signal that we just added another log record
+
   // write a BytesRef as a byte array
   JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -130,24 +133,54 @@ public class TransactionLog {
   }
 
   TransactionLog(File tlogFile, Collection<String> globalStrings) {
+    this(tlogFile, globalStrings, false);
+  }
+
+  TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
     try {
       this.tlogFile = tlogFile;
       raf = new RandomAccessFile(this.tlogFile, "rw");
       long start = raf.length();
-      assert start==0;
-      if (start > 0) {
-        raf.setLength(0);
-      }
-      // System.out.println("###start= "+start);
       channel = raf.getChannel();
       os = Channels.newOutputStream(channel);
       fos = FastOutputStream.wrap(os);
-      addGlobalStrings(globalStrings);
+
+      if (openExisting) {
+        if (start > 0) {
+          readHeader(null);
+          fos.setWritten(start);    // reflect that we aren't starting at the beginning
+        } else {
+          addGlobalStrings(globalStrings);
+        }
+      } else {
+        assert start==0;
+        if (start > 0) {
+          raf.setLength(0);
+        }
+        addGlobalStrings(globalStrings);
+      }
+
     } catch (IOException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
   }
 
+  private void readHeader(FastInputStream fis) throws IOException {
+    // read existing header
+    fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
+    LogCodec codec = new LogCodec();
+    Map header = (Map)codec.unmarshal(fis);
+    // needed to read other records
+
+    synchronized (this) {
+      globalStringList = (List<String>)header.get("strings");
+      globalStringMap = new HashMap<String, Integer>(globalStringList.size());
+      for (int i=0; i<globalStringList.size(); i++) {
+        globalStringMap.put( globalStringList.get(i), i+1);
+      }
+    }
+  }
+
   private void addGlobalStrings(Collection<String> strings) {
     if (strings == null) return;
     int origSize = globalStringMap.size();
@@ -205,7 +238,7 @@ public class TransactionLog {
         // fos.flushBuffer();  // flush later
 
 
-
+        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -229,6 +262,7 @@ public class TransactionLog {
         BytesRef br = cmd.getIndexedId();
         codec.writeByteArray(br.bytes, br.offset, br.length);
         // fos.flushBuffer();  // flush later
+        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -251,6 +285,7 @@ public class TransactionLog {
         codec.writeLong(cmd.getVersion());
         codec.writeStr(cmd.query);
         // fos.flushBuffer();  // flush later
+        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -274,6 +309,7 @@ public class TransactionLog {
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE);  // ensure these bytes are the last in the file
         // fos.flushBuffer();  // flush later
+        endWrite();
         return pos;
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -281,6 +317,12 @@ public class TransactionLog {
     }
   }
 
+  private void endWrite() {
+    if (latch != null) {
+      latch.countDown();
+    }
+  }
+
 
   /* This method is thread safe */
   public Object lookup(long pos) {
@@ -306,7 +348,10 @@ public class TransactionLog {
   }
 
   public void incref() {
-    refcount.incrementAndGet();
+    int result = refcount.incrementAndGet();
+    if (result <= 1) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "incref on a closed log: " + this);
+    }
   }
 
   public void decref() {
@@ -350,11 +395,106 @@ public class TransactionLog {
     return tlogFile.toString();
   }
 
+  /** Returns a reader that can be used while a log is still in use.
+   * Currently only *one* reader may be outstanding, and that reader may only
+   * be used from a single thread. */
+  public LogReader getReader() {
+    return new LogReader();
+  }
+
+
+
+  public class LogReader {
+    ChannelFastInputStream fis = new ChannelFastInputStream(channel, 0);
+    private LogCodec codec = new LogCodec();
+
+    public LogReader() {
+      incref();
+    }
+
+    /** Returns the next object from the log.  If this log is being concurrently written
+     * to, and you wish to block until a new record is available, then pass in a latch
+     * to use.
+     *
+     * @param latch The latch to use to block, waiting for the next log record
+     * @return The log record, or null if EOF
+     * @throws IOException
+     */
+    public Object next(CountDownLatch latch) throws IOException, InterruptedException {
+      long pos = fis.position();
+
+      synchronized (TransactionLog.this) {
+        if (pos >= fos.size()) {
+          // we caught up.
+          TransactionLog.this.latch = latch;
+
+
+          // TODO: how to prevent a race between catching up and
+          // switching to "active"... need higher level coordination?
+          // Probably, since updating this log is normally done after
+          // updating the index.  Perhaps more than one phase...
+          // 1) next() returns null
+          // 2) replayer sets flag to "almost active" and updates start going to the index
+          // 3) replayer continues calling next() to get any records that were added in between.
+          // This *still* doesn't work since a thread that skipped adding to the index could
+          // be delayed, next() could return null, and *then* the thread would write to the
+          // log.
+          // TODO: may need to utilize a read-write lock around all the updates (this
+          // may be needed for deleteByQuery anyway)
+          return null;
+        }
+
+        fos.flushBuffer();
+      }
+
+      if (TransactionLog.this.latch != null) {
+        TransactionLog.this.latch.await();
+
+        synchronized (TransactionLog.this) {
+          TransactionLog.this.latch = null;
+          if (fis.position() >= fos.size()) {
+            // still EOF... someone else must have tripped the latch.
+            return null;
+          }
+          fos.flushBuffer();
+        }
+
+      }
+
+      if (pos == 0) {
+        readHeader(fis);
+
+        // shouldn't currently happen - the header and the first record are written at the same time
+        synchronized (TransactionLog.this) {
+          if (fis.position() >= fos.size()) {
+            return null;
+          }
+        }
+      }
+
+      return codec.readVal(fis);
+    }
+
+    public void close() {
+      decref();
+    }
+
+    @Override
+    public String toString() {
+      synchronized (TransactionLog.this) {
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+      }
+    }
+
+  }
+
 }
 
+
+
 class ChannelFastInputStream extends FastInputStream {
   FileChannel ch;
-  long chPosition;
+  private long chPosition;
 
   public ChannelFastInputStream(FileChannel ch, long chPosition) {
     super(null);
@@ -378,4 +518,3 @@ class ChannelFastInputStream extends Fas
   }
 }
 
-
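
(The CountDownLatch plumbing is what makes concurrent reading possible: a reader
that catches up to the writer registers a latch, and endWrite() trips it when
the next record is appended. Given the unresolved TODOs in next(), the protocol
is still settling, but one plausible blocking loop - a sketch only, with
handle() as a hypothetical consumer - looks like:

    TransactionLog.LogReader reader = tlog.getReader();
    try {
      for (;;) {
        CountDownLatch latch = new CountDownLatch(1);
        Object o = reader.next(latch);   // registers the latch, returns null at EOF
        if (o == null) {
          latch.await();                 // endWrite() counts this down on the next write
          continue;                      // retry now that a record should be available
        }
        handle(o);                       // hypothetical record consumer
      }
    } finally {
      reader.close();
    }

Per the javadoc above, only one reader may be outstanding, and it must be used
from a single thread.)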

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sat Nov 19 17:52:27 2011
@@ -20,9 +20,13 @@ package org.apache.solr.update;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** @lucene.experimental */
 public abstract class UpdateLog implements PluginInfoInitialized {
+  public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
+
   public enum SyncLevel { NONE, FLUSH, FSYNC }
 
   public static final int ADD = 0x01;

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java Sat Nov 19 17:52:27 2011
@@ -27,6 +27,7 @@ public class FastInputStream extends Inp
   private final byte[] buf;
   private int pos;
   private int end;
+  private long readFromStream; // number of bytes read from the underlying inputstream
 
   public FastInputStream(InputStream in) {
   // use default BUFSIZE of BufferedOutputStream so if we wrap that
@@ -73,7 +74,13 @@ public class FastInputStream extends Inp
   }
 
   public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
-    return in.read(target, offset, len);
+    int ret =  in.read(target, offset, len);
+    if (ret > 0) readFromStream += ret;
+    return ret;
+  }
+
+  public long position() {
+    return readFromStream - (end - pos);
   }
 
   public void refill() throws IOException {
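
(position() reports the logical read offset in the wrapped stream: total bytes
pulled from it, minus what still sits unread in the buffer. A small illustration,
assuming the default 8192-byte buffer and a single refill:

    FastInputStream fis = new FastInputStream(
        new ByteArrayInputStream(new byte[10000]));
    fis.read(new byte[100], 0, 100);  // refill pulls 8192 bytes; caller consumes 100
    // readFromStream == 8192, end == 8192, pos == 100, so:
    assert fis.position() == 100;     // 8192 - (8192 - 100)

This is what lets ChannelFastInputStream report an accurate file offset even
though reads are buffered.)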

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java?rev=1204028&r1=1204027&r2=1204028&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastOutputStream.java Sat Nov 19 17:52:27 2011
@@ -25,7 +25,7 @@ import java.io.*;
 public class FastOutputStream extends OutputStream implements DataOutput {
   private final OutputStream out;
   private final byte[] buf;
-  private long written;  // how many bytes written
+  private long written;  // how many bytes written to the underlying stream
   private int pos;
 
   public FastOutputStream(OutputStream w) {
@@ -191,4 +191,17 @@ public class FastOutputStream extends Ou
   public long size() {
     return written + pos;
   }
+
+  /** Returns the number of bytes actually written to the underlying OutputStream, not including
+   * anything currently buffered by this class itself.
+   */
+  public long written() {
+    return written;
+  }
+
+  /** Sets the count returned by written(), e.g. to account for bytes already in the file. */
+  public void setWritten(long written) {
+    this.written = written;
+  }
+
 }
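
(written() versus size(): written() counts only bytes already pushed to the
wrapped stream, while size() adds whatever is still buffered (written + pos).
setWritten() exists so a reopened tlog can seed the counter with the existing
file length, as TransactionLog's openExisting path now does via
fos.setWritten(start). A small sketch of the arithmetic, assuming nothing has
been flushed yet:

    FastOutputStream fos = FastOutputStream.wrap(new ByteArrayOutputStream());
    fos.setWritten(1000);    // pretend 1000 bytes already exist on disk
    fos.writeInt(42);        // 4 bytes land in the buffer, not the stream
    // fos.written() == 1000   (nothing flushed since setWritten)
    // fos.size()    == 1004   (written + buffered)

size() is presumably how TransactionLog derives the record positions its write
methods return.)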