Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/14 16:24:21 UTC

svn commit: r1201748 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ solrj/src/java/org/apache/solr/common/util/

Author: yonik
Date: Mon Nov 14 15:24:20 2011
New Revision: 1201748

URL: http://svn.apache.org/viewvc?rev=1201748&view=rev
Log:
SOLR-2808: first pass at recovering from tlog on startup

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java   (with props)
Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Mon Nov 14 15:24:20 2011
@@ -91,6 +91,10 @@ public class AddUpdateCommand extends Up
      return indexedId;
    }
 
+   public void setIndexedId(BytesRef idBytes) {
+     this.indexedId = idBytes;
+   }
+
    public String getPrintableId() {
      IndexSchema schema = req.getSchema();
      SchemaField sf = schema.getUniqueKeyField();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Mon Nov 14 15:24:20 2011
@@ -20,19 +20,20 @@ package org.apache.solr.update;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.FastInputStream;
-import org.apache.solr.common.util.FastOutputStream;
-import org.apache.solr.common.util.JavaBinCodec;
-import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
 
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.FileChannel;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /** @lucene.experimental */
 class NullUpdateLog extends UpdateLog {
@@ -90,6 +91,16 @@ class NullUpdateLog extends UpdateLog {
   public VersionInfo getVersionInfo() {
     return null;
   }
+
+  @Override
+  public void finish(SyncLevel syncLevel) {
+  }
+
+  @Override
+  public boolean recoverFromLog() {
+    return false;
+  }
+
 }
 
 /** @lucene.experimental */
@@ -125,6 +136,11 @@ public class FSUpdateLog extends UpdateL
   private String lastDataDir;
 
   private VersionInfo versionInfo;
+
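+  // when a request finishes, flush (but don't fsync) the tlog by default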
+  private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
+
+  private volatile UpdateHandler uhandler;    // a core reload can change this reference!
+
   @Override
   public VersionInfo getVersionInfo() {
     return versionInfo;
@@ -140,6 +156,8 @@ public class FSUpdateLog extends UpdateL
       dataDir = core.getDataDir();
     }
 
+    this.uhandler = uhandler;
+
     if (dataDir.equals(lastDataDir)) {
       // on a normal reopen, we currently shouldn't have to do anything
       return;
@@ -151,6 +169,8 @@ public class FSUpdateLog extends UpdateL
     id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
 
     versionInfo = new VersionInfo(uhandler, 256);
+
+    recoverFromLog();  // TODO: is this too early?
   }
 
   static class LogPtr {
@@ -389,327 +409,178 @@ public class FSUpdateLog extends UpdateL
     return null;
   }
 
-
-  private void ensureLog() {
-    if (tlog == null) {
-      String newLogName = String.format("%s.%019d", TLOG_NAME, id);
-      tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
-    }
-  }
-
   @Override
-  public void close() {
-    synchronized (this) {
-      if (prevTlog != null) {
-        prevTlog.decref();
-      }
-      if (tlog != null) {
-        tlog.decref();
-      }
+  public void finish(SyncLevel syncLevel) {
+    if (syncLevel == null) {
+      syncLevel = defaultSyncLevel;
+    }
+    if (syncLevel == SyncLevel.NONE) {
+      return;
     }
-  }
-
-
-}
-
-
-/**
- *  Log Format: List{Operation, Version, ...}
- *  ADD, VERSION, DOC
- *  DELETE, VERSION, ID_BYTES
- *  DELETE_BY_QUERY, VERSION, String
- *
- *  TODO: keep two files, one for [operation, version, id] and the other for the actual
- *  document data.  That way we could throw away document log files more readily
- *  while retaining the smaller operation log files longer (and we can retrieve
- *  the stored fields from the latest documents from the index).
- *
- *  This would require keeping all source fields stored of course.
- *
- *  This would also allow to not log document data for requests with commit=true
- *  in them (since we know that if the request succeeds, all docs will be committed)
- *
- */
-class TransactionLog {
-
-  long id;
-  File tlogFile;
-  RandomAccessFile raf;
-  FileChannel channel;
-  OutputStream os;
-  FastOutputStream fos;
-  InputStream is;
-  long start;
-
-  volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
-
-  AtomicInteger refcount = new AtomicInteger(1);
-  Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
-  List<String> globalStringList = new ArrayList<String>();
 
-  // write a BytesRef as a byte array
-  JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
-    @Override
-    public Object resolve(Object o, JavaBinCodec codec) throws IOException {
-      if (o instanceof BytesRef) {
-        BytesRef br = (BytesRef)o;
-        codec.writeByteArray(br.bytes, br.offset, br.length);
-        return null;
-      }
-      return o;
+    TransactionLog currLog;
+    synchronized (this) {
+      currLog = tlog;
+      if (currLog == null) return;
+      currLog.incref();
     }
-  };
 
-  public class LogCodec extends JavaBinCodec {
-    public LogCodec() {
-      super(resolver);
+    try {
+      currLog.finish(syncLevel);  // use the incref'd reference; the tlog field may have been swapped by another thread
+    } finally {
+      currLog.decref();
     }
+  }
 
-    @Override
-    public void writeExternString(String s) throws IOException {
-      if (s == null) {
-        writeTag(NULL);
-        return;
+  @Override
+  public boolean recoverFromLog() {
+    if (tlogFiles.length == 0) return false;
+    TransactionLogReader tlogReader = null;
+    try {
+      tlogReader = new TransactionLogReader( new File(tlogDir, tlogFiles[tlogFiles.length-1]) );
+      boolean completed = tlogReader.completed();
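+      // if the log ended with the END_MESSAGE marker, its last record was a
+      // commit, so its contents should already be committed to the index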
+      if (completed) {
+        return true;
       }
 
-      // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
-      Integer idx = globalStringMap.get(s);
-      if (idx == null) {
-        // write a normal string
-        writeStr(s);
-      } else {
-        // write the extern string
-        writeTag(EXTERN_STRING, idx);
-      }
-    }
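+      // replay runs asynchronously; the LogReplayer closes (and deletes) the
+      // reader when it is done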
+      recoveryExecutor.execute(new LogReplayer(tlogReader));
+      return true;
 
-    @Override
-    public String readExternString(FastInputStream fis) throws IOException {
-      int idx = readSize(fis);
-      if (idx != 0) {// idx != 0 is the index of the extern string
-      // no need to synchronize globalStringList - it's only updated before the first record is written to the log
-        return globalStringList.get(idx - 1);
-      } else {// idx == 0 means it has a string value
-        // this shouldn't happen with this codec subclass.
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
-      }
+    } catch (Exception ex) {
+      // an error during recovery
+      uhandler.log.warn("Exception during recovery", ex);
+      if (tlogReader != null) tlogReader.close();
     }
 
-
+    return false;
   }
 
-  public long writeData(Object o) {
-    LogCodec codec = new LogCodec();
-    try {
-      long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
-      codec.init(fos);
-      codec.writeVal(o);
-      return pos;
-    } catch (IOException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-    }
-  }
 
-  TransactionLog(File tlogFile, Collection<String> globalStrings) {
-    try {
-      this.tlogFile = tlogFile;
-      raf = new RandomAccessFile(this.tlogFile, "rw");
-      start = raf.length();
-      // System.out.println("###start= "+start);
-      channel = raf.getChannel();
-      os = Channels.newOutputStream(channel);
-      fos = FastOutputStream.wrap(os);
-      addGlobalStrings(globalStrings);
-    } catch (IOException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+  private void ensureLog() {
+    if (tlog == null) {
+      String newLogName = String.format("%s.%019d", TLOG_NAME, id);
+      tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
     }
   }
 
-  private void addGlobalStrings(Collection<String> strings) {
-    if (strings == null) return;
-    int origSize = globalStringMap.size();
-    for (String s : strings) {
-      Integer idx = null;
-      if (origSize > 0) {
-        idx = globalStringMap.get(s);
+  @Override
+  public void close() {
+    synchronized (this) {
+      if (prevTlog != null) {
+        prevTlog.decref();
+      }
+      if (tlog != null) {
+        tlog.decref();
       }
-      if (idx != null) continue;  // already in list
-      globalStringList.add(s);
-      globalStringMap.put(s, globalStringList.size());
     }
-    assert globalStringMap.size() == globalStringList.size();
   }
 
-  Collection<String> getGlobalStrings() {
-    synchronized (fos) {
-      return new ArrayList<String>(globalStringList);
+  // TODO: do we let the log replayer run across core reloads?
+  class LogReplayer implements Runnable {
+    TransactionLogReader tlogReader;
+    public LogReplayer(TransactionLogReader tlogReader) {
+      this.tlogReader = tlogReader;
     }
-  }
-
-  private void writeLogHeader(LogCodec codec) throws IOException {
-    NamedList header = new NamedList<Object>();
-    header.add("SOLR_TLOG",1); // a magic string + version number?
-    header.add("strings",globalStringList);
-    codec.marshal(header, fos);
-  }
 
+    @Override
+    public void run() {
+      uhandler.core.log.warn("Starting log replay " + tlogReader);
 
-  public long write(AddUpdateCommand cmd) {
-    LogCodec codec = new LogCodec();
-    synchronized (fos) {
-      try {
-        long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
-        SolrInputDocument sdoc = cmd.getSolrInputDocument();
-
-        if (pos == 0) { // TODO: needs to be changed if we start writing a header first
-          addGlobalStrings(sdoc.getFieldNames());
-          pos = start + fos.size();
-        }
-
-        /***
-        System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
-         if (pos != fos.size()) {
-          throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
-        }
-         ***/
-
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        codec.writeSolrInputDocument(cmd.getSolrInputDocument());
-        // fos.flushBuffer();  // flush later
-
+      SolrParams params = new ModifiableSolrParams();
+      long commitVersion = 0;
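+      // track the version of the last COMMIT record so the commit issued
+      // after replay can carry it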
 
+      for(;;) {
+        Object o = tlogReader.readNext();
+        if (o == null) break;
+
+        // create a new request each time since the update handler and core could change
+        SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+
+        // TODO: race?  This core could close on us if it was reloaded
+
+        try {
+
+          // should currently be a List<Oper,Ver,Doc/Id>
+          List entry = (List)o;
+
+          int oper = (Integer)entry.get(0);
+          long version = (Long) entry.get(1);
+
+          switch (oper) {
+            case UpdateLog.ADD:
+            {
+              // byte[] idBytes = (byte[]) entry.get(2);
+              SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+              AddUpdateCommand cmd = new AddUpdateCommand(req);
+              // cmd.setIndexedId(new BytesRef(idBytes));
+              cmd.solrDoc = sdoc;
+              cmd.setVersion(version);
+              cmd.setFlags(UpdateCommand.REPLAY);
+              uhandler.addDoc(cmd);
+              break;
+            }
+            case UpdateLog.DELETE:
+            {
+              byte[] idBytes = (byte[]) entry.get(2);
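+              // TODO: idBytes is read but not yet set on the delete command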
+              DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+              cmd.setVersion(version);
+              cmd.setFlags(UpdateCommand.REPLAY);
+              uhandler.delete(cmd);
+              break;
+            }
+
+            case UpdateLog.DELETE_BY_QUERY:
+            {
+              // TODO
+              break;
+            }
+
+            case UpdateLog.COMMIT:
+            {
+              // TODO
+              commitVersion = version;
+              break;
+            }
+
+            default:
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+          }
+        } catch (IOException ex) {
+          uhandler.log.warn("IOException replaying log", ex);
+        } catch (ClassCastException cl) {
+          uhandler.log.warn("Corrupt log", cl);
+          // would be caused by a corrupt transaction log
+        } catch (Exception ex) {
+          uhandler.log.warn("Exception replaying log", ex);
 
-        return pos;
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      }
-    }
-  }
-
-  public long writeDelete(DeleteUpdateCommand cmd) {
-    LogCodec codec = new LogCodec();
-    synchronized (fos) {
-      try {
-        long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
-        if (pos == 0) {
-          writeLogHeader(codec);
-          pos = start + fos.size();
+          // something wrong with the request?
         }
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        BytesRef br = cmd.getIndexedId();
-        codec.writeByteArray(br.bytes, br.offset, br.length);
-        // fos.flushBuffer();  // flush later
-        return pos;
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
-    }
-  }
+      tlogReader.close();
 
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
-    LogCodec codec = new LogCodec();
-    synchronized (fos) {
+      SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+      CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+      cmd.setVersion(commitVersion);
+      cmd.softCommit = false;
+      cmd.waitSearcher = false;
+      cmd.setFlags(UpdateCommand.REPLAY);
       try {
-        long pos = start + fos.size();   // if we had flushed, this should be equal to channel.position()
-        if (pos == 0) {
-          writeLogHeader(codec);
-          pos = start + fos.size();
-        }
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        codec.writeStr(cmd.query);
-        // fos.flushBuffer();  // flush later
-        return pos;
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      }
-    }
-  }
-
-  /* This method is thread safe */
-  public Object lookup(long pos) {
-    try {
-      // make sure any unflushed buffer has been flushed
-      synchronized (fos) {
-        // TODO: optimize this by keeping track of what we have flushed up to
-        fos.flushBuffer();
-        /***
-         System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
-        if (fos.size() != raf.length() || pos >= fos.size() ) {
-          throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
-        }
-        ***/
+        uhandler.commit(cmd);
+      } catch (IOException ex) {
+        uhandler.log.error("Replay exception: final commit.", ex);
       }
+      tlogReader.delete();
 
-      ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
-      LogCodec codec = new LogCodec();
-      return codec.readVal(fis);
-    } catch (IOException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-    }
-  }
+      uhandler.core.log.warn("Ending log replay " + tlogReader);
 
-  public void incref() {
-    refcount.incrementAndGet();
-  }
-
-  public void decref() {
-    if (refcount.decrementAndGet() == 0) {
-      close();
     }
   }
 
 
-  private void close() {
-    try {
-      fos.flush();
-      fos.close();
-      if (deleteOnClose) {
-        tlogFile.delete();
-      }
-    } catch (IOException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-    }
-  }
-
-  public String toString() {
-    return tlogFile.toString();
-  }
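+  // equivalent to a cached thread pool: zero core threads, unbounded max,
+  // direct handoff, and idle threads reaped after one second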
+  static ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+      1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
 
 }
 
 
-class ChannelFastInputStream extends FastInputStream {
-  FileChannel ch;
-  long chPosition;
-
-  public ChannelFastInputStream(FileChannel ch, long chPosition) {
-    super(null);
-    this.ch = ch;
-    this.chPosition = chPosition;
-  }
-
-  @Override
-  public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
-    ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
-    int ret = ch.read(bb, chPosition);
-    if (ret >= 0) {
-      chPosition += ret;
-    }
-    return ret;
-  }
-
-  @Override
-  public void close() throws IOException {
-    ch.close();
-  }
-}
 

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1201748&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Mon Nov 14 15:24:20 2011
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *  Log Format: List{Operation, Version, ...}
+ *  ADD, VERSION, DOC
+ *  DELETE, VERSION, ID_BYTES
+ *  DELETE_BY_QUERY, VERSION, String
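+ *  COMMIT, VERSION, END_MESSAGE
+ *
+ *  e.g. adding a document {id=42} at version 1100 is serialized by the
+ *  LogCodec as the JavaBin list [ADD, 1100, SolrInputDocument{id=42}]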
+ *
+ *  TODO: keep two files, one for [operation, version, id] and the other for the actual
+ *  document data.  That way we could throw away document log files more readily
+ *  while retaining the smaller operation log files longer (and we can retrieve
+ *  the stored fields from the latest documents from the index).
+ *
+ *  This would require keeping all source fields stored, of course.
+ *
+ *  It would also make it possible to skip logging document data for requests
+ *  with commit=true in them (since we know that if the request succeeds, all
+ *  docs will be committed).
+ *
+ */
+public class TransactionLog {
+
+  public final static String END_MESSAGE="SOLR_TLOG_END";
+
+  long id;
+  File tlogFile;
+  RandomAccessFile raf;
+  FileChannel channel;
+  OutputStream os;
+  FastOutputStream fos;
+  InputStream is;
+
+  volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are only used for real-time-get and startup recovery
+
+  AtomicInteger refcount = new AtomicInteger(1);
+  Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
+  List<String> globalStringList = new ArrayList<String>();
+
+  // write a BytesRef as a byte array
+  JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
+    @Override
+    public Object resolve(Object o, JavaBinCodec codec) throws IOException {
+      if (o instanceof BytesRef) {
+        BytesRef br = (BytesRef)o;
+        codec.writeByteArray(br.bytes, br.offset, br.length);
+        return null;
+      }
+      return o;
+    }
+  };
+
+  public class LogCodec extends JavaBinCodec {
+    public LogCodec() {
+      super(resolver);
+    }
+
+    @Override
+    public void writeExternString(String s) throws IOException {
+      if (s == null) {
+        writeTag(NULL);
+        return;
+      }
+
+      // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
+      Integer idx = globalStringMap.get(s);
+      if (idx == null) {
+        // write a normal string
+        writeStr(s);
+      } else {
+        // write the extern string
+        writeTag(EXTERN_STRING, idx);
+      }
+    }
+
+    @Override
+    public String readExternString(FastInputStream fis) throws IOException {
+      int idx = readSize(fis);
+      if (idx != 0) {  // idx != 0 is the index of the extern string
+        // no need to synchronize globalStringList - it's only updated before the first record is written to the log
+        return globalStringList.get(idx - 1);
+      } else {  // idx == 0 means it has a string value
+        // this shouldn't happen with this codec subclass.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+      }
+    }
+
+
+  }
+
+  public long writeData(Object o) {
+    LogCodec codec = new LogCodec();
+    try {
+      long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+      codec.init(fos);
+      codec.writeVal(o);
+      return pos;
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  TransactionLog(File tlogFile, Collection<String> globalStrings) {
+    try {
+      this.tlogFile = tlogFile;
+      raf = new RandomAccessFile(this.tlogFile, "rw");
+      long start = raf.length();
+      assert start == 0;
+      if (start > 0) {
+        raf.setLength(0);
+      }
+      // System.out.println("###start= "+start);
+      channel = raf.getChannel();
+      os = Channels.newOutputStream(channel);
+      fos = FastOutputStream.wrap(os);
+      addGlobalStrings(globalStrings);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void addGlobalStrings(Collection<String> strings) {
+    if (strings == null) return;
+    int origSize = globalStringMap.size();
+    for (String s : strings) {
+      Integer idx = null;
+      if (origSize > 0) {
+        idx = globalStringMap.get(s);
+      }
+      if (idx != null) continue;  // already in list
+      globalStringList.add(s);
+      globalStringMap.put(s, globalStringList.size());
+    }
+    assert globalStringMap.size() == globalStringList.size();
+  }
+
+  Collection<String> getGlobalStrings() {
+    synchronized (fos) {
+      return new ArrayList<String>(globalStringList);
+    }
+  }
+
+  private void writeLogHeader(LogCodec codec) throws IOException {
+    Map<String,Object> header = new LinkedHashMap<String,Object>();
+    header.put("SOLR_TLOG",1); // a magic string + version number
+    header.put("strings",globalStringList);
+    codec.marshal(header, fos);
+  }
+
+
+  public long write(AddUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        SolrInputDocument sdoc = cmd.getSolrInputDocument();
+
+        if (pos == 0) { // first record in this log: gather the global strings and write the log header
+          addGlobalStrings(sdoc.getFieldNames());
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+
+        /***
+        System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+         if (pos != fos.size()) {
+          throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
+        }
+         ***/
+
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.ADD);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+        // fos.flushBuffer();  // flush later
+
+
+
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  public long writeDelete(DeleteUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.DELETE);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        BytesRef br = cmd.getIndexedId();
+        codec.writeByteArray(br.bytes, br.offset, br.length);
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeStr(cmd.query);
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+
+  public long writeCommit(CommitUpdateCommand cmd) {
+    LogCodec codec = new LogCodec();
+    synchronized (fos) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 3);
+        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeStr(END_MESSAGE);  // ensure these bytes are the last in the file
+        // fos.flushBuffer();  // flush later
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+
+  /* This method is thread safe */
+  public Object lookup(long pos) {
+    try {
+      // make sure any unflushed buffer has been flushed
+      synchronized (fos) {
+        // TODO: optimize this by keeping track of what we have flushed up to
+        fos.flushBuffer();
+        /***
+         System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+        if (fos.size() != raf.length() || pos >= fos.size() ) {
+          throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+        }
+        ***/
+      }
+
+      ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+      LogCodec codec = new LogCodec();
+      return codec.readVal(fis);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public void incref() {
+    refcount.incrementAndGet();
+  }
+
+  public void decref() {
+    if (refcount.decrementAndGet() == 0) {
+      close();
+    }
+  }
+
+  public void finish(UpdateLog.SyncLevel syncLevel) {
+    if (syncLevel == UpdateLog.SyncLevel.NONE) return;
+    try {
+      synchronized (fos) {
+        fos.flushBuffer();
+      }
+
+      if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
+        // Since fsync is outside of synchronized block, we can end up with a partial
+        // last record on power failure (which is OK, and does not represent an error...
+        // we just need to be aware of it when reading).
+        raf.getFD().sync();
+      }
+
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void close() {
+    try {
+      fos.flush();
+      fos.close();
+      if (deleteOnClose) {
+        tlogFile.delete();
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public String toString() {
+    return tlogFile.toString();
+  }
+
+}
+
+class ChannelFastInputStream extends FastInputStream {
+  FileChannel ch;
+  long chPosition;
+
+  public ChannelFastInputStream(FileChannel ch, long chPosition) {
+    super(null);
+    this.ch = ch;
+    this.chPosition = chPosition;
+  }
+
+  @Override
+  public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
+    ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
+    int ret = ch.read(bb, chPosition);
+    if (ret >= 0) {
+      chPosition += ret;
+    }
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    ch.close();
+  }
+}
+
+

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java?rev=1201748&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java Mon Nov 14 15:24:20 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/** Single threaded transaction log reader */
+public class TransactionLogReader {
+  private static byte[] END_MESSAGE_BYTES;
+  static {
+    try {
+      END_MESSAGE_BYTES = TransactionLog.END_MESSAGE.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // impossible
+    }
+  }
+
+  private File tlogFile;
+  private RandomAccessFile raf;
+  private FileChannel channel;
+  private FastInputStream fis;
+  private boolean completed;
+  private LogCodec codec;
+
+  private Map header;
+  private List<String> globalStringList;
+
+
+  public class LogCodec extends JavaBinCodec {
+    public LogCodec() {
+    }
+
+    @Override
+    public String readExternString(FastInputStream fis) throws IOException {
+      int idx = readSize(fis);
+      if (idx != 0) {  // idx != 0 is the index of the extern string
+        // globalStringList is populated once by readHeader() and not modified afterwards
+        return globalStringList.get(idx - 1);
+      } else {  // idx == 0 means it has a string value
+        // this shouldn't happen with this codec subclass.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+      }
+    }
+  }
+
+
+  public TransactionLogReader(File tlogFile) {
+    try {
+      this.tlogFile = tlogFile;
+      raf = new RandomAccessFile(tlogFile,"r");
+      byte[] end = new byte[END_MESSAGE_BYTES.length];
+      long size = raf.length();
+      completed = false;
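+      // a cleanly finished log ends with the END_MESSAGE bytes written by
+      // TransactionLog.writeCommit, so inspect the tail of the file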
+      if (size >= end.length) {
+        raf.seek(size - end.length);
+        raf.readFully(end);
+        completed = Arrays.equals(end, END_MESSAGE_BYTES);
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  /** did the file end with END_MESSAGE_BYTES, implying that everything here was committed? */
+  public boolean completed() {
+    return completed;
+  }
+
+  public Map readHeader() {
+    if (header != null) return header;
+    try {
+      raf.seek(0);
+      channel = raf.getChannel();
+      InputStream is = Channels.newInputStream(channel);
+      fis = new FastInputStream(is);
+      codec = new LogCodec();
+      header = (Map)codec.unmarshal(fis);
+
+      // needed to read other records
+      globalStringList = (List<String>)header.get("strings");
+
+      return header;
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log", e);
+    }
+  }
+
+  public Object readNext() {
+    try {
+      readHeader();
+      if (fis.peek() == -1) return null;   // EOF
+      return codec.readVal(fis);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log", e);
+    }
+  }
+
+  public void close() {
+    try {
+      raf.close();
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing transaction log", e);
+    }
+  }
+
+  public void delete() {
+    tlogFile.delete();
+  }
+
+  @Override
+  public String toString() {
+    return "TransactionLogReader{file=" + tlogFile + "}";
+  }
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Mon Nov 14 15:24:20 2011
@@ -28,6 +28,9 @@ import org.apache.solr.request.SolrQuery
     protected final SolrQueryRequest req;
     protected final String commandName;
     protected long version;
+    protected int flags;
+
+    public static final int REPLAY = 0x00000001;     // update command is from replaying a log.
 
     public UpdateCommand(String commandName, SolrQueryRequest req) {
       this.req = req;
@@ -45,4 +48,12 @@ import org.apache.solr.request.SolrQuery
     public void setVersion(long version) {
       this.version = version;
     }
+
+    public void setFlags(int flags) {
+      this.flags = flags;
+    }
+
+    public int getFlags() {
+      return flags;
+    }
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Mon Nov 14 15:24:20 2011
@@ -23,9 +23,12 @@ import org.apache.solr.util.plugin.Plugi
 
 /** @lucene.experimental */
 public abstract class UpdateLog implements PluginInfoInitialized {
-  public static final int ADD = 0x00;
-  public static final int DELETE = 0x01;
-  public static final int DELETE_BY_QUERY = 0x02;
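+  // NONE: don't flush; FLUSH: flush buffers to the OS; FSYNC: also sync the file descriptor to stable storage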
+  public enum SyncLevel { NONE, FLUSH, FSYNC }
+
+  public static final int ADD = 0x01;
+  public static final int DELETE = 0x02;
+  public static final int DELETE_BY_QUERY = 0x03;
+  public static final int COMMIT = 0x04;
 
   public abstract void init(UpdateHandler uhandler, SolrCore core);
   public abstract void add(AddUpdateCommand cmd);
@@ -39,4 +42,6 @@ public abstract class UpdateLog implemen
   public abstract Long lookupVersion(BytesRef indexedId);
   public abstract void close();
   public abstract VersionInfo getVersionInfo();
+  public abstract void finish(SyncLevel syncLevel);
+  public abstract boolean recoverFromLog();
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java Mon Nov 14 15:24:20 2011
@@ -21,13 +21,7 @@ import java.io.IOException;
 
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.DocumentBuilder;
-import org.apache.solr.update.MergeIndexesCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
-import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.*;
 
 
 /**
@@ -49,6 +43,8 @@ class RunUpdateProcessor extends UpdateR
   private final SolrQueryRequest req;
   private final UpdateHandler updateHandler;
 
+  private boolean changesSinceCommit = false;
+
   public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
     super( next );
     this.req = req;
@@ -59,6 +55,7 @@ class RunUpdateProcessor extends UpdateR
   public void processAdd(AddUpdateCommand cmd) throws IOException {
     updateHandler.addDoc(cmd);
     super.processAdd(cmd);
+    changesSinceCommit = true;
   }
 
   @Override
@@ -70,6 +67,7 @@ class RunUpdateProcessor extends UpdateR
       updateHandler.deleteByQuery(cmd);
     }
     super.processDelete(cmd);
+    changesSinceCommit = true;
   }
 
   @Override
@@ -83,6 +81,7 @@ class RunUpdateProcessor extends UpdateR
   {
     updateHandler.commit(cmd);
     super.processCommit(cmd);
+    changesSinceCommit = false;
   }
 
   /**
@@ -93,6 +92,16 @@ class RunUpdateProcessor extends UpdateR
   {
     updateHandler.rollback(cmd);
     super.processRollback(cmd);
+    changesSinceCommit = false;
+  }
+
+
+  @Override
+  public void finish() throws IOException {
+    if (changesSinceCommit) {
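+      // null selects the update log's default sync level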
+      updateHandler.getUpdateLog().finish(null);
+    }
+    super.finish();
   }
 }
 

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java Mon Nov 14 15:24:20 2011
@@ -55,6 +55,15 @@ public class FastInputStream extends Inp
     return buf[pos++] & 0xff;     
   }
 
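+  /** Returns the next byte without consuming it, or -1 on EOF. */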
+  public int peek() throws IOException {
+    if (pos >= end) {
+      refill();
+      if (pos >= end) return -1;
+    }
+    return buf[pos] & 0xff;
+  }
+
+
   public int readUnsignedByte() throws IOException {
     if (pos >= end) {
       refill();