You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/14 16:24:21 UTC
svn commit: r1201748 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
solrj/src/java/org/apache/solr/common/util/
Author: yonik
Date: Mon Nov 14 15:24:20 2011
New Revision: 1201748
URL: http://svn.apache.org/viewvc?rev=1201748&view=rev
Log:
SOLR-2808: first pass at recovering from tlog on startup
Added:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (with props)
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java (with props)
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Mon Nov 14 15:24:20 2011
@@ -91,6 +91,10 @@ public class AddUpdateCommand extends Up
return indexedId;
}
+ /**
+ * Sets the indexed (BytesRef) form of this command's unique key.
+ * @param idBytes the indexed id to store
+ */
+ public void setIndexedId(BytesRef idBytes) {
+ this.indexedId = idBytes;
+ }
+
public String getPrintableId() {
IndexSchema schema = req.getSchema();
SchemaField sf = schema.getUniqueKeyField();
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Mon Nov 14 15:24:20 2011
@@ -20,19 +20,20 @@ package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.FastInputStream;
-import org.apache.solr.common.util.FastOutputStream;
-import org.apache.solr.common.util.JavaBinCodec;
-import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.FileChannel;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/** @lucene.experimental */
class NullUpdateLog extends UpdateLog {
@@ -90,6 +91,16 @@ class NullUpdateLog extends UpdateLog {
public VersionInfo getVersionInfo() {
return null;
}
+
+ /** Intentionally a no-op for this update log. */
+ @Override
+ public void finish(SyncLevel synclevel) {
+ // intentionally empty
+ }
+
+ /** Never performs recovery; always reports that none took place. */
+ @Override
+ public boolean recoverFromLog() {
+ boolean recovered = false;
+ return recovered;
+ }
+
}
/** @lucene.experimental */
@@ -125,6 +136,11 @@ public class FSUpdateLog extends UpdateL
private String lastDataDir;
private VersionInfo versionInfo;
+
+ /** Sync level applied by finish() when it is called with a null level. */
+ private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
+
+ /** Update handler that log replay sends its commands to. */
+ private volatile UpdateHandler uhandler; // a core reload can change this reference!
+
@Override
public VersionInfo getVersionInfo() {
return versionInfo;
@@ -140,6 +156,8 @@ public class FSUpdateLog extends UpdateL
dataDir = core.getDataDir();
}
+ this.uhandler = uhandler;
+
if (dataDir.equals(lastDataDir)) {
// on a normal reopen, we currently shouldn't have to do anything
return;
@@ -151,6 +169,8 @@ public class FSUpdateLog extends UpdateL
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
versionInfo = new VersionInfo(uhandler, 256);
+
+ recoverFromLog(); // TODO: is this too early?
}
static class LogPtr {
@@ -389,327 +409,178 @@ public class FSUpdateLog extends UpdateL
return null;
}
-
- private void ensureLog() {
- if (tlog == null) {
- String newLogName = String.format("%s.%019d", TLOG_NAME, id);
- tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
- }
- }
-
+ /**
+ * Flushes the current transaction log (and fsyncs it for SyncLevel.FSYNC).
+ * A null level means defaultSyncLevel; SyncLevel.NONE does nothing.
+ */
@Override
- public void close() {
- synchronized (this) {
- if (prevTlog != null) {
- prevTlog.decref();
- }
- if (tlog != null) {
- tlog.decref();
- }
+ public void finish(SyncLevel syncLevel) {
+ if (syncLevel == null) {
+ syncLevel = defaultSyncLevel;
+ }
+ if (syncLevel == SyncLevel.NONE) {
+ return;
}
- }
-
-
-}
-
-
-/**
- * Log Format: List{Operation, Version, ...}
- * ADD, VERSION, DOC
- * DELETE, VERSION, ID_BYTES
- * DELETE_BY_QUERY, VERSION, String
- *
- * TODO: keep two files, one for [operation, version, id] and the other for the actual
- * document data. That way we could throw away document log files more readily
- * while retaining the smaller operation log files longer (and we can retrieve
- * the stored fields from the latest documents from the index).
- *
- * This would require keeping all source fields stored of course.
- *
- * This would also allow to not log document data for requests with commit=true
- * in them (since we know that if the request succeeds, all docs will be committed)
- *
- */
-class TransactionLog {
-
- long id;
- File tlogFile;
- RandomAccessFile raf;
- FileChannel channel;
- OutputStream os;
- FastOutputStream fos;
- InputStream is;
- long start;
-
- volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
-
- AtomicInteger refcount = new AtomicInteger(1);
- Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
- List<String> globalStringList = new ArrayList<String>();
- // write a BytesRef as a byte array
- JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
- @Override
- public Object resolve(Object o, JavaBinCodec codec) throws IOException {
- if (o instanceof BytesRef) {
- BytesRef br = (BytesRef)o;
- codec.writeByteArray(br.bytes, br.offset, br.length);
- return null;
- }
- return o;
+ // Take a reference under the lock so the log cannot be closed (refcount reaching 0)
+ // while it is being flushed outside the lock.
+ TransactionLog currLog;
+ synchronized (this) {
+ currLog = tlog;
+ if (currLog == null) return;
+ currLog.incref();
}
- };
- public class LogCodec extends JavaBinCodec {
- public LogCodec() {
- super(resolver);
+ try {
+ // use the pinned reference; the tlog field must not be read outside the lock
+ currLog.finish(syncLevel);
+ } finally {
+ currLog.decref();
}
+ }
- @Override
- public void writeExternString(String s) throws IOException {
- if (s == null) {
- writeTag(NULL);
- return;
+ /**
+ * Starts an asynchronous replay (on recoveryExecutor) of the last file in tlogFiles,
+ * presumably the newest log, unless that log reports itself as completed.
+ * @return false if there are no log files or recovery could not be started; true otherwise
+ */
+ @Override
+ public boolean recoverFromLog() {
+ if (tlogFiles.length == 0) return false;
+ TransactionLogReader tlogReader = null;
+ try {
+ tlogReader = new TransactionLogReader( new File(tlogDir, tlogFiles[tlogFiles.length-1]) );
+ boolean completed = tlogReader.completed();
+ if (completed) {
+ // nothing to replay: release the reader instead of leaking its file handle
+ tlogReader.close();
+ return true;
}
- // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
- Integer idx = globalStringMap.get(s);
- if (idx == null) {
- // write a normal string
- writeStr(s);
- } else {
- // write the extern string
- writeTag(EXTERN_STRING, idx);
- }
- }
+ // ownership of tlogReader passes to the replayer, which closes it when done
+ recoveryExecutor.execute(new LogReplayer(tlogReader));
+ return true;
- @Override
- public String readExternString(FastInputStream fis) throws IOException {
- int idx = readSize(fis);
- if (idx != 0) {// idx != 0 is the index of the extern string
- // no need to synchronize globalStringList - it's only updated before the first record is written to the log
- return globalStringList.get(idx - 1);
- } else {// idx == 0 means it has a string value
- // this shouldn't happen with this codec subclass.
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
- }
+ } catch (Exception ex) {
+ // an error during recovery
+ uhandler.log.warn("Exception during recovery", ex);
+ if (tlogReader != null) tlogReader.close();
}
-
+ return false;
}
- public long writeData(Object o) {
- LogCodec codec = new LogCodec();
- try {
- long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
- codec.init(fos);
- codec.writeVal(o);
- return pos;
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
- TransactionLog(File tlogFile, Collection<String> globalStrings) {
- try {
- this.tlogFile = tlogFile;
- raf = new RandomAccessFile(this.tlogFile, "rw");
- start = raf.length();
- // System.out.println("###start= "+start);
- channel = raf.getChannel();
- os = Channels.newOutputStream(channel);
- fos = FastOutputStream.wrap(os);
- addGlobalStrings(globalStrings);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ /**
+ * Lazily creates the current transaction log, named TLOG_NAME + "." + id zero-padded to 19 digits.
+ */
+ private void ensureLog() {
+ if (tlog == null) {
+ // Locale.ROOT: %d is locale-sensitive and could otherwise emit non-ASCII digits in the file name
+ String newLogName = String.format(Locale.ROOT, "%s.%019d", TLOG_NAME, id);
+ tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
}
}
- private void addGlobalStrings(Collection<String> strings) {
- if (strings == null) return;
- int origSize = globalStringMap.size();
- for (String s : strings) {
- Integer idx = null;
- if (origSize > 0) {
- idx = globalStringMap.get(s);
+ /** Releases this update log's references to the current and previous transaction logs. */
+ @Override
+ public void close() {
+ synchronized (this) {
+ if (prevTlog != null) {
+ prevTlog.decref();
+ }
+ if (tlog != null) {
+ tlog.decref();
}
- if (idx != null) continue; // already in list
- globalStringList.add(s);
- globalStringMap.put(s, globalStringList.size());
}
- assert globalStringMap.size() == globalStringList.size();
}
- Collection<String> getGlobalStrings() {
- synchronized (fos) {
- return new ArrayList<String>(globalStringList);
+ // TODO: do we let the log replayer run across core reloads?
+ /**
+ * Replays every record of a transaction log through the current update handler (flagged as
+ * REPLAY), then closes the reader, issues a final hard commit, and deletes the log once that
+ * commit has succeeded.
+ */
+ class LogReplayer implements Runnable {
+ TransactionLogReader tlogReader;
+ public LogReplayer(TransactionLogReader tlogReader) {
+ this.tlogReader = tlogReader;
}
- }
-
- private void writeLogHeader(LogCodec codec) throws IOException {
- NamedList header = new NamedList<Object>();
- header.add("SOLR_TLOG",1); // a magic string + version number?
- header.add("strings",globalStringList);
- codec.marshal(header, fos);
- }
+ @Override
+ public void run() {
+ uhandler.core.log.warn("Starting log replay " + tlogReader);
- public long write(AddUpdateCommand cmd) {
- LogCodec codec = new LogCodec();
- synchronized (fos) {
- try {
- long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
- SolrInputDocument sdoc = cmd.getSolrInputDocument();
-
- if (pos == 0) { // TODO: needs to be changed if we start writing a header first
- addGlobalStrings(sdoc.getFieldNames());
- pos = start + fos.size();
- }
-
- /***
- System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
- if (pos != fos.size()) {
- throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
- }
- ***/
-
- codec.init(fos);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.ADD); // should just take one byte
- codec.writeLong(cmd.getVersion());
- codec.writeSolrInputDocument(cmd.getSolrInputDocument());
- // fos.flushBuffer(); // flush later
-
+ SolrParams params = new ModifiableSolrParams();
+ long commitVersion = 0;
+ for(;;) {
+ Object o = tlogReader.readNext();
+ if (o == null) break;
+
+ // create a new request each time since the update handler and core could change
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+
+ // TODO: race? This core could close on us if it was reloaded
+
+ try {
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ {
+ // byte[] idBytes = (byte[]) entry.get(2);
+ SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY);
+ uhandler.addDoc(cmd);
+ break;
+ }
+ case UpdateLog.DELETE:
+ {
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ // FIXME(review): idBytes is decoded but never applied to cmd, so this delete is replayed
+ // without an id; it must carry the indexed id that TransactionLog.writeDelete recorded.
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY);
+ uhandler.delete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY:
+ {
+ // TODO
+ break;
+ }
+
+ case UpdateLog.COMMIT:
+ {
+ // TODO
+ commitVersion = version;
+ break;
+ }
+
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+ } catch (IOException ex) {
+ uhandler.log.warn("IOException replaying log", ex);
+ } catch (ClassCastException cl) {
+ uhandler.log.warn("Corrupt log", cl);
+ // would be caused by a corrupt transaction log
+ } catch (Exception ex) {
+ uhandler.log.warn("Exception replaying log", ex);
- return pos;
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
- }
-
- public long writeDelete(DeleteUpdateCommand cmd) {
- LogCodec codec = new LogCodec();
- synchronized (fos) {
- try {
- long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
- if (pos == 0) {
- writeLogHeader(codec);
- pos = start + fos.size();
+ // something wrong with the request?
}
+ finally {
+ // release the per-entry request before moving on to the next record
+ req.close();
+ }
- codec.init(fos);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.DELETE); // should just take one byte
- codec.writeLong(cmd.getVersion());
- BytesRef br = cmd.getIndexedId();
- codec.writeByteArray(br.bytes, br.offset, br.length);
- // fos.flushBuffer(); // flush later
- return pos;
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
- }
- }
+ tlogReader.close();
- public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
- LogCodec codec = new LogCodec();
- synchronized (fos) {
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+ CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+ cmd.setVersion(commitVersion);
+ cmd.softCommit = false;
+ cmd.waitSearcher = false;
+ cmd.setFlags(UpdateCommand.REPLAY);
try {
- long pos = start + fos.size(); // if we had flushed, this should be equal to channel.position()
- if (pos == 0) {
- writeLogHeader(codec);
- pos = start + fos.size();
- }
- codec.init(fos);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
- codec.writeLong(cmd.getVersion());
- codec.writeStr(cmd.query);
- // fos.flushBuffer(); // flush later
- return pos;
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
- }
-
- /* This method is thread safe */
- public Object lookup(long pos) {
- try {
- // make sure any unflushed buffer has been flushed
- synchronized (fos) {
- // TODO: optimize this by keeping track of what we have flushed up to
- fos.flushBuffer();
- /***
- System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
- if (fos.size() != raf.length() || pos >= fos.size() ) {
- throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
- }
- ***/
+ uhandler.commit(cmd);
+ // only discard the log once its replayed contents have been committed;
+ // if the commit fails the log is kept so its updates are not lost
+ tlogReader.delete();
+ } catch (IOException ex) {
+ uhandler.log.error("Replay exception: final commit.", ex);
}
+ finally {
+ req.close();
+ }
- ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
- LogCodec codec = new LogCodec();
- return codec.readVal(fis);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
+ uhandler.core.log.warn("Ending log replay " + tlogReader);
- public void incref() {
- refcount.incrementAndGet();
- }
-
- public void decref() {
- if (refcount.decrementAndGet() == 0) {
- close();
}
}
- private void close() {
- try {
- fos.flush();
- fos.close();
- if (deleteOnClose) {
- tlogFile.delete();
- }
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
-
- public String toString() {
- return tlogFile.toString();
- }
+ /**
+ * Shared pool that runs LogReplayer tasks. With no core threads, an unbounded maximum and a
+ * SynchronousQueue hand-off, each submitted task is given a thread immediately; threads that
+ * stay idle for one second are discarded.
+ */
+ static ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(
+ 0, // corePoolSize
+ Integer.MAX_VALUE, // maximumPoolSize
+ 1, TimeUnit.SECONDS, // keep-alive for idle threads
+ new SynchronousQueue<Runnable>()); // direct hand-off, no queuing
}
-class ChannelFastInputStream extends FastInputStream {
- FileChannel ch;
- long chPosition;
-
- public ChannelFastInputStream(FileChannel ch, long chPosition) {
- super(null);
- this.ch = ch;
- this.chPosition = chPosition;
- }
-
- @Override
- public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
- ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
- int ret = ch.read(bb, chPosition);
- if (ret >= 0) {
- chPosition += ret;
- }
- return ret;
- }
-
- @Override
- public void close() throws IOException {
- ch.close();
- }
-}
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1201748&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Mon Nov 14 15:24:20 2011
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Log Format: List{Operation, Version, ...}
+ * ADD, VERSION, DOC
+ * DELETE, VERSION, ID_BYTES
+ * DELETE_BY_QUERY, VERSION, String
+ * COMMIT, VERSION, END_MESSAGE
+ *
+ * When the first record is written to an empty file it is preceded by a header map
+ * ("SOLR_TLOG" -> 1 as a magic marker/version, "strings" -> the global extern strings).
+ *
+ * TODO: keep two files, one for [operation, version, id] and the other for the actual
+ * document data. That way we could throw away document log files more readily
+ * while retaining the smaller operation log files longer (and we can retrieve
+ * the stored fields from the latest documents from the index).
+ *
+ * This would require keeping all source fields stored of course.
+ *
+ * This would also allow to not log document data for requests with commit=true
+ * in them (since we know that if the request succeeds, all docs will be committed)
+ *
+ */
+public class TransactionLog {
+
+ /** Marker string written as the last element of every COMMIT record. */
+ public final static String END_MESSAGE="SOLR_TLOG_END";
+
+ long id;
+ File tlogFile;
+ RandomAccessFile raf;
+ FileChannel channel;
+ OutputStream os;
+ FastOutputStream fos; // writers synchronize on this stream
+ InputStream is;
+
+ volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
+
+ // starts at 1 for the creator's reference; the log is closed when the count drops to 0
+ AtomicInteger refcount = new AtomicInteger(1);
+ // extern strings: string -> 1-based position in globalStringList (an encoded index of 0 means "not extern")
+ Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
+ List<String> globalStringList = new ArrayList<String>();
+
+ // write a BytesRef as a byte array
+ JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
+ @Override
+ public Object resolve(Object o, JavaBinCodec codec) throws IOException {
+ if (o instanceof BytesRef) {
+ BytesRef br = (BytesRef)o;
+ codec.writeByteArray(br.bytes, br.offset, br.length);
+ return null;
+ }
+ return o;
+ }
+ };
+
+ /** JavaBin codec that encodes strings found in this log's global string table as extern-string indexes. */
+ public class LogCodec extends JavaBinCodec {
+ public LogCodec() {
+ super(resolver);
+ }
+
+ @Override
+ public void writeExternString(String s) throws IOException {
+ if (s == null) {
+ writeTag(NULL);
+ return;
+ }
+
+ // no need to synchronize globalStringMap - it's only updated before the first record is written to the log
+ Integer idx = globalStringMap.get(s);
+ if (idx == null) {
+ // write a normal string
+ writeStr(s);
+ } else {
+ // write the extern string
+ writeTag(EXTERN_STRING, idx);
+ }
+ }
+
+ @Override
+ public String readExternString(FastInputStream fis) throws IOException {
+ int idx = readSize(fis);
+ if (idx != 0) {// idx != 0 is the index of the extern string
+ // no need to synchronize globalStringList - it's only updated before the first record is written to the log
+ return globalStringList.get(idx - 1);
+ } else {// idx == 0 means it has a string value
+ // this shouldn't happen with this codec subclass.
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+ }
+ }
+ }
+
+ /** Appends an arbitrary value (no log header is written) and returns the position it starts at. */
+ public long writeData(Object o) {
+ LogCodec codec = new LogCodec();
+ // same lock as the other writers so the position and the bytes written cannot interleave
+ synchronized (fos) {
+ try {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ codec.init(fos);
+ codec.writeVal(o);
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ /**
+ * Opens (creating if needed) the log file for writing.
+ * @param tlogFile file backing this log
+ * @param globalStrings strings to register as extern strings; may be null
+ */
+ TransactionLog(File tlogFile, Collection<String> globalStrings) {
+ try {
+ this.tlogFile = tlogFile;
+ raf = new RandomAccessFile(this.tlogFile, "rw");
+ long start = raf.length();
+ // a new log is expected to be empty; record positions are derived from fos.size(),
+ // so any pre-existing content is discarded rather than appended to
+ assert start==0;
+ if (start > 0) {
+ raf.setLength(0);
+ }
+ channel = raf.getChannel();
+ os = Channels.newOutputStream(channel);
+ fos = FastOutputStream.wrap(os);
+ addGlobalStrings(globalStrings);
+ } catch (IOException e) {
+ if (raf != null) {
+ try {
+ raf.close(); // don't leak the file handle when setup fails part-way
+ } catch (IOException ignored) {
+ // already failing; keep the original exception
+ }
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ /** Appends strings not yet present to the extern string table (1-based indexes). */
+ private void addGlobalStrings(Collection<String> strings) {
+ if (strings == null) return;
+ int origSize = globalStringMap.size();
+ for (String s : strings) {
+ Integer idx = null;
+ if (origSize > 0) {
+ idx = globalStringMap.get(s);
+ }
+ if (idx != null) continue; // already in list
+ globalStringList.add(s);
+ globalStringMap.put(s, globalStringList.size());
+ }
+ assert globalStringMap.size() == globalStringList.size();
+ }
+
+ /** Returns a snapshot copy of the extern string table. */
+ Collection<String> getGlobalStrings() {
+ synchronized (fos) {
+ return new ArrayList<String>(globalStringList);
+ }
+ }
+
+ /** Writes the header map (magic marker/version and the extern string table). */
+ private void writeLogHeader(LogCodec codec) throws IOException {
+ Map<String,Object> header = new LinkedHashMap<String,Object>();
+ header.put("SOLR_TLOG",1); // a magic string + version number
+ header.put("strings",globalStringList);
+ codec.marshal(header, fos);
+ }
+
+ /**
+ * Writes the log header if nothing has been written yet, then points codec at fos.
+ * Caller must hold the fos monitor.
+ * @return the file position at which the caller's record starts
+ */
+ private long startRecord(LogCodec codec) throws IOException {
+ long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ if (pos == 0) {
+ writeLogHeader(codec);
+ pos = fos.size();
+ }
+ codec.init(fos);
+ return pos;
+ }
+
+ /** Appends an ADD record and returns the position it starts at. */
+ public long write(AddUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (fos) {
+ try {
+ SolrInputDocument sdoc = cmd.getSolrInputDocument();
+ if (fos.size() == 0) {
+ // first record: register this document's field names as extern strings
+ // so the header written by startRecord lists them
+ addGlobalStrings(sdoc.getFieldNames());
+ }
+ long pos = startRecord(codec);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.ADD); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeSolrInputDocument(sdoc);
+ // fos.flushBuffer(); // flush later
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ /** Appends a DELETE (by indexed id) record and returns the position it starts at. */
+ public long writeDelete(DeleteUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (fos) {
+ try {
+ long pos = startRecord(codec);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.DELETE); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ BytesRef br = cmd.getIndexedId();
+ codec.writeByteArray(br.bytes, br.offset, br.length);
+ // fos.flushBuffer(); // flush later
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ /** Appends a DELETE_BY_QUERY record and returns the position it starts at. */
+ public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (fos) {
+ try {
+ long pos = startRecord(codec);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeStr(cmd.query);
+ // fos.flushBuffer(); // flush later
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ /** Appends a COMMIT record ending with END_MESSAGE and returns the position it starts at. */
+ public long writeCommit(CommitUpdateCommand cmd) {
+ LogCodec codec = new LogCodec();
+ synchronized (fos) {
+ try {
+ long pos = startRecord(codec);
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.COMMIT); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeStr(END_MESSAGE); // ensure these bytes are the last in the file
+ // fos.flushBuffer(); // flush later
+ return pos;
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ }
+
+ /* This method is thread safe */
+ /** Reads back the value stored at file position pos (as returned by the write methods). */
+ public Object lookup(long pos) {
+ try {
+ // make sure any unflushed buffer has been flushed
+ synchronized (fos) {
+ // TODO: optimize this by keeping track of what we have flushed up to
+ fos.flushBuffer();
+ }
+
+ // intentionally not closed: ChannelFastInputStream.close() would close this log's shared channel
+ ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+ LogCodec codec = new LogCodec();
+ return codec.readVal(fis);
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ /** Adds a reference; each call must be balanced by decref(). */
+ public void incref() {
+ refcount.incrementAndGet();
+ }
+
+ /** Drops a reference, closing the log when the last one is released. */
+ public void decref() {
+ if (refcount.decrementAndGet() == 0) {
+ close();
+ }
+ }
+
+ /** Flushes buffered records to the file and, for SyncLevel.FSYNC, fsyncs it. NONE does nothing. */
+ public void finish(UpdateLog.SyncLevel syncLevel) {
+ if (syncLevel == UpdateLog.SyncLevel.NONE) return;
+ try {
+ synchronized (fos) {
+ fos.flushBuffer();
+ }
+
+ if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
+ // Since fsync is outside of synchronized block, we can end up with a partial
+ // last record on power failure (which is OK, and does not represent an error...
+ // we just need to be aware of it when reading).
+ raf.getFD().sync();
+ }
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ /** Flushes and closes the file, then deletes it if deleteOnClose is set. */
+ private void close() {
+ try {
+ try {
+ fos.flush();
+ } finally {
+ fos.close(); // close even if the flush failed so the file handle is not leaked
+ }
+ if (deleteOnClose) {
+ tlogFile.delete();
+ }
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return tlogFile.toString();
+ }
+
+}
+
+/**
+ * FastInputStream over a FileChannel that uses positional reads starting at a given offset,
+ * so the channel's own position is left untouched.
+ */
+class ChannelFastInputStream extends FastInputStream {
+ FileChannel ch;
+ long chPosition;
+
+ public ChannelFastInputStream(FileChannel ch, long chPosition) {
+ super(null);
+ this.ch = ch;
+ this.chPosition = chPosition;
+ }
+
+ @Override
+ public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
+ int bytesRead = ch.read(ByteBuffer.wrap(target, offset, len), chPosition);
+ if (bytesRead < 0) {
+ return bytesRead; // end of channel
+ }
+ chPosition += bytesRead;
+ return bytesRead;
+ }
+
+ /** Closes the underlying channel, which may be shared with a TransactionLog. */
+ @Override
+ public void close() throws IOException {
+ ch.close();
+ }
+}
+
+
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java?rev=1201748&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLogReader.java Mon Nov 14 15:24:20 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Single threaded transaction log reader.
+ * <p>
+ * The constructor opens the file read-only and checks whether it ends with
+ * {@code TransactionLog.END_MESSAGE} (see {@link #completed()}). {@link #readHeader()}
+ * decodes the leading header map. Each {@link #readNext()} call then decodes one record
+ * with {@link LogCodec}. Not thread-safe.
+ */
+public class TransactionLogReader {
+  private static final byte[] END_MESSAGE_BYTES;
+  static {
+    try {
+      END_MESSAGE_BYTES = TransactionLog.END_MESSAGE.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Every JVM must support UTF-8, so this is unreachable. Fail loudly rather than leave
+      // END_MESSAGE_BYTES null and hit an NPE in the constructor later.
+      throw new RuntimeException(e);
+    }
+  }
+
+  private final File tlogFile;
+  private RandomAccessFile raf;
+  private FileChannel channel;
+  private FastInputStream fis;
+  private boolean completed;
+  private LogCodec codec;
+
+  // Both are set by readHeader(). globalStringList resolves extern-string references in records.
+  private Map header;
+  private List<String> globalStringList;
+
+
+  /**
+   * Codec that resolves extern strings by 1-based index into the string table taken from the
+   * log header (the "strings" entry loaded by {@link #readHeader()}).
+   */
+  public class LogCodec extends JavaBinCodec {
+    public LogCodec() {
+    }
+
+    @Override
+    public String readExternString(FastInputStream fis) throws IOException {
+      int idx = readSize(fis);
+      if (idx != 0) { // idx != 0 is the 1-based index of the extern string
+        // no need to synchronize globalStringList - it's only updated before the first record is written to the log
+        if (idx < 0 || globalStringList == null || idx > globalStringList.size()) {
+          // reference outside the header's string table: report corruption instead of NPE/IOOBE
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+        }
+        return globalStringList.get(idx - 1);
+      } else { // idx == 0 means it has a string value
+        // this shouldn't happen with this codec subclass.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
+      }
+    }
+  }
+
+
+  /**
+   * Opens {@code tlogFile} read-only and records whether its last bytes equal the end message.
+   *
+   * @throws SolrException (SERVER_ERROR) if the file cannot be opened or its tail cannot be read.
+   *         In that case the file is closed before throwing.
+   */
+  public TransactionLogReader(File tlogFile) {
+    this.tlogFile = tlogFile;
+    try {
+      raf = new RandomAccessFile(tlogFile, "r");
+      byte[] end = new byte[END_MESSAGE_BYTES.length];
+      long size = raf.length();
+      completed = false;
+      if (size >= end.length) {
+        raf.seek(size - end.length);
+        raf.readFully(end);
+        completed = Arrays.equals(end, END_MESSAGE_BYTES);
+      }
+    } catch (IOException e) {
+      closeQuietly(); // don't leak the file handle when the trailer can't be read
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  /** Closes {@code raf} if it was opened, ignoring secondary failures so the original error wins. */
+  private void closeQuietly() {
+    if (raf == null) return;
+    try {
+      raf.close();
+    } catch (IOException ignored) {
+      // already failing; the caller reports the original exception
+    }
+  }
+
+  /** did the file end with END_MESSAGE_BYTES, implying that everything here was committed? */
+  public boolean completed() {
+    return completed;
+  }
+
+  /**
+   * Decodes the header map at the start of the file and caches it. Later calls return the
+   * cached map. The header's "strings" entry is loaded for use by {@link LogCodec}.
+   *
+   * @throws SolrException (SERVER_ERROR, "Corrupt transaction log") if decoding fails
+   */
+  public Map readHeader() {
+    if (header != null) return header;
+    try {
+      // The channel shares raf's file pointer, so reading starts at offset 0.
+      raf.seek(0);
+      channel = raf.getChannel();
+      InputStream is = Channels.newInputStream(channel);
+      fis = new FastInputStream(is);
+      codec = new LogCodec();
+      header = (Map)codec.unmarshal(fis);
+
+      // needed to read other records
+      globalStringList = (List<String>)header.get("strings");
+
+      return header;
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log", e);
+    }
+  }
+
+  /**
+   * Returns the next decoded record, or null at end of file. Reads the header first if needed.
+   * NOTE(review): a truncated final record is possible after a power failure, per the comment
+   * in TransactionLog.finish. It presumably surfaces here as an IOException and therefore as a
+   * "Corrupt transaction log" error. Callers may need to tolerate that.
+   */
+  public Object readNext() {
+    try {
+      readHeader();
+      if (fis.peek() == -1) return null; // EOF
+      return codec.readVal(fis);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log", e);
+    }
+  }
+
+  /** Closes the underlying file, which also closes the channel obtained from it. */
+  public void close() {
+    try {
+      raf.close();
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing transaction log " + tlogFile, e);
+    }
+  }
+
+  /** Best-effort delete of the log file. The result of File.delete() is ignored. */
+  public void delete() {
+    tlogFile.delete();
+  }
+
+  @Override
+  public String toString() {
+    return "TransactionLogReader{"+"file="+tlogFile+"}";
+  }
+}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Mon Nov 14 15:24:20 2011
@@ -28,6 +28,9 @@ import org.apache.solr.request.SolrQuery
protected final SolrQueryRequest req;
protected final String commandName;
protected long version;
+ protected int flags;
+
+ public static int REPLAY = 0x00000001; // update command is from replaying a log.
public UpdateCommand(String commandName, SolrQueryRequest req) {
this.req = req;
@@ -45,4 +48,12 @@ import org.apache.solr.request.SolrQuery
+ /** Sets the version value stored on this update command. */
 public void setVersion(long version) {
 this.version = version;
 }
+
+  /** Replaces all of this command's flag bits (e.g. {@link #REPLAY}). */
+  public void setFlags(int flags) { this.flags = flags; }
+
+  /** Returns this command's flag bits; test with e.g. {@code (getFlags() & REPLAY) != 0}. */
+  public int getFlags() { return flags; }
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Mon Nov 14 15:24:20 2011
@@ -23,9 +23,12 @@ import org.apache.solr.util.plugin.Plugi
/** @lucene.experimental */
public abstract class UpdateLog implements PluginInfoInitialized {
- public static final int ADD = 0x00;
- public static final int DELETE = 0x01;
- public static final int DELETE_BY_QUERY = 0x02;
+ /**
+ * Durability level passed to {@link #finish(SyncLevel)}. TransactionLog.finish treats the values as follows:
+ * NONE is a no-op, FLUSH flushes the write buffer, and FSYNC flushes and then fsyncs the log file.
+ */
+ public enum SyncLevel { NONE, FLUSH, FSYNC }
+
+ // Operation codes for log entries. NOTE(review): presumably written into each transaction log
+ // record to identify its operation type - confirm in FSUpdateLog/TransactionLog.
+ public static final int ADD = 0x01;
+ public static final int DELETE = 0x02;
+ public static final int DELETE_BY_QUERY = 0x03;
+ public static final int COMMIT = 0x04;
 public abstract void init(UpdateHandler uhandler, SolrCore core);
 public abstract void add(AddUpdateCommand cmd);
@@ -39,4 +42,6 @@ public abstract class UpdateLog implemen
 public abstract Long lookupVersion(BytesRef indexedId);
 public abstract void close();
 public abstract VersionInfo getVersionInfo();
+ /**
+ * Pushes records logged so far toward storage at the requested {@link SyncLevel}.
+ * RunUpdateProcessor.finish passes null here. NOTE(review): presumably null means
+ * "use the log's configured default" - confirm in implementations.
+ */
+ public abstract void finish(SyncLevel syncLevel);
+ /**
+ * NOTE(review): presumably replays an existing transaction log on startup (SOLR-2808,
+ * "first pass at recovering from tlog on startup"). The meaning of the boolean result is
+ * not visible here - TODO confirm.
+ */
+ public abstract boolean recoverFromLog();
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java Mon Nov 14 15:24:20 2011
@@ -21,13 +21,7 @@ import java.io.IOException;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.DocumentBuilder;
-import org.apache.solr.update.MergeIndexesCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
-import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.*;
/**
@@ -49,6 +43,8 @@ class RunUpdateProcessor extends UpdateR
 private final SolrQueryRequest req;
 private final UpdateHandler updateHandler;
+ // Set after an add or delete, cleared after a commit or rollback handled by this processor.
+ // finish() uses it to decide whether the update log needs to be finished.
+ private boolean changesSinceCommit = false;
+
public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
super( next );
this.req = req;
@@ -59,6 +55,7 @@ class RunUpdateProcessor extends UpdateR
 public void processAdd(AddUpdateCommand cmd) throws IOException {
 updateHandler.addDoc(cmd);
 super.processAdd(cmd);
+ // Only reached when both the handler and the rest of the chain returned normally;
+ // an exception from either leaves the flag unchanged.
+ // NOTE(review): if addDoc logged the document but super.processAdd threw, finish() will not
+ // finish the log for this add - confirm that is intended.
+ changesSinceCommit = true;
 }
@Override
@@ -70,6 +67,7 @@ class RunUpdateProcessor extends UpdateR
updateHandler.deleteByQuery(cmd);
}
super.processDelete(cmd);
+ changesSinceCommit = true;
}
@Override
@@ -83,6 +81,7 @@ class RunUpdateProcessor extends UpdateR
{
updateHandler.commit(cmd);
super.processCommit(cmd);
+ changesSinceCommit = false;
}
/**
@@ -93,6 +92,16 @@ class RunUpdateProcessor extends UpdateR
{
updateHandler.rollback(cmd);
super.processRollback(cmd);
+ changesSinceCommit = false;
+ }
+
+
+ @Override
+ public void finish() throws IOException {
+ if (changesSinceCommit) {
+ updateHandler.getUpdateLog().finish(null);
+ }
+ super.finish();
}
}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java?rev=1201748&r1=1201747&r2=1201748&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java Mon Nov 14 15:24:20 2011
@@ -55,6 +55,15 @@ public class FastInputStream extends Inp
return buf[pos++] & 0xff;
}
+  /**
+   * Returns the next byte as an unsigned value (0-255) without consuming it, or -1 if the
+   * stream is exhausted. May refill the internal buffer from the underlying source.
+   */
+  public int peek() throws IOException {
+    if (pos < end) {
+      return buf[pos] & 0xff;
+    }
+    refill();
+    return (pos < end) ? (buf[pos] & 0xff) : -1;
+  }
+
+
public int readUnsignedByte() throws IOException {
if (pos >= end) {
refill();