You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2012/01/25 21:32:53 UTC
svn commit: r1235919 [8/12] - in /lucene/dev/branches/lucene3661: ./
dev-tools/eclipse/ dev-tools/idea/lucene/contrib/ dev-tools/maven/
dev-tools/maven/solr/core/ dev-tools/maven/solr/solrj/ lucene/
lucene/contrib/ lucene/contrib/sandbox/src/test/org/a...
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java Wed Jan 25 20:32:44 2012
@@ -108,7 +108,7 @@ public class SolrIndexSearcher extends I
private final Collection<String> fieldNames;
private Collection<String> storedHighlightFieldNames;
private DirectoryFactory directoryFactory;
-
+
public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, SolrIndexConfig config, String name, boolean enableCache, DirectoryFactory directoryFactory) throws IOException {
// we don't need to reserve the directory because we get it from the factory
this(core, schema,name, core.getIndexReaderFactory().newReader(directoryFactory.get(path, config.lockType)), true, enableCache, false, directoryFactory);
@@ -133,6 +133,8 @@ public class SolrIndexSearcher extends I
if (dir instanceof FSDirectory) {
FSDirectory fsDirectory = (FSDirectory) dir;
indexDir = fsDirectory.getDirectory().getAbsolutePath();
+ } else {
+ log.warn("WARNING: Directory impl does not support setting indexDir: " + dir.getClass().getName());
}
this.closeReader = closeReader;
@@ -569,6 +571,37 @@ public class SolrIndexSearcher extends I
return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id;
}
+ /** lookup the docid by the unique key field, and return the id *within* the leaf reader in the low 32 bits, and the index of the leaf reader in the high 32 bits.
+ * -1 is returned if not found.
+ * @lucene.internal
+ */
+ public long lookupId(BytesRef idBytes) throws IOException {
+ String field = schema.getUniqueKeyField().getName();
+ final AtomicReaderContext[] leaves = leafContexts;
+
+
+ for (int i=0; i<leaves.length; i++) {
+ final AtomicReaderContext leaf = leaves[i];
+ final IndexReader reader = leaf.reader;
+
+ final Fields fields = reader.fields();
+ if (fields == null) continue;
+
+ final Bits liveDocs = reader.getLiveDocs();
+
+ final DocsEnum docs = reader.termDocsEnum(liveDocs, field, idBytes, false);
+
+ if (docs == null) continue;
+ int id = docs.nextDoc();
+ if (id == DocIdSetIterator.NO_MORE_DOCS) continue;
+ assert docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS;
+
+ return (((long)i) << 32) | id;
+ }
+
+ return -1;
+ }
+
/**
* Compute and cache the DocSet that matches a query.
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Wed Jan 25 20:32:44 2012
@@ -62,10 +62,10 @@ public class SolrDispatchFilter implemen
{
final Logger log = LoggerFactory.getLogger(SolrDispatchFilter.class);
- protected CoreContainer cores;
+ protected volatile CoreContainer cores;
+
protected String pathPrefix = null; // strip this from the beginning of a path
protected String abortErrorMessage = null;
- protected String solrConfigFilename = null;
protected final Map<SolrConfig, SolrRequestParsers> parsers = new WeakHashMap<SolrConfig, SolrRequestParsers>();
protected final SolrRequestParsers adminRequestParser;
@@ -100,6 +100,10 @@ public class SolrDispatchFilter implemen
log.info("SolrDispatchFilter.init() done");
}
+
+ public CoreContainer getCores() {
+ return cores;
+ }
/** Method to override to change how CoreContainer initialization is performed. */
protected CoreContainer.Initializer createInitializer() {
@@ -118,7 +122,13 @@ public class SolrDispatchFilter implemen
((HttpServletResponse)response).sendError( 500, abortErrorMessage );
return;
}
-
+
+ if (this.cores == null) {
+ ((HttpServletResponse)response).sendError( 403, "Server is shutting down" );
+ return;
+ }
+ CoreContainer cores = this.cores;
+
if( request instanceof HttpServletRequest) {
HttpServletRequest req = (HttpServletRequest)request;
HttpServletResponse resp = (HttpServletResponse)response;
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed Jan 25 20:32:44 2012
@@ -45,13 +45,20 @@ public class AddUpdateCommand extends Up
public int commitWithin = -1;
public AddUpdateCommand(SolrQueryRequest req) {
- super("add", req);
+ super(req);
}
+ @Override
+ public String name() {
+ return "add";
+ }
+
/** Reset state to reuse this object with a different document in the same request */
public void clear() {
solrDoc = null;
indexedId = null;
+ updateTerm = null;
+ version = 0;
}
public SolrInputDocument getSolrInputDocument() {
@@ -91,6 +98,10 @@ public class AddUpdateCommand extends Up
return indexedId;
}
+ public void setIndexedId(BytesRef indexedId) {
+ this.indexedId = indexedId;
+ }
+
public String getPrintableId() {
IndexSchema schema = req.getSchema();
SchemaField sf = schema.getUniqueKeyField();
@@ -105,10 +116,11 @@ public class AddUpdateCommand extends Up
@Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
- if (indexedId !=null) sb.append("id=").append(indexedId);
+ StringBuilder sb = new StringBuilder(super.toString());
+ if (indexedId != null) sb.append(",id=").append(indexedId);
if (!overwrite) sb.append(",overwrite=").append(overwrite);
+ if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+ sb.append('}');
return sb.toString();
}
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitTracker.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitTracker.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitTracker.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitTracker.java Wed Jan 25 20:32:44 2012
@@ -91,6 +91,14 @@ final class CommitTracker implements Run
public void scheduleCommitWithin(long commitMaxTime) {
_scheduleCommitWithin(commitMaxTime);
}
+
+ private void _scheduleCommitWithinIfNeeded(long commitWithin) {
+ long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound;
+
+ if (ctime > 0) {
+ _scheduleCommitWithin(ctime);
+ }
+ }
private void _scheduleCommitWithin(long commitMaxTime) {
if (commitMaxTime <= 0) return;
@@ -139,11 +147,14 @@ final class CommitTracker implements Run
}
// maxTime-triggered autoCommit
- long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound;
-
- if (ctime > 0) {
- _scheduleCommitWithin(ctime);
- }
+ _scheduleCommitWithinIfNeeded(commitWithin);
+ }
+
+ /**
+ * Indicate that documents have been deleted
+ */
+ public void deletedDocument( int commitWithin ) {
+ _scheduleCommitWithinIfNeeded(commitWithin);
}
/** Inform tracker that a commit has occurred */
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/CommitUpdateCommand.java Wed Jan 25 20:32:44 2012
@@ -37,16 +37,21 @@ public class CommitUpdateCommand extends
public int maxOptimizeSegments = 1;
public CommitUpdateCommand(SolrQueryRequest req, boolean optimize) {
- super("commit", req);
+ super(req);
this.optimize=optimize;
}
+
+ @Override
+ public String name() {
+ return "commit";
+ }
+
@Override
public String toString() {
- return prepareCommit ? "prepareCommit" :
- ("commit(optimize="+optimize
+ return super.toString() + ",optimize="+optimize
+",waitSearcher="+waitSearcher
+",expungeDeletes="+expungeDeletes
+",softCommit="+softCommit
- +')');
+ +'}';
}
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Wed Jan 25 20:32:44 2012
@@ -20,14 +20,21 @@ package org.apache.solr.update;
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
+import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
public final class DefaultSolrCoreState extends SolrCoreState {
+
+ private final Object recoveryLock = new Object();
private int refCnt = 1;
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
+ private boolean recoveryRunning;
+ private RecoveryStrategy recoveryStrat;
+ private boolean closed = false;
+
public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
this.directoryFactory = directoryFactory;
}
@@ -50,14 +57,23 @@ public final class DefaultSolrCoreState
}
@Override
- public synchronized void decref() throws IOException {
- refCnt--;
- if (refCnt == 0) {
- if (indexWriter != null) {
- indexWriter.close();
+ public void decref(IndexWriterCloser closer) throws IOException {
+ boolean cancelRecovery = false;
+ synchronized (this) {
+ refCnt--;
+ if (refCnt == 0) {
+ if (closer != null) {
+ closer.closeWriter(indexWriter);
+ } else if (indexWriter != null) {
+ indexWriter.close();
+ }
+ directoryFactory.close();
+ closed = true;
+ cancelRecovery = true;
}
- directoryFactory.close();
}
+ // don't wait for this in the sync block
+ if (cancelRecovery) cancelRecovery();
}
@Override
@@ -85,5 +101,43 @@ public final class DefaultSolrCoreState
public DirectoryFactory getDirectoryFactory() {
return directoryFactory;
}
+
+ @Override
+ public void doRecovery(SolrCore core) {
+ cancelRecovery();
+ synchronized (recoveryLock) {
+ while (recoveryRunning) {
+ try {
+ recoveryLock.wait(1000);
+ } catch (InterruptedException e) {
+
+ }
+ if (closed) return;
+ }
+
+ recoveryStrat = new RecoveryStrategy(core);
+ recoveryStrat.start();
+ recoveryRunning = true;
+ }
+
+ }
+
+ @Override
+ public void cancelRecovery() {
+ synchronized (recoveryLock) {
+ if (recoveryStrat != null) {
+ recoveryStrat.close();
+
+ try {
+ recoveryStrat.join();
+ } catch (InterruptedException e) {
+
+ }
+
+ recoveryRunning = false;
+ recoveryLock.notifyAll();
+ }
+ }
+ }
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Wed Jan 25 20:32:44 2012
@@ -18,6 +18,7 @@
package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRef;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema;
@@ -29,17 +30,28 @@ import org.apache.solr.schema.SchemaFiel
public class DeleteUpdateCommand extends UpdateCommand {
public String id; // external (printable) id, for delete-by-id
public String query; // query string for delete-by-query
- private BytesRef indexedId;
+ public BytesRef indexedId;
+ public int commitWithin = -1;
public DeleteUpdateCommand(SolrQueryRequest req) {
- super("delete", req);
+ super(req);
+ }
+
+ @Override
+ public String name() {
+ return "delete";
+ }
+
+ public boolean isDeleteById() {
+ return query == null;
}
public void clear() {
id = null;
query = null;
indexedId = null;
+ version = 0;
}
/** Returns the indexed ID for this delete. The returned BytesRef is retained across multiple calls, and should not be modified. */
@@ -55,13 +67,46 @@ public class DeleteUpdateCommand extends
return indexedId;
}
+ public String getId() {
+ if (id == null && indexedId != null) {
+ IndexSchema schema = req.getSchema();
+ SchemaField sf = schema.getUniqueKeyField();
+ if (sf != null) {
+ CharsRef ref = new CharsRef();
+ sf.getType().indexedToReadable(indexedId, ref);
+ id = ref.toString();
+ }
+ }
+ return id;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public void setIndexedId(BytesRef indexedId) {
+ this.indexedId = indexedId;
+ this.id = null;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ this.indexedId = null;
+ }
@Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
- if (id!=null) sb.append("id=").append(id);
- else sb.append("query=`").append(query).append('`');
- return sb.toString();
+ StringBuilder sb = new StringBuilder(super.toString());
+ if (id!=null) sb.append(",id=").append(getId());
+ if (indexedId!=null) sb.append(",indexedId=").append(getId());
+ if (query != null) sb.append(",query=`").append(query).append('`');
+ sb.append(",commitWithin=").append(commitWithin);
+ sb.append('}');
+ return sb.toString();
}
+
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Wed Jan 25 20:32:44 2012
@@ -32,6 +32,7 @@ import org.apache.lucene.document.Docume
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
+import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
@@ -45,8 +46,11 @@ import org.apache.solr.common.util.Named
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.FunctionRangeQuery;
import org.apache.solr.search.QParser;
-import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.search.QueryUtils;
+import org.apache.solr.search.function.ValueSourceRangeFilter;
/**
* TODO: add soft commitWithin support
@@ -54,8 +58,8 @@ import org.apache.solr.search.SolrIndexS
* <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added
* directly to the main Lucene index as opposed to adding to a separate smaller index.
*/
-public class DirectUpdateHandler2 extends UpdateHandler {
- protected SolrCoreState solrCoreState;
+public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState.IndexWriterCloser {
+ protected final SolrCoreState solrCoreState;
protected final Lock commitLock = new ReentrantLock();
// stats
@@ -97,7 +101,7 @@ public class DirectUpdateHandler2 extend
public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) throws IOException {
super(core);
if (updateHandler instanceof DirectUpdateHandler2) {
- this.solrCoreState = ((DirectUpdateHandler2)updateHandler).solrCoreState;
+ this.solrCoreState = ((DirectUpdateHandler2) updateHandler).solrCoreState;
} else {
// the impl has changed, so we cannot use the old state - decref it
updateHandler.decref();
@@ -115,7 +119,9 @@ public class DirectUpdateHandler2 extend
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
this.ulog = updateHandler.getUpdateLog();
- this.ulog.init(this, core);
+ if (this.ulog != null) {
+ this.ulog.init(this, core);
+ }
}
private void deleteAll() throws IOException {
@@ -170,14 +176,17 @@ public class DirectUpdateHandler2 extend
// allow duplicates
writer.addDocument(cmd.getLuceneDocument());
}
+
// Add to the transaction log *after* successfully adding to the index, if there was no error.
// This ordering ensures that if we log it, it's definitely been added to the the index.
// This also ensures that if a commit sneaks in-between, that we know everything in a particular
// log version was definitely committed.
- ulog.add(cmd);
+ if (ulog != null) ulog.add(cmd);
- softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
- commitTracker.addedDocument( cmd.commitWithin );
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+ commitTracker.addedDocument( cmd.commitWithin );
+ softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
+ }
rc = 1;
} finally {
@@ -203,17 +212,20 @@ public class DirectUpdateHandler2 extend
Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId());
// SolrCore.verbose("deleteDocuments",deleteTerm,writer);
+ commitTracker.deletedDocument( cmd.commitWithin );
writer.deleteDocuments(deleteTerm);
// SolrCore.verbose("deleteDocuments",deleteTerm,"DONE");
- ulog.delete(cmd);
-
- if (commitTracker.getTimeUpperBound() > 0) {
- commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
- }
-
- if (softCommitTracker.getTimeUpperBound() > 0) {
- softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+ if (ulog != null) ulog.delete(cmd);
+
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+ if (commitTracker.getTimeUpperBound() > 0) {
+ commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
+ }
+
+ if (softCommitTracker.getTimeUpperBound() > 0) {
+ softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
+ }
}
}
@@ -226,21 +238,50 @@ public class DirectUpdateHandler2 extend
try {
Query q;
try {
+ // TODO: move this higher in the stack?
QParser parser = QParser.getParser(cmd.query, "lucene", cmd.req);
q = parser.getQuery();
+ q = QueryUtils.makeQueryable(q);
+
+ // peer-sync can cause older deleteByQueries to be executed and could
+ // delete newer documents. We prevent this by adding a clause restricting
+ // version.
+ if ((cmd.getFlags() & UpdateCommand.PEER_SYNC) != 0) {
+ BooleanQuery bq = new BooleanQuery();
+ bq.add(q, Occur.MUST);
+ SchemaField sf = core.getSchema().getField(VersionInfo.VERSION_FIELD);
+ ValueSource vs = sf.getType().getValueSource(sf, null);
+ ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.version)), true, true);
+ FunctionRangeQuery range = new FunctionRangeQuery(filt);
+ bq.add(range, Occur.MUST);
+ q = bq;
+ }
+
+
+
} catch (ParseException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
boolean delAll = MatchAllDocsQuery.class == q.getClass();
+
+ commitTracker.deletedDocument(cmd.commitWithin);
- if (delAll) {
- deleteAll();
- } else {
- solrCoreState.getIndexWriter(core).deleteDocuments(q);
- }
+ //
+ // synchronized to prevent deleteByQuery from running during the "open new searcher"
+ // part of a commit. DBQ needs to signal that a fresh reader will be needed for
+ // a realtime view of the index. When a new searcher is opened after a DBQ, that
+ // flag can be cleared. If those thing happen concurrently, it's not thread safe.
+ //
+ synchronized (this) {
+ if (delAll) {
+ deleteAll();
+ } else {
+ solrCoreState.getIndexWriter(core).deleteDocuments(q);
+ }
- ulog.deleteByQuery(cmd);
+ if (ulog != null) ulog.deleteByQuery(cmd);
+ }
madeIt = true;
@@ -342,7 +383,7 @@ public class DirectUpdateHandler2 extend
if (!cmd.softCommit) {
synchronized (this) { // sync is currently needed to prevent preCommit from being called between preSoft and postSoft... see postSoft comments.
- ulog.preCommit(cmd);
+ if (ulog != null) ulog.preCommit(cmd);
}
// SolrCore.verbose("writer.commit() start writer=",writer);
@@ -360,23 +401,23 @@ public class DirectUpdateHandler2 extend
}
- if (cmd.softCommit) {
- // ulog.preSoftCommit();
- synchronized (this) {
- ulog.preSoftCommit(cmd);
- core.getSearcher(true,false,waitSearcher, true);
- ulog.postSoftCommit(cmd);
- }
- // ulog.postSoftCommit();
- } else {
- synchronized (this) {
- ulog.preSoftCommit(cmd);
- core.getSearcher(true,false,waitSearcher);
- ulog.postSoftCommit(cmd);
- }
- ulog.postCommit(cmd); // postCommit currently means new searcher has also been opened
+ if (cmd.softCommit) {
+ // ulog.preSoftCommit();
+ synchronized (this) {
+ if (ulog != null) ulog.preSoftCommit(cmd);
+ core.getSearcher(true, false, waitSearcher, true);
+ if (ulog != null) ulog.postSoftCommit(cmd);
}
-
+ // ulog.postSoftCommit();
+ } else {
+ synchronized (this) {
+ if (ulog != null) ulog.preSoftCommit(cmd);
+ core.getSearcher(true, false, waitSearcher);
+ if (ulog != null) ulog.postSoftCommit(cmd);
+ }
+ if (ulog != null) ulog.postCommit(cmd); // postCommit currently means new searcher has
+ // also been opened
+ }
// reset commit tracking
@@ -415,25 +456,6 @@ public class DirectUpdateHandler2 extend
}
@Override
- public SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException {
-
- IndexReader currentReader = previousSearcher.getIndexReader();
- IndexReader newReader;
-
- IndexWriter writer = solrCoreState.getIndexWriter(core);
- // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
- newReader = IndexReader.openIfChanged(currentReader, writer, true);
- // SolrCore.verbose("reopen result", newReader);
-
- if (newReader == null) {
- currentReader.incRef();
- newReader = currentReader;
- }
-
- return new SolrIndexSearcher(core, schema, "main", newReader, true, true, true, core.getDirectoryFactory());
- }
-
- @Override
public void newIndexWriter() throws IOException {
solrCoreState.newIndexWriter(core);
}
@@ -487,12 +509,44 @@ public class DirectUpdateHandler2 extend
numDocsPending.set(0);
- solrCoreState.decref();
-
- log.info("closed " + this);
+ solrCoreState.decref(this);
}
+ public static boolean commitOnClose = true; // TODO: make this a real config option?
+
+ // IndexWriterCloser interface method - called from solrCoreState.decref(this)
+ @Override
+ public void closeWriter(IndexWriter writer) throws IOException {
+ commitLock.lock();
+ try {
+ if (!commitOnClose) {
+ if (writer != null) {
+ writer.rollback();
+ }
+
+ // we shouldn't close the transaction logs either, but leaving them open
+ // means we can't delete them on windows.
+ if (ulog != null) ulog.close();
+
+ return;
+ }
+
+ if (writer != null) {
+ writer.close();
+ }
+
+ // if the writer hits an exception, it's OK (and perhaps desirable)
+ // to not close the ulog?
+
+ // Closing the log currently deletes the log file.
+ // If this changes, we should record this as a "commit".
+ if (ulog != null) ulog.close();
+ } finally {
+ commitLock.unlock();
+ }
+ }
+
/////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info
/////////////////////////////////////////////////////////////////////
@@ -564,14 +618,15 @@ public class DirectUpdateHandler2 extend
return "DirectUpdateHandler2" + getStatistics();
}
- public SolrCoreState getIndexWriterProvider() {
+ @Override
+ public SolrCoreState getSolrCoreState() {
return solrCoreState;
}
@Override
public void decref() {
try {
- solrCoreState.decref();
+ solrCoreState.decref(this);
} catch (IOException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/MergeIndexesCommand.java Wed Jan 25 20:32:44 2012
@@ -31,20 +31,25 @@ public class MergeIndexesCommand extends
public IndexReader[] readers;
public MergeIndexesCommand(IndexReader[] readers, SolrQueryRequest req) {
- super("mergeIndexes", req);
+ super(req);
this.readers = readers;
}
@Override
+ public String name() {
+ return "mergeIndexes";
+ }
+
+ @Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
+ StringBuilder sb = new StringBuilder(super.toString());
if (readers != null && readers.length > 0) {
sb.append(readers[0].directory());
for (int i = 1; i < readers.length; i++) {
sb.append(",").append(readers[i].directory());
}
}
+ sb.append('}');
return sb.toString();
}
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/RollbackUpdateCommand.java Wed Jan 25 20:32:44 2012
@@ -26,7 +26,16 @@ import org.apache.solr.request.SolrQuery
public class RollbackUpdateCommand extends UpdateCommand {
public RollbackUpdateCommand(SolrQueryRequest req) {
- super("rollback", req);
+ super(req);
}
+ @Override
+ public String name() {
+ return "rollback";
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + '}';
+ }
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Wed Jan 25 20:32:44 2012
@@ -49,11 +49,12 @@ public abstract class SolrCoreState {
/**
* Decrement the number of references to this state. When then number of
- * references hits 0, the state will close.
+ * references hits 0, the state will close. If an optional closer is
+ * passed, that will be used to close the writer.
*
* @throws IOException
*/
- public abstract void decref() throws IOException;
+ public abstract void decref(IndexWriterCloser closer) throws IOException;
/**
* Increment the number of references to this state.
@@ -73,5 +74,14 @@ public abstract class SolrCoreState {
* @return the {@link DirectoryFactory} that should be used.
*/
public abstract DirectoryFactory getDirectoryFactory();
+
+
+ public interface IndexWriterCloser {
+ public void closeWriter(IndexWriter writer) throws IOException;
+ }
+
+ public abstract void doRecovery(SolrCore core);
+ public abstract void cancelRecovery();
+
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Wed Jan 25 20:32:44 2012
@@ -17,6 +17,7 @@
package org.apache.solr.update;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.request.SolrQueryRequest;
@@ -24,17 +25,56 @@ import org.apache.solr.request.SolrQuery
*
*
*/
- public class UpdateCommand {
- protected final SolrQueryRequest req;
- protected final String commandName;
-
- public UpdateCommand(String commandName, SolrQueryRequest req) {
- this.req = req;
- this.commandName = commandName;
- }
+public abstract class UpdateCommand implements Cloneable {
+ protected SolrQueryRequest req;
+ protected long version;
+ protected int flags;
+
+ public static int BUFFERING = 0x00000001; // update command is being buffered.
+ public static int REPLAY = 0x00000002; // update command is from replaying a log.
+ public static int PEER_SYNC = 0x00000004; // update command is a missing update being provided by a peer.
+ public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
+
+ public UpdateCommand(SolrQueryRequest req) {
+ this.req = req;
+ }
+
+ public abstract String name();
+
+ @Override
+ public String toString() {
+ return name() + "{flags="+flags+",version="+version;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+ public void setVersion(long version) {
+ this.version = version;
+ }
+
+ public void setFlags(int flags) {
+ this.flags = flags;
+ }
+
+ public int getFlags() {
+ return flags;
+ }
+
+ public SolrQueryRequest getReq() {
+ return req;
+ }
+
+ public void setReq(SolrQueryRequest req) {
+ this.req = req;
+ }
- @Override
- public String toString() {
- return commandName;
+ @Override
+ public UpdateCommand clone() {
+ try {
+ return (UpdateCommand) super.clone();
+ } catch (CloneNotSupportedException e) {
+ return null;
}
}
+}
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Wed Jan 25 20:32:44 2012
@@ -88,12 +88,11 @@ public abstract class UpdateHandler impl
private void initLog() {
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
if (ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
- ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
- } else {
- ulog = new NullUpdateLog();
- ulog.init(null);
+ ulog = new UpdateLog();
+ ulog.init(ulogPluginInfo);
+ // ulog = core.createInitInstance(ulogPluginInfo, UpdateLog.class, "update log", "solr.NullUpdateLog");
+ ulog.init(this, core);
}
- ulog.init(this, core);
}
@@ -123,16 +122,7 @@ public abstract class UpdateHandler impl
parseEventListeners();
initLog();
}
-
- /**
- * Allows the UpdateHandler to create the SolrIndexSearcher after it
- * has issued a 'softCommit'.
- *
- * @param previousSearcher
- * @throws IOException
- */
- public abstract SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException;
-
+
/**
* Called when the Writer should be opened again - eg when replication replaces
* all of the index files.
@@ -141,7 +131,7 @@ public abstract class UpdateHandler impl
*/
public abstract void newIndexWriter() throws IOException;
- public abstract SolrCoreState getIndexWriterProvider();
+ public abstract SolrCoreState getSolrCoreState();
public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
public abstract void delete(DeleteUpdateCommand cmd) throws IOException;
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/UpdateLog.java Wed Jan 25 20:32:44 2012
@@ -18,23 +18,1051 @@
package org.apache.solr.update;
import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
+import org.apache.solr.update.processor.RunUpdateProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
/** @lucene.experimental */
-public abstract class UpdateLog implements PluginInfoInitialized {
- public static final int ADD = 0x00;
- public static final int DELETE = 0x01;
- public static final int DELETE_BY_QUERY = 0x02;
-
- public abstract void init(UpdateHandler uhandler, SolrCore core);
- public abstract void add(AddUpdateCommand cmd);
- public abstract void delete(DeleteUpdateCommand cmd);
- public abstract void deleteByQuery(DeleteUpdateCommand cmd);
- public abstract void preCommit(CommitUpdateCommand cmd);
- public abstract void postCommit(CommitUpdateCommand cmd);
- public abstract void preSoftCommit(CommitUpdateCommand cmd);
- public abstract void postSoftCommit(CommitUpdateCommand cmd);
- public abstract Object lookup(BytesRef indexedId);
- public abstract void close();
+public class UpdateLog implements PluginInfoInitialized {
+ public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
+ public boolean debug = log.isDebugEnabled();
+
+
+ public enum SyncLevel { NONE, FLUSH, FSYNC }
+ public enum State { REPLAYING, BUFFERING, APPLYING_BUFFERED, ACTIVE }
+
+ public static final int ADD = 0x01;
+ public static final int DELETE = 0x02;
+ public static final int DELETE_BY_QUERY = 0x03;
+ public static final int COMMIT = 0x04;
+
+ public static class RecoveryInfo {
+ public long positionOfStart;
+
+ public int adds;
+ public int deletes;
+ public int deleteByQuery;
+ public int errors;
+ }
+
+
+
+ public static String TLOG_NAME="tlog";
+
+ long id = -1;
+ private State state = State.ACTIVE;
+
+ private TransactionLog tlog;
+ private TransactionLog prevTlog;
+ private Deque<TransactionLog> logs = new LinkedList<TransactionLog>(); // list of recent logs, newest first
+ private TransactionLog newestLogOnStartup;
+ private int numOldRecords; // number of records in the recent logs
+
+ private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>();
+ private Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening
+ private Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening
+ private TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
+ private TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap
+
+ private final int numDeletesToKeep = 1000;
+ private final int numRecordsToKeep = 100;
+ // keep track of deletes only... this is not updated on an add
+ private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > numDeletesToKeep;
+ }
+ };
+
+ private String[] tlogFiles;
+ private File tlogDir;
+ private Collection<String> globalStrings;
+
+ private String dataDir;
+ private String lastDataDir;
+
+ private VersionInfo versionInfo;
+
+ private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
+
+ private volatile UpdateHandler uhandler; // a core reload can change this reference!
+ private volatile boolean cancelApplyBufferUpdate;
+
+
+ public static class LogPtr {
+ final long pointer;
+ final long version;
+
+ public LogPtr(long pointer, long version) {
+ this.pointer = pointer;
+ this.version = version;
+ }
+
+ public String toString() {
+ return "LogPtr(" + pointer + ")";
+ }
+ }
+
+
+ public VersionInfo getVersionInfo() {
+ return versionInfo;
+ }
+
+ public void init(PluginInfo info) {
+ dataDir = (String)info.initArgs.get("dir");
+ }
+
+ public void init(UpdateHandler uhandler, SolrCore core) {
+ if (dataDir == null || dataDir.length()==0) {
+ dataDir = core.getDataDir();
+ }
+
+ this.uhandler = uhandler;
+
+ if (dataDir.equals(lastDataDir)) {
+ // on a normal reopen, we currently shouldn't have to do anything
+ return;
+ }
+ lastDataDir = dataDir;
+ tlogDir = new File(dataDir, TLOG_NAME);
+ tlogDir.mkdirs();
+ tlogFiles = getLogList(tlogDir);
+ id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
+
+ TransactionLog oldLog = null;
+ for (String oldLogName : tlogFiles) {
+ File f = new File(tlogDir, oldLogName);
+ try {
+ oldLog = new TransactionLog( f, null, true );
+ addOldLog(oldLog);
+ } catch (Exception e) {
+ SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
+ f.delete();
+ }
+ }
+ newestLogOnStartup = oldLog;
+
+ versionInfo = new VersionInfo(uhandler, 256);
+ }
+
+ public File getLogDir() {
+ return tlogDir;
+ }
+
+ /* Takes over ownership of the log, keeping it until no longer needed
+ and then decrementing its reference and dropping it.
+ */
+ private void addOldLog(TransactionLog oldLog) {
+ if (oldLog == null) return;
+
+ numOldRecords += oldLog.numRecords();
+
+ int currRecords = numOldRecords;
+
+ if (oldLog != tlog && tlog != null) {
+ currRecords += tlog.numRecords();
+ }
+
+ while (logs.size() > 0) {
+ TransactionLog log = logs.peekLast();
+ int nrec = log.numRecords();
+ // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
+ // we already have the limit of 10 log files.
+ if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
+ currRecords -= nrec;
+ numOldRecords -= nrec;
+ logs.removeLast().decref(); // dereference so it will be deleted when no longer in use
+ continue;
+ }
+
+ break;
+ }
+
+ // don't incref... we are taking ownership from the caller.
+ logs.addFirst(oldLog);
+ }
+
+
+ /**
+ * Lists the transaction log files in the given directory, sorted by name.
+ * Only names that start with TLOG_NAME followed by '.' are returned.
+ *
+ * @param directory the directory to scan
+ * @return the matching file names in ascending String order; possibly empty
+ * @throws IllegalStateException if the directory cannot be listed
+ */
+ public static String[] getLogList(File directory) {
+ final String prefix = TLOG_NAME+'.';
+ String[] names = directory.list(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(prefix);
+ }
+ });
+ // File.list returns null when the path is not a directory or an I/O error occurs;
+ // report that explicitly instead of failing with an NPE inside Arrays.sort.
+ if (names == null) {
+ throw new IllegalStateException("Unable to list transaction log directory: " + directory.getAbsolutePath());
+ }
+ Arrays.sort(names);
+ return names;
+ }
+
+
+ public long getLastLogId() {
+ if (id != -1) return id;
+ if (tlogFiles.length == 0) return -1;
+ String last = tlogFiles[tlogFiles.length-1];
+ return Long.parseLong(last.substring(TLOG_NAME.length()+1));
+ }
+
+
+ public void add(AddUpdateCommand cmd) {
+ // don't log if we are replaying from another log
+ // TODO: we currently need to log to maintain correct versioning, rtg, etc
+ // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
+
+ synchronized (this) {
+ long pos = -1;
+
+ // don't log if we are replaying from another log
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog();
+ pos = tlog.write(cmd);
+ }
+
+ // TODO: in the future we could support a real position for a REPLAY update.
+ // Only currently would be useful for RTG while in recovery mode though.
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+ // only update our map if we're not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ map.put(cmd.getIndexedId(), ptr);
+ }
+
+ if (debug) {
+ log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+ public void delete(DeleteUpdateCommand cmd) {
+ BytesRef br = cmd.getIndexedId();
+
+ synchronized (this) {
+ long pos = -1;
+
+ // don't log if we are replaying from another log
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog();
+ pos = tlog.writeDelete(cmd);
+ }
+
+ LogPtr ptr = new LogPtr(pos, cmd.version);
+
+ // only update our map if we're not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ map.put(br, ptr);
+
+ oldDeletes.put(br, ptr);
+ }
+
+ if (debug) {
+ log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+ public void deleteByQuery(DeleteUpdateCommand cmd) {
+ synchronized (this) {
+ long pos = -1;
+ // don't log if we are replaying from another log
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog();
+ pos = tlog.writeDeleteByQuery(cmd);
+ }
+
+ // only change our caches if we are not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ // given that we just did a delete-by-query, we don't know what documents were
+ // affected and hence we must purge our caches.
+ map.clear();
+
+ // oldDeletes.clear();
+
+ // We must cause a new IndexReader to be opened before anything looks at these caches again
+ // so that a cache miss will read fresh data.
+ //
+ // TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands
+ try {
+ RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+ holder.decref();
+ } catch (Throwable e) {
+ SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+ }
+
+ }
+
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+ if (debug) {
+ log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+
+ private void newMap() {
+ prevMap2 = prevMap;
+ prevMapLog2 = prevMapLog;
+
+ prevMap = map;
+ prevMapLog = tlog;
+
+ map = new HashMap<BytesRef, LogPtr>();
+ }
+
+ private void clearOldMaps() {
+ prevMap = null;
+ prevMap2 = null;
+ }
+
+ public void preCommit(CommitUpdateCommand cmd) {
+ synchronized (this) {
+ if (debug) {
+ log.debug("TLOG: preCommit");
+ }
+
+ if (getState() != State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // if we aren't in the active state, and this isn't a replay
+ // from the recovery process, then we shouldn't mess with
+ // the current transaction log. This normally shouldn't happen
+ // as DistributedUpdateProcessor will prevent this. Commits
+ // that don't use the processor are possible though.
+ return;
+ }
+
+ // since we're changing the log, we must change the map.
+ newMap();
+
+ // since document additions can happen concurrently with commit, create
+ // a new transaction log first so that we know the old one is definitely
+ // in the index.
+ prevTlog = tlog;
+ tlog = null;
+ id++;
+
+ if (prevTlog != null) {
+ globalStrings = prevTlog.getGlobalStrings();
+ }
+
+ addOldLog(prevTlog);
+ }
+ }
+
+ public void postCommit(CommitUpdateCommand cmd) {
+ synchronized (this) {
+ if (debug) {
+ log.debug("TLOG: postCommit");
+ }
+ if (prevTlog != null) {
+ // if we made it through the commit, write a commit command to the log
+ // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
+ prevTlog.writeCommit(cmd);
+ // the old log list will decref when no longer needed
+ // prevTlog.decref();
+ prevTlog = null;
+ }
+ }
+ }
+
+ /**
+ * Rotates the id-to-pointer maps ahead of a soft commit. Refreshes the cached debug
+ * flag first; when the command is not a soft commit nothing else is done.
+ *
+ * @param cmd the commit command; only its softCommit flag is read here
+ */
+ public void preSoftCommit(CommitUpdateCommand cmd) {
+ debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+
+ synchronized (this) {
+
+ if (!cmd.softCommit) return; // already handled this at the start of the hard commit
+
+ // start adding documents to a new map since we won't know if
+ // any added documents will make it into this commit or not.
+ // But we do know that any updates already added will definitely
+ // show up in the latest reader after the commit succeeds.
+ // newMap() shifts the current map into prevMap and installs a fresh, empty map,
+ // so no further allocation is needed here.
+ newMap();
+
+ if (debug) {
+ log.debug("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+ }
+ }
+ }
+
+ public void postSoftCommit(CommitUpdateCommand cmd) {
+ synchronized (this) {
+ // We can clear out all old maps now that a new searcher has been opened.
+ // This currently only works since DUH2 synchronizes around preCommit to avoid
+ // it being called in the middle of a preSoftCommit, postSoftCommit sequence.
+ // If this DUH2 synchronization were to be removed, preSoftCommit should
+ // record what old maps were created and only remove those.
+
+ if (debug) {
+ SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2));
+ }
+ clearOldMaps();
+ }
+ }
+
+ /**
+ * Looks up the most recent log entry recorded for the given id.
+ * The current map is consulted first, then prevMap, then prevMap2; the entry's
+ * pointer is then resolved against the transaction log paired with the map it
+ * was found in.
+ *
+ * @param indexedId the indexed form of the document id
+ * @return whatever TransactionLog.lookup returns for the entry's pointer, or null
+ * if none of the maps contains the id
+ */
+ public Object lookup(BytesRef indexedId) {
+ LogPtr entry;
+ TransactionLog lookupLog;
+
+ synchronized (this) {
+ entry = map.get(indexedId);
+ lookupLog = tlog; // something found in "map" will always be in "tlog"
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ if (entry == null && prevMap != null) {
+ entry = prevMap.get(indexedId);
+ // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+ lookupLog = prevMapLog;
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ if (entry == null && prevMap2 != null) {
+ entry = prevMap2.get(indexedId);
+ // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+ lookupLog = prevMapLog2;
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+
+ if (entry == null) {
+ return null;
+ }
+ // take a reference inside the lock so the log cannot be released before we read from it
+ // NOTE(review): add()/delete() store entries for REPLAY updates without calling
+ // ensureLog(), so lookupLog may be null here (and the pointer -1) - confirm.
+ lookupLog.incref();
+ }
+
+ try {
+ // now do the lookup outside of the sync block for concurrency
+ return lookupLog.lookup(entry.pointer);
+ } finally {
+ lookupLog.decref();
+ }
+
+ }
+
+ /**
+ * Looks up the version recorded for the given id. This works like realtime-get:
+ * it only guarantees to return the latest version of the *completed* update.
+ * There can be updates in progress concurrently that have already grabbed higher
+ * version numbers. Higher level coordination or synchronization is needed for
+ * stronger guarantees (as VersionUpdateProcessor does).
+ *
+ * @param indexedId the indexed form of the document id
+ * @return the version found in the in-memory maps, else the one reported by the
+ * index, else the one in the cache of recent deletes; null if none has the id
+ */
+ public Long lookupVersion(BytesRef indexedId) {
+ LogPtr entry;
+
+ synchronized (this) {
+ // current map first, then the two older maps; only the version stored in the
+ // entry is needed, so the transaction logs themselves are never consulted
+ entry = map.get(indexedId);
+ if (entry == null && prevMap != null) {
+ entry = prevMap.get(indexedId);
+ }
+ if (entry == null && prevMap2 != null) {
+ entry = prevMap2.get(indexedId);
+ }
+ }
+
+ if (entry != null) {
+ return entry.version;
+ }
+
+ // Now check real index
+ Long version = versionInfo.getVersionFromIndex(indexedId);
+
+ if (version != null) {
+ return version;
+ }
+
+ // We can't get any version info for deletes from the index, so if the doc
+ // wasn't found, check a cache of recent deletes.
+
+ synchronized (this) {
+ entry = oldDeletes.get(indexedId);
+ }
+
+ if (entry != null) {
+ return entry.version;
+ }
+
+ return null;
+ }
+
+ public void finish(SyncLevel syncLevel) {
+ if (syncLevel == null) {
+ syncLevel = defaultSyncLevel;
+ }
+ if (syncLevel == SyncLevel.NONE) {
+ return;
+ }
+
+ TransactionLog currLog;
+ synchronized (this) {
+ currLog = tlog;
+ if (currLog == null) return;
+ currLog.incref();
+ }
+
+ try {
+ currLog.finish(syncLevel);
+ } finally {
+ currLog.decref();
+ }
+ }
+
+ /**
+ * Submits a replay of the newest transaction log that was opened at startup.
+ * The state is switched to REPLAYING while updates are blocked, then the replay
+ * is handed to the recovery executor.
+ *
+ * @return the Future to wait on, or null if there is no startup log, its reference
+ * could not be acquired, it already ends with a commit, or it could not
+ * be inspected
+ */
+ public Future<RecoveryInfo> recoverFromLog() {
+ recoveryInfo = new RecoveryInfo();
+ if (newestLogOnStartup == null) return null;
+
+ if (!newestLogOnStartup.try_incref()) return null; // log file was already closed
+
+ // now that we've incremented the reference, the log shouldn't go away.
+ try {
+ if (newestLogOnStartup.endsWithCommit()) {
+ newestLogOnStartup.decref();
+ return null;
+ }
+ } catch (IOException e) {
+ // pass the exception along so the cause of the failed inspection is not lost
+ log.error("Error inspecting tlog " + newestLogOnStartup, e);
+ newestLogOnStartup.decref();
+ return null;
+ }
+
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+ LogReplayer replayer = new LogReplayer(newestLogOnStartup, false);
+
+ versionInfo.blockUpdates();
+ try {
+ state = State.REPLAYING;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+
+ return cs.submit(replayer, recoveryInfo);
+
+ }
+
+
+ /**
+ * Opens a new transaction log if none is currently open. The file is created in
+ * tlogDir and named TLOG_NAME, a '.', and the current id zero-padded to 19 digits.
+ *
+ * @throws SolrException (SERVER_ERROR) if the new log cannot be opened
+ */
+ private void ensureLog() {
+ if (tlog == null) {
+ // Locale.ROOT keeps the digits ASCII; with the default locale %d may be rendered
+ // with locale-specific digits, giving file names that depend on the JVM's locale.
+ String newLogName = String.format(Locale.ROOT, "%s.%019d", TLOG_NAME, id);
+ try {
+ tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't open new tlog!", e);
+ }
+ }
+ }
+
+ public void close() {
+ synchronized (this) {
+ try {
+ recoveryExecutor.shutdownNow();
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ }
+
+ // Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions
+
+ if (prevTlog != null) {
+ prevTlog.deleteOnClose = false;
+ prevTlog.decref();
+ prevTlog.forceClose();
+ }
+ if (tlog != null) {
+ tlog.deleteOnClose = false;
+ tlog.decref();
+ tlog.forceClose();
+ }
+
+ for (TransactionLog log : logs) {
+ log.deleteOnClose = false;
+ log.decref();
+ log.forceClose();
+ }
+
+ }
+ }
+
+
+ static class Update {
+ TransactionLog log;
+ long version;
+ long pointer;
+ }
+
+ public class RecentUpdates {
+ Deque<TransactionLog> logList; // newest first
+ List<List<Update>> updateList;
+ HashMap<Long, Update> updates;
+ List<Update> deleteByQueryList;
+
+
+ /**
+ * Collects up to n versions, walking updateList in order and each inner list in order.
+ *
+ * @param n the maximum number of versions to return
+ * @return the collected versions; empty if n is not positive
+ */
+ public List<Long> getVersions(int n) {
+ // guard: without it n == 0 would still return one version and a negative n
+ // would fail inside the ArrayList constructor
+ if (n <= 0) return new ArrayList<Long>();
+
+ List<Long> ret = new ArrayList<Long>(n);
+
+ for (List<Update> singleList : updateList) {
+ for (Update ptr : singleList) {
+ ret.add(ptr.version);
+ if (--n <= 0) return ret;
+ }
+ }
+
+ return ret;
+ }
+
+ public Object lookup(long version) {
+ Update update = updates.get(version);
+ if (update == null) return null;
+
+ return update.log.lookup(update.pointer);
+ }
+
+ /** Returns the list of deleteByQueries that happened after the given version */
+ public List<Object> getDeleteByQuery(long afterVersion) {
+ List<Object> result = new ArrayList<Object>(deleteByQueryList.size());
+ for (Update update : deleteByQueryList) {
+ if (Math.abs(update.version) > afterVersion) {
+ Object dbq = update.log.lookup(update.pointer);
+ result.add(dbq);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Rebuilds updateList, deleteByQueryList and the version-to-update map by reading
+ * each log in logList backwards with its reverse reader. At most numRecordsToKeep
+ * records are read in total across all logs. Unreadable or unexpected entries are
+ * logged and do not abort the scan of the remaining logs.
+ */
+ private void update() {
+ int numUpdates = 0;
+ updateList = new ArrayList<List<Update>>(logList.size());
+ deleteByQueryList = new ArrayList<Update>();
+ updates = new HashMap<Long,Update>(numRecordsToKeep);
+
+ for (TransactionLog oldLog : logList) {
+ List<Update> updatesForLog = new ArrayList<Update>();
+
+ TransactionLog.ReverseReader reader = null;
+ try {
+ reader = oldLog.getReverseReader();
+
+ while (numUpdates < numRecordsToKeep) {
+ Object o = reader.next();
+ if (o==null) break;
+ try {
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ // TODO: refactor this out so we get common error handling
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ case UpdateLog.DELETE:
+ case UpdateLog.DELETE_BY_QUERY:
+ Update update = new Update();
+ update.log = oldLog;
+ update.pointer = reader.position();
+ update.version = version;
+
+ updatesForLog.add(update);
+ updates.put(version, update);
+
+ if (oper == UpdateLog.DELETE_BY_QUERY) {
+ deleteByQueryList.add(update);
+ }
+
+ break;
+
+ case UpdateLog.COMMIT:
+ break;
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+ } catch (ClassCastException cl) {
+ log.warn("Unexpected log entry or corrupt log. Entry=" + o, cl);
+ // would be caused by a corrupt transaction log
+ } catch (Exception ex) {
+ log.warn("Exception reverse reading log", ex);
+ break;
+ }
+
+ // count every record read so the numRecordsToKeep bound in the loop condition
+ // actually takes effect
+ numUpdates++;
+ }
+
+ } catch (IOException e) {
+ // failure to read a log record isn't fatal
+ log.error("Exception reading versions from log",e);
+ } finally {
+ if (reader != null) reader.close();
+ }
+
+ // one (possibly empty) list per log keeps updateList aligned with logList
+ updateList.add(updatesForLog);
+ }
+
+ }
+
+ public void close() {
+ for (TransactionLog log : logList) {
+ log.decref();
+ }
+ }
+ }
+
+
+ public RecentUpdates getRecentUpdates() {
+ Deque<TransactionLog> logList;
+ synchronized (this) {
+ logList = new LinkedList<TransactionLog>(logs);
+ for (TransactionLog log : logList) {
+ log.incref();
+ }
+ if (prevTlog != null) {
+ prevTlog.incref();
+ logList.addFirst(prevTlog);
+ }
+ if (tlog != null) {
+ tlog.incref();
+ logList.addFirst(tlog);
+ }
+ }
+
+ // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
+ // one of the updates I originally handed out fell off the list). Over-request?
+ RecentUpdates recentUpdates = new RecentUpdates();
+ recentUpdates.logList = logList;
+ recentUpdates.update();
+
+ return recentUpdates;
+ }
+
+ public void bufferUpdates() {
+ // recovery trips this assert under some race - even when
+ // it checks the state first
+ // assert state == State.ACTIVE;
+
+ recoveryInfo = new RecoveryInfo();
+
+ // block all updates to eliminate race conditions
+ // reading state and acting on it in the update processor
+ versionInfo.blockUpdates();
+ try {
+ if (state != State.ACTIVE) return;
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting to buffer updates. " + this);
+ }
+
+ // since we blocked updates, this synchronization shouldn't strictly be necessary.
+ synchronized (this) {
+ recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+ }
+
+ state = State.BUFFERING;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+ }
+
+ /** Returns true if we were able to drop buffered updates and return to the ACTIVE state */
+ public boolean dropBufferedUpdates() {
+ versionInfo.blockUpdates();
+ try {
+ if (state != State.BUFFERING) return false;
+
+ if (log.isInfoEnabled()) {
+ log.info("Dropping buffered updates " + this);
+ }
+
+ // since we blocked updates, this synchronization shouldn't strictly be necessary.
+ synchronized (this) {
+ if (tlog != null) {
+ tlog.rollback(recoveryInfo.positionOfStart);
+ }
+ }
+
+ state = State.ACTIVE;
+ } catch (IOException e) {
+ SolrException.log(log,"Error attempting to roll back log", e);
+ return false;
+ }
+ finally {
+ versionInfo.unblockUpdates();
+ }
+ return true;
+ }
+
+
+ /** Returns the Future to wait on, or null if no replay was needed */
+ public Future<RecoveryInfo> applyBufferedUpdates() {
+ // recovery trips this assert under some race - even when
+ // it checks the state first
+ // assert state == State.BUFFERING;
+
+ // block all updates to eliminate race conditions
+ // reading state and acting on it in the update processor
+ versionInfo.blockUpdates();
+ try {
+ cancelApplyBufferUpdate = false;
+ if (state != State.BUFFERING) return null;
+
+ // handle case when no log was even created because no updates
+ // were received.
+ if (tlog == null) {
+ state = State.ACTIVE;
+ return null;
+ }
+ tlog.incref();
+ state = State.APPLYING_BUFFERED;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+
+ if (recoveryExecutor.isShutdown()) {
+ tlog.decref();
+ throw new RuntimeException("executor is not running...");
+ }
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
+ LogReplayer replayer = new LogReplayer(tlog, true);
+ return cs.submit(replayer, recoveryInfo);
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public String toString() {
+ return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}";
+ }
+
+
+ public static Runnable testing_logReplayHook; // called before each log read
+ public static Runnable testing_logReplayFinishHook; // called when log replay has finished
+
+
+
+ private RecoveryInfo recoveryInfo;
+
+ // TODO: do we let the log replayer run across core reloads?
+ class LogReplayer implements Runnable {
+ TransactionLog translog;
+ TransactionLog.LogReader tlogReader;
+ boolean activeLog;
+ boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
+
+
+ public LogReplayer(TransactionLog translog, boolean activeLog) {
+ this.translog = translog;
+ this.activeLog = activeLog;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ uhandler.core.log.warn("Starting log replay " + translog + " active="+activeLog + "starting pos=" + recoveryInfo.positionOfStart);
+
+ tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
+ SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
+ SolrQueryResponse rsp = new SolrQueryResponse();
+
+ // NOTE: we don't currently handle a core reload during recovery. This would cause the core
+ // to change underneath us.
+
+ // TODO: use the standard request factory? We won't get any custom configuration instantiating this way.
+ RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
+ DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
+ runFac.init(new NamedList());
+ magicFac.init(new NamedList());
+
+ UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
+
+ long commitVersion = 0;
+
+ for(;;) {
+ Object o = null;
+ if (cancelApplyBufferUpdate) break;
+ try {
+ if (testing_logReplayHook != null) testing_logReplayHook.run();
+ o = null;
+ o = tlogReader.next();
+ if (o == null && activeLog) {
+ if (!finishing) {
+ // block to prevent new adds, but don't immediately unlock since
+ // we could be starved from ever completing recovery. Only unlock
+ // after we've finished this recovery.
+ // NOTE: our own updates won't be blocked since the thread holding a write lock can
+ // lock a read lock.
+ versionInfo.blockUpdates();
+ finishing = true;
+ o = tlogReader.next();
+ } else {
+ // we had previously blocked updates, so this "null" from the log is final.
+
+ // Wait until our final commit to change the state and unlock.
+ // This is only so no new updates are written to the current log file, and is
+ // only an issue if we crash before the commit (and we are paying attention
+ // to incomplete log files).
+ //
+ // versionInfo.unblockUpdates();
+ }
+ }
+ } catch (InterruptedException e) {
+ SolrException.log(log,e);
+ } catch (IOException e) {
+ SolrException.log(log,e);
+ } catch (Throwable e) {
+ SolrException.log(log,e);
+ }
+
+ if (o == null) break;
+
+ try {
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ {
+ recoveryInfo.adds++;
+ // byte[] idBytes = (byte[]) entry.get(2);
+ SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processAdd(cmd);
+ break;
+ }
+ case UpdateLog.DELETE:
+ {
+ recoveryInfo.deletes++;
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY:
+ {
+ recoveryInfo.deleteByQuery++;
+ String query = (String)entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ case UpdateLog.COMMIT:
+ {
+ commitVersion = version;
+ break;
+ }
+
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+
+ if (rsp.getException() != null) {
+ log.error("Exception replaying log", rsp.getException());
+ throw rsp.getException();
+ }
+ } catch (IOException ex) {
+ recoveryInfo.errors++;
+ log.warn("IOException reading log", ex);
+ // could be caused by an incomplete flush if recovering from log
+ } catch (ClassCastException cl) {
+ recoveryInfo.errors++;
+ log.warn("Unexpected log entry or corrupt log. Entry=" + o, cl);
+ // would be caused by a corrupt transaction log
+ } catch (Throwable ex) {
+ recoveryInfo.errors++;
+ log.warn("Exception replaying log", ex);
+ // something wrong with the request?
+ }
+ }
+
+ CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+ cmd.setVersion(commitVersion);
+ cmd.softCommit = false;
+ cmd.waitSearcher = true;
+ cmd.setFlags(UpdateCommand.REPLAY);
+ try {
+ uhandler.commit(cmd); // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart.
+ } catch (IOException ex) {
+ recoveryInfo.errors++;
+ log.error("Replay exception: final commit.", ex);
+ }
+
+ if (!activeLog) {
+ // if we are replaying an old tlog file, we need to add a commit to the end
+ // so we don't replay it again if we restart right after.
+ translog.writeCommit(cmd);
+ }
+
+ try {
+ proc.finish();
+ } catch (IOException ex) {
+ recoveryInfo.errors++;
+ log.error("Replay exception: finish()", ex);
+ }
+
+ tlogReader.close();
+ translog.decref();
+
+ } catch (Throwable e) {
+ recoveryInfo.errors++;
+ SolrException.log(log,e);
+ } finally {
+ // change the state while updates are still blocked to prevent races
+ state = State.ACTIVE;
+ if (finishing) {
+ versionInfo.unblockUpdates();
+ }
+ }
+
+ log.warn("Ending log replay " + tlogReader);
+
+ if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
+ }
+ }
+
+ public void cancelApplyBufferedUpdates() {
+ this.cancelApplyBufferUpdate = true;
+ }
+
+ ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0,
+ Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("recoveryExecutor"));
+
}
+
+
+
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java Wed Jan 25 20:32:44 2012
@@ -117,20 +117,20 @@ class LogUpdateProcessor extends UpdateR
@Override
public void processDelete( DeleteUpdateCommand cmd ) throws IOException {
- if (cmd.id != null) {
+ if (cmd.isDeleteById()) {
if (deletes == null) {
deletes = new ArrayList<String>();
toLog.add("delete",deletes);
}
if (deletes.size() < maxNumToLog) {
- deletes.add(cmd.id);
+ deletes.add(cmd.getId());
}
- if (logDebug) { log.debug("delete {}", cmd.id); }
+ if (logDebug) { log.debug("delete {}", cmd.getId()); }
} else {
if (toLog.size() < maxNumToLog) {
toLog.add("deleteByQuery", cmd.query);
}
- if (logDebug) { log.debug("deleteByQuery {}", cmd.query); }
+ if (logDebug) { log.debug("deleteByQuery {}", cmd.getQuery()); }
}
numDeletes++;
Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java Wed Jan 25 20:32:44 2012
@@ -21,13 +21,7 @@ import java.io.IOException;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.DocumentBuilder;
-import org.apache.solr.update.MergeIndexesCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
-import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.*;
/**
@@ -49,6 +43,8 @@ class RunUpdateProcessor extends UpdateR
private final SolrQueryRequest req;
private final UpdateHandler updateHandler;
+ private boolean changesSinceCommit = false;
+
public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
super( next );
this.req = req;
@@ -59,17 +55,19 @@ class RunUpdateProcessor extends UpdateR
public void processAdd(AddUpdateCommand cmd) throws IOException {
updateHandler.addDoc(cmd);
super.processAdd(cmd);
+ changesSinceCommit = true; // record that an add ran since the last commit/rollback; finish() reads this flag
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
- if( cmd.id != null ) {
+ if( cmd.isDeleteById()) { // delete-by-id is routed to delete(); every other command goes to deleteByQuery()
updateHandler.delete(cmd);
}
else {
updateHandler.deleteByQuery(cmd);
}
super.processDelete(cmd);
+ changesSinceCommit = true; // record that a delete ran since the last commit/rollback; finish() reads this flag
}
@Override
@@ -83,6 +81,7 @@ class RunUpdateProcessor extends UpdateR
{
updateHandler.commit(cmd);
super.processCommit(cmd);
+ changesSinceCommit = false;
}
/**
@@ -93,6 +92,16 @@ class RunUpdateProcessor extends UpdateR
{
updateHandler.rollback(cmd);
super.processRollback(cmd);
+ changesSinceCommit = false;
+ }
+
+
+ @Override
+ public void finish() throws IOException { // finishes the update log when there are changes since the last commit/rollback, then calls super.finish()
+ if (changesSinceCommit && updateHandler.getUpdateLog() != null) { // flag is set by processAdd/processDelete and cleared by processCommit/processRollback; skip when no update log exists
+ updateHandler.getUpdateLog().finish(null); // NOTE(review): the meaning of the null argument is not visible here — confirm against UpdateLog.finish
+ }
+ super.finish(); // always runs, whether or not the update log was finished above
}
}
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema.xml Wed Jan 25 20:32:44 2012
@@ -559,6 +559,8 @@
<field name="nopositionstext" type="nopositions" indexed="true" stored="true"/>
<field name="tlong" type="tlong" indexed="true" stored="true" />
+
+ <field name="_version_" type="long" indexed="true" stored="true" multiValued="false"/>
<!-- Dynamic field definitions. If a field name is not found, dynamicFields
will be used if the name matches any of the patterns.
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema12.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema12.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema12.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/schema12.xml Wed Jan 25 20:32:44 2012
@@ -530,6 +530,11 @@
<field name="uniq3" type="string" indexed="true" stored="true"/>
<field name="nouniq" type="string" indexed="true" stored="true" multiValued="true"/>
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+
+
+
<dynamicField name="*_coordinate" type="tdouble" indexed="true" stored="false"/>
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-basic.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-basic.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-basic.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-basic.xml Wed Jan 25 20:32:44 2012
@@ -21,6 +21,7 @@
DO NOT ADD THINGS TO THIS CONFIG! -->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
+ <dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
<requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
</config>
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml Wed Jan 25 20:32:44 2012
@@ -30,8 +30,11 @@
solr.RAMDirectoryFactory is memory based and not persistent. -->
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+ <dataDir>${solr.data.dir:}</dataDir>
+
<updateHandler class="solr.DirectUpdateHandler2">
</updateHandler>
+
<requestHandler name="standard" class="solr.StandardRequestHandler"/>
<requestDispatcher handleSelect="true" >
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig-tlog.xml Wed Jan 25 20:32:44 2012
@@ -19,7 +19,9 @@
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
- <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
@@ -29,8 +31,14 @@
</lst>
</requestHandler>
+ <requestHandler name="/update" class="solr.XmlUpdateRequestHandler">
+ </requestHandler>
+
+ <requestHandler name="/update/json" class="solr.JsonUpdateRequestHandler">
+ </requestHandler>
+
<updateHandler class="solr.DirectUpdateHandler2">
- <updateLog class="solr.FSUpdateLog">
+ <updateLog>
<!-- <str name="dir">/tmp/solr/</str> -->
</updateLog>
</updateHandler>
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/conf/solrconfig.xml Wed Jan 25 20:32:44 2012
@@ -141,7 +141,10 @@
<arr name="env"> <str>MYVAR=val1</str> </arr>
</listener>
-->
-
+
+ <updateLog enable="${enable.update.log:false}">
+ <str name="dir">${solr.data.dir:}</str>
+ </updateLog>
</updateHandler>
@@ -234,7 +237,8 @@
</query>
-
+
+ <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
<!-- An alternate set representation that uses an integer hash to store filters (sets of docids).
If the set cardinality <= maxSize elements, then HashDocSet will be used instead of the bitset
Modified: lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/solr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/solr.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/solr.xml (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test-files/solr/solr.xml Wed Jan 25 20:32:44 2012
@@ -28,7 +28,7 @@
adminPath: RequestHandler path to manage cores.
If 'null' (or absent), cores will not be manageable via request handler
-->
- <cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="solr" zkClientTimeout="8000">
- <core name="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" instanceDir="."/>
+ <cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="solr" zkClientTimeout="12000" numShards="${numShards:3}">
+ <core name="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" schema="${schema:schema.xml}" instanceDir="."/>
</cores>
</solr>
Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java Wed Jan 25 20:32:44 2012
@@ -121,7 +121,7 @@ public class BasicFunctionalityTest exte
// test merge factor picked up
SolrCore core = h.getCore();
- IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter(core);
+ IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getSolrCoreState().getIndexWriter(core);
assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
lrf.args.put(CommonParams.VERSION,"2.2");
@@ -222,6 +222,19 @@ public class BasicFunctionalityTest exte
);
}
+ @Test
+ public void testHTMLStrip() { // indexes two docs with the same HTMLwhitetok value and checks that a query on that value finds both
+ assertU(add(doc("id","200", "HTMLwhitetok","ABC")));
+ assertU(add(doc("id","201", "HTMLwhitetok","ABC"))); // do it again to make sure reuse is working
+ assertU(commit());
+ assertQ(req("q","HTMLwhitetok:ABC")
+ ,"//*[@numFound='2']"
+ );
+ assertQ(req("q","HTMLwhitetok:ABC") // identical query issued a second time; the same count of 2 is expected
+ ,"//*[@numFound='2']"
+ );
+ }
+
@Test
public void testClientErrorOnMalformedNumbers() throws Exception {
Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java Wed Jan 25 20:32:44 2012
@@ -45,10 +45,9 @@ public class TestSolrCoreProperties exte
public void setUp() throws Exception {
super.setUp();
setUpMe();
- System.setProperty("solr.solr.home", getHomeDir());
System.setProperty("solr.data.dir", getDataDir());
- solrJetty = new JettySolrRunner("/solr", 0);
+ solrJetty = new JettySolrRunner(getHomeDir(), "/solr", 0);
solrJetty.start();
String url = "http://localhost:" + solrJetty.getLocalPort() + "/solr";
Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Wed Jan 25 20:32:44 2012
@@ -23,6 +23,7 @@ import org.apache.solr.BaseDistributedSe
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.core.SolrConfig;
+import org.junit.AfterClass;
import org.junit.Before;
public abstract class AbstractDistributedZkTestCase extends BaseDistributedSearchTestCase {
@@ -34,7 +35,7 @@ public abstract class AbstractDistribute
public void setUp() throws Exception {
super.setUp();
log.info("####SETUP_START " + getName());
-
+ createTempDir();
ignoreException("java.nio.channels.ClosedChannelException");
String zkDir = testDir.getAbsolutePath() + File.separator
@@ -80,8 +81,8 @@ public abstract class AbstractDistribute
System.clearProperty("collection");
System.clearProperty("solr.test.sys.prop1");
System.clearProperty("solr.test.sys.prop2");
- super.tearDown();
resetExceptionIgnores();
+ super.tearDown();
}
protected void printLayout() throws Exception {
@@ -89,4 +90,10 @@ public abstract class AbstractDistribute
zkClient.printLayoutToStdOut();
zkClient.close();
}
+
+ @AfterClass
+ public static void afterClass() throws InterruptedException { // class-level teardown (JUnit @AfterClass): one fixed pause after the class's tests
+ // wait just a bit for any zk client threads to outlast timeout
+ Thread.sleep(2000); // fixed 2000 ms pause; InterruptedException propagates to the caller
+ }
}