You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@lucene.apache.org by no...@apache.org on 2017/01/26 07:34:36 UTC
[11/12] lucene-solr:apiv2: SOLR-5944: In-place updates of Numeric DocValues
SOLR-5944: In-place updates of Numeric DocValues
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/53754108
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/53754108
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/53754108
Branch: refs/heads/apiv2
Commit: 5375410807aecf3cc67f82ca1e9ee591f39d0ac7
Parents: 7330601
Author: Ishan Chattopadhyaya <is...@apache.org>
Authored: Thu Jan 26 06:53:13 2017 +0530
Committer: Ishan Chattopadhyaya <is...@apache.org>
Committed: Thu Jan 26 06:53:13 2017 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +
.../client/solrj/embedded/JettySolrRunner.java | 58 +
.../handler/component/RealTimeGetComponent.java | 369 +++++-
.../apache/solr/search/SolrIndexSearcher.java | 6 +-
.../apache/solr/update/AddUpdateCommand.java | 36 +-
.../solr/update/DirectUpdateHandler2.java | 52 +-
.../org/apache/solr/update/DocumentBuilder.java | 151 ++-
.../java/org/apache/solr/update/PeerSync.java | 10 +
.../apache/solr/update/SolrCmdDistributor.java | 4 +
.../org/apache/solr/update/TransactionLog.java | 43 +-
.../java/org/apache/solr/update/UpdateLog.java | 242 +++-
.../org/apache/solr/update/VersionInfo.java | 7 +
.../processor/AtomicUpdateDocumentMerger.java | 182 ++-
.../processor/DistributedUpdateProcessor.java | 258 +++-
...BasedVersionConstraintsProcessorFactory.java | 2 +-
.../SkipExistingDocumentsProcessorFactory.java | 5 +-
.../collection1/conf/schema-inplace-updates.xml | 67 ++
.../test-files/solr/collection1/conf/schema.xml | 14 +-
.../solr/collection1/conf/schema15.xml | 7 +-
.../solrconfig-sortingmergepolicyfactory.xml | 3 +-
.../cloud/SegmentTerminateEarlyTestState.java | 14 +-
.../apache/solr/cloud/TestSegmentSorting.java | 165 ++-
.../solr/cloud/TestStressInPlaceUpdates.java | 612 ++++++++++
.../org/apache/solr/search/TestRecovery.java | 149 ++-
.../org/apache/solr/update/PeerSyncTest.java | 152 ++-
.../apache/solr/update/SolrIndexConfigTest.java | 4 +-
.../solr/update/TestInPlaceUpdatesDistrib.java | 1101 ++++++++++++++++++
.../update/TestInPlaceUpdatesStandalone.java | 1100 +++++++++++++++++
.../test/org/apache/solr/update/TestUpdate.java | 2 +-
.../org/apache/solr/update/UpdateLogTest.java | 271 +++++
.../update/processor/AtomicUpdatesTest.java | 147 +++
31 files changed, 5011 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 89cc796..21cfd7a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -76,6 +76,10 @@ Optimizations
* SOLR-9996: Unstored IntPointField returns Long type (Ishan Chattopadhyaya)
+* SOLR-5944: In-place updates of Numeric DocValues. To leverage this, the _version_ field and the updated
+ field must both be stored=false, indexed=false, docValues=true. (Ishan Chattopadhyaya, hossman, noble,
+ shalin, yonik)
+
Other Changes
----------------------
* SOLR-8396: Add support for PointFields in Solr (Ishan Chattopadhyaya, Tomás Fernández Löbbe)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index f4887e6..155f52e 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -30,12 +30,15 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrClient;
@@ -96,13 +99,35 @@ public class JettySolrRunner {
private int proxyPort = -1;
public static class DebugFilter implements Filter {
+ public final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private AtomicLong nRequests = new AtomicLong();
+
+ List<Delay> delays = new ArrayList<>();
public long getTotalRequests() {
return nRequests.get();
}
+
+ /**
+ * Introduce a delay of specified milliseconds for the specified request.
+ *
+ * @param reason Info message logged when delay occurs
+ * @param count The count-th request will experience a delay
+ * @param delay There will be a delay of this many milliseconds
+ */
+ public void addDelay(String reason, int count, int delay) {
+ delays.add(new Delay(reason, count, delay));
+ }
+
+ /**
+ * Remove any delay introduced before.
+ */
+ public void unsetDelay() {
+ delays.clear();
+ }
+
@Override
public void init(FilterConfig filterConfig) throws ServletException { }
@@ -110,11 +135,32 @@ public class JettySolrRunner {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
nRequests.incrementAndGet();
+ executeDelay();
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() { }
+
+ private void executeDelay() {
+ int delayMs = 0;
+ for (Delay delay: delays) {
+ log.info("Delaying "+delay.delayValue+", for reason: "+delay.reason);
+ if (delay.counter.decrementAndGet() == 0) {
+ delayMs += delay.delayValue;
+ }
+ }
+
+ if (delayMs > 0) {
+ log.info("Pausing this socket connection for " + delayMs + "ms...");
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ log.info("Waking up after the delay of " + delayMs + "ms...");
+ }
+ }
}
@@ -516,4 +562,16 @@ public class JettySolrRunner {
}
}
}
+
+ static class Delay {
+ final AtomicInteger counter;
+ final int delayValue;
+ final String reason;
+
+ public Delay(String reason, int counter, int delay) {
+ this.reason = reason;
+ this.counter = new AtomicInteger(counter);
+ this.delayValue = delay;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 8ce7301..4be643e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -27,9 +27,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
@@ -45,6 +47,7 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.ClusterState;
@@ -75,11 +78,11 @@ import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class RealTimeGetComponent extends SearchComponent
{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -148,6 +151,12 @@ public class RealTimeGetComponent extends SearchComponent
processGetUpdates(rb);
return;
}
+
+ val = params.get("getInputDocument");
+ if (val != null) {
+ processGetInputDocument(rb);
+ return;
+ }
final IdsRequsted reqIds = IdsRequsted.parseParams(req);
@@ -176,14 +185,14 @@ public class RealTimeGetComponent extends SearchComponent
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
- SolrCore core = req.getCore();
+ final SolrCore core = req.getCore();
SchemaField idField = core.getLatestSchema().getUniqueKeyField();
FieldType fieldType = idField.getType();
SolrDocumentList docList = new SolrDocumentList();
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- RefCounted<SolrIndexSearcher> searcherHolder = null;
+ SearcherInfo searcherInfo = new SearcherInfo(core);
// this is initialized & set on the context *after* any searcher (re-)opening
ResultContext resultContext = null;
@@ -197,7 +206,7 @@ public class RealTimeGetComponent extends SearchComponent
|| ((null != transformer) && transformer.needsSolrIndexSearcher());
try {
- SolrIndexSearcher searcher = null;
+
BytesRefBuilder idBytes = new BytesRefBuilder();
for (String idStr : reqIds.allIds) {
@@ -208,24 +217,34 @@ public class RealTimeGetComponent extends SearchComponent
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o;
assert entry.size() >= 3;
- int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
+ int oper = (Integer)entry.get(UpdateLog.FLAGS_IDX) & UpdateLog.OPERATION_MASK;
switch (oper) {
+ case UpdateLog.UPDATE_INPLACE: // fall through to ADD
case UpdateLog.ADD:
if (mustUseRealtimeSearcher) {
- if (searcherHolder != null) {
- // close handles to current searchers & result context
- searcher = null;
- searcherHolder.decref();
- searcherHolder = null;
- resultContext = null;
- }
+ // close handles to current searchers & result context
+ searcherInfo.clear();
+ resultContext = null;
ulog.openRealtimeSearcher(); // force open a new realtime searcher
o = null; // pretend we never found this record and fall through to use the searcher
break;
}
- SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), core.getLatestSchema());
+ SolrDocument doc;
+ if (oper == UpdateLog.ADD) {
+ doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), core.getLatestSchema());
+ } else if (oper == UpdateLog.UPDATE_INPLACE) {
+ assert entry.size() == 5;
+ // For in-place update case, we have obtained the partial document till now. We need to
+ // resolve it to a full document to be returned to the user.
+ doc = resolveFullDocument(core, idBytes.get(), rsp.getReturnFields(), (SolrInputDocument)entry.get(entry.size()-1), entry, null);
+ if (doc == null) {
+ break; // document has been deleted as the resolve was going on
+ }
+ } else {
+ throw new SolrException(ErrorCode.INVALID_STATE, "Expected ADD or UPDATE_INPLACE. Got: " + oper);
+ }
if (transformer!=null) {
transformer.transform(doc, -1, 0); // unknown docID
}
@@ -241,23 +260,20 @@ public class RealTimeGetComponent extends SearchComponent
}
// didn't find it in the update log, so it should be in the newest searcher opened
- if (searcher == null) {
- searcherHolder = core.getRealtimeSearcher();
- searcher = searcherHolder.get();
- // don't bother with ResultContext yet, we won't need it if doc doesn't match filters
- }
+ searcherInfo.init();
+ // don't bother with ResultContext yet, we won't need it if doc doesn't match filters
int docid = -1;
- long segAndId = searcher.lookupId(idBytes.get());
+ long segAndId = searcherInfo.getSearcher().lookupId(idBytes.get());
if (segAndId >= 0) {
int segid = (int) segAndId;
- LeafReaderContext ctx = searcher.getTopReaderContext().leaves().get((int) (segAndId >> 32));
+ LeafReaderContext ctx = searcherInfo.getSearcher().getTopReaderContext().leaves().get((int) (segAndId >> 32));
docid = segid + ctx.docBase;
if (rb.getFilters() != null) {
for (Query raw : rb.getFilters()) {
- Query q = raw.rewrite(searcher.getIndexReader());
- Scorer scorer = searcher.createWeight(q, false, 1f).scorer(ctx);
+ Query q = raw.rewrite(searcherInfo.getSearcher().getIndexReader());
+ Scorer scorer = searcherInfo.getSearcher().createWeight(q, false, 1f).scorer(ctx);
if (scorer == null || segid != scorer.iterator().advance(segid)) {
// filter doesn't match.
docid = -1;
@@ -269,13 +285,13 @@ public class RealTimeGetComponent extends SearchComponent
if (docid < 0) continue;
- Document luceneDocument = searcher.doc(docid, rsp.getReturnFields().getLuceneFieldNames());
+ Document luceneDocument = searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames());
SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
- searcher.decorateDocValueFields(doc, docid, searcher.getNonStoredDVs(true));
+ searcherInfo.getSearcher().decorateDocValueFields(doc, docid, searcherInfo.getSearcher().getNonStoredDVs(true));
if ( null != transformer) {
if (null == resultContext) {
// either first pass, or we've re-opened searcher - either way now we setContext
- resultContext = new RTGResultContext(rsp.getReturnFields(), searcher, req);
+ resultContext = new RTGResultContext(rsp.getReturnFields(), searcherInfo.getSearcher(), req);
transformer.setContext(resultContext);
}
transformer.transform(doc, docid, 0);
@@ -284,22 +300,210 @@ public class RealTimeGetComponent extends SearchComponent
}
} finally {
- if (searcherHolder != null) {
- searcherHolder.decref();
- }
+ searcherInfo.clear();
}
addDocListToResponse(rb, docList);
}
+
+ /**
+ * Return the requested SolrInputDocument from the tlog/index. This will
+ * always be a full document, i.e. any partial in-place document will be resolved.
+ */
+ void processGetInputDocument(ResponseBuilder rb) throws IOException {
+ SolrQueryRequest req = rb.req;
+ SolrQueryResponse rsp = rb.rsp;
+ SolrParams params = req.getParams();
+ if (!params.getBool(COMPONENT_NAME, true)) {
+ return;
+ }
+
+ String idStr = params.get("getInputDocument", null);
+ if (idStr == null) return;
+ AtomicLong version = new AtomicLong();
+ SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr), version, false, null, true);
+ log.info("getInputDocument called for id="+idStr+", returning: "+doc);
+ rb.rsp.add("inputDocument", doc);
+ rb.rsp.add("version", version.get());
+ }
+
+ /**
+ * A SearcherInfo provides mechanism for obtaining RT searcher, from
+ * a SolrCore, and closing it, while taking care of the RefCounted references.
+ */
+ private static class SearcherInfo {
+ private RefCounted<SolrIndexSearcher> searcherHolder = null;
+ private SolrIndexSearcher searcher = null;
+ final SolrCore core;
+
+ public SearcherInfo(SolrCore core) {
+ this.core = core;
+ }
+
+ void clear(){
+ if (searcherHolder != null) {
+ // close handles to current searchers
+ searcher = null;
+ searcherHolder.decref();
+ searcherHolder = null;
+ }
+ }
+
+ void init(){
+ if (searcher == null) {
+ searcherHolder = core.getRealtimeSearcher();
+ searcher = searcherHolder.get();
+ }
+ }
+
+ public SolrIndexSearcher getSearcher() {
+ assert null != searcher : "init not called!";
+ return searcher;
+ }
+ }
+
+ /***
+ * Given a partial document obtained from the transaction log (e.g. as a result of RTG), resolve to a full document
+ * by populating all the partial updates that were applied on top of that last full document update.
+ *
+ * @param onlyTheseFields When a non-null set of field names is passed in, the resolve process only attempts to populate
+ * the given fields in this set. When this set is null, it resolves all fields.
+ * @return Returns the merged document, i.e. the resolved full document, or null if the document was not found (deleted
+ * after the resolving began)
+ */
+ private static SolrDocument resolveFullDocument(SolrCore core, BytesRef idBytes,
+ ReturnFields returnFields, SolrInputDocument partialDoc, List logEntry, Set<String> onlyTheseFields) throws IOException {
+ if (idBytes == null || logEntry.size() != 5) {
+ throw new SolrException(ErrorCode.INVALID_STATE, "Either Id field not present in partial document or log entry doesn't have previous version.");
+ }
+ long prevPointer = (long) logEntry.get(UpdateLog.PREV_POINTER_IDX);
+ long prevVersion = (long) logEntry.get(UpdateLog.PREV_VERSION_IDX);
+
+ // get the last full document from ulog
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ long lastPrevPointer = ulog.applyPartialUpdates(idBytes, prevPointer, prevVersion, onlyTheseFields, partialDoc);
+
+ if (lastPrevPointer == -1) { // full document was not found in tlog, but exists in index
+ SolrDocument mergedDoc = mergePartialDocWithFullDocFromIndex(core, idBytes, returnFields, onlyTheseFields, partialDoc);
+ return mergedDoc;
+ } else if (lastPrevPointer > 0) {
+ // We were supposed to have found the last full doc also in the tlogs, but the prevPointer links led to nowhere
+ // We should reopen a new RT searcher and get the doc. This should be a rare occurrence
+ Term idTerm = new Term(core.getLatestSchema().getUniqueKeyField().getName(), idBytes);
+ SolrDocument mergedDoc = reopenRealtimeSearcherAndGet(core, idTerm, returnFields);
+ if (mergedDoc == null) {
+ return null; // the document may have been deleted as the resolving was going on.
+ }
+ return mergedDoc;
+ } else { // i.e. lastPrevPointer==0
+ assert lastPrevPointer == 0;
+ // We have successfully resolved the document based off the tlogs
+ return toSolrDoc(partialDoc, core.getLatestSchema());
+ }
+ }
+
+ /**
+ * Re-open the RT searcher and get the document, referred to by the idTerm, from that searcher.
+ * @return Returns the document or null if not found.
+ */
+ private static SolrDocument reopenRealtimeSearcherAndGet(SolrCore core, Term idTerm, ReturnFields returnFields) throws IOException {
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ ulog.openRealtimeSearcher();
+ RefCounted<SolrIndexSearcher> searcherHolder = core.getRealtimeSearcher();
+ try {
+ SolrIndexSearcher searcher = searcherHolder.get();
+
+ int docid = searcher.getFirstMatch(idTerm);
+ if (docid < 0) {
+ return null;
+ }
+ Document luceneDocument = searcher.doc(docid, returnFields.getLuceneFieldNames());
+ SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
+ searcher.decorateDocValueFields(doc, docid, searcher.getNonStoredDVs(false));
+
+ return doc;
+ } finally {
+ searcherHolder.decref();
+ }
+ }
+
+ /**
+ * Gets a document from the index by id. If a non-null partial document (for in-place update) is passed in,
+ * this method obtains the document from the tlog/index by the given id, merges the partial document on top of it and then returns
+ * the resultant document.
+ *
+ * @param core A SolrCore instance, useful for obtaining a realtimesearcher and the schema
+ * @param idBytes Binary representation of the value of the unique key field
+ * @param returnFields Return fields, as requested
+ * @param onlyTheseFields When a non-null set of field names is passed in, the merge process only attempts to merge
+ * the given fields in this set. When this set is null, it merges all fields.
+ * @param partialDoc A partial document (containing an in-place update) used for merging against a full document
+ * from index; this maybe be null.
+ * @return If partial document is null, this returns document from the index or null if not found.
+ * If partial document is not null, this returns a document from index merged with the partial document, or null if
+ * document doesn't exist in the index.
+ */
+ private static SolrDocument mergePartialDocWithFullDocFromIndex(SolrCore core, BytesRef idBytes, ReturnFields returnFields,
+ Set<String> onlyTheseFields, SolrInputDocument partialDoc) throws IOException {
+ RefCounted<SolrIndexSearcher> searcherHolder = core.getRealtimeSearcher(); //Searcher();
+ try {
+ // now fetch last document from index, and merge partialDoc on top of it
+ SolrIndexSearcher searcher = searcherHolder.get();
+ SchemaField idField = core.getLatestSchema().getUniqueKeyField();
+ Term idTerm = new Term(idField.getName(), idBytes);
+
+ int docid = searcher.getFirstMatch(idTerm);
+ if (docid < 0) {
+ // The document was not found in index! Reopen a new RT searcher (to be sure) and get again.
+ // This should be because the document was deleted recently.
+ SolrDocument doc = reopenRealtimeSearcherAndGet(core, idTerm, returnFields);
+ if (doc == null) {
+ // Unable to resolve the last full doc in tlog fully,
+ // and document not found in index even after opening new rt searcher.
+ // This must be a case of deleted doc
+ return null;
+ }
+ return doc;
+ }
+
+ SolrDocument doc;
+ Set<String> decorateFields = onlyTheseFields == null ? searcher.getNonStoredDVs(false): onlyTheseFields;
+ Document luceneDocument = searcher.doc(docid, returnFields.getLuceneFieldNames());
+ doc = toSolrDoc(luceneDocument, core.getLatestSchema());
+ searcher.decorateDocValueFields(doc, docid, decorateFields);
+
+ long docVersion = (long) doc.getFirstValue(DistributedUpdateProcessor.VERSION_FIELD);
+ Object partialVersionObj = partialDoc.getFieldValue(DistributedUpdateProcessor.VERSION_FIELD);
+ long partialDocVersion = partialVersionObj instanceof Field? ((Field) partialVersionObj).numericValue().longValue():
+ partialVersionObj instanceof Number? ((Number) partialVersionObj).longValue(): Long.parseLong(partialVersionObj.toString());
+ if (docVersion > partialDocVersion) {
+ return doc;
+ }
+ for (String fieldName: (Iterable<String>) partialDoc.getFieldNames()) {
+ doc.setField(fieldName.toString(), partialDoc.getFieldValue(fieldName)); // since partial doc will only contain single valued fields, this is fine
+ }
+
+ return doc;
+ } finally {
+ if (searcherHolder != null) {
+ searcherHolder.decref();
+ }
+ }
+ }
public static SolrInputDocument DELETED = new SolrInputDocument();
/** returns the SolrInputDocument from the current tlog, or DELETED if it has been deleted, or
* null if there is no record of it in the current update log. If null is returned, it could
* still be in the latest index.
+ * @param versionReturned If a non-null AtomicLong is passed in, it is set to the version of the update returned from the TLog.
+ * @param resolveFullDocument In case the document is fetched from the tlog, it could only be a partial document if the last update
+ * was an in-place update. In that case, should this partial document be resolved to a full document (by following
+ * back prevPointer/prevVersion)?
*/
- public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes) {
+ public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes, AtomicLong versionReturned,
+ Set<String> onlyTheseNonStoredDVs, boolean resolveFullDocument) {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
@@ -310,9 +514,32 @@ public class RealTimeGetComponent extends SearchComponent
List entry = (List)o;
assert entry.size() >= 3;
int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
+ if (versionReturned != null) {
+ versionReturned.set((long)entry.get(UpdateLog.VERSION_IDX));
+ }
switch (oper) {
+ case UpdateLog.UPDATE_INPLACE:
+ assert entry.size() == 5;
+
+ if (resolveFullDocument) {
+ SolrInputDocument doc = (SolrInputDocument)entry.get(entry.size()-1);
+ try {
+ // For in-place update case, we have obtained the partial document till now. We need to
+ // resolve it to a full document to be returned to the user.
+ SolrDocument sdoc = resolveFullDocument(core, idBytes, new SolrReturnFields(), doc, entry, onlyTheseNonStoredDVs);
+ if (sdoc == null) {
+ return DELETED;
+ }
+ doc = toSolrInputDocument(sdoc, core.getLatestSchema());
+ return doc;
+ } catch (IOException ex) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error while resolving full document. ", ex);
+ }
+ } else {
+ // fall through to ADD, so as to get only the partial document
+ }
case UpdateLog.ADD:
- return (SolrInputDocument)entry.get(entry.size()-1);
+ return (SolrInputDocument) entry.get(entry.size()-1);
case UpdateLog.DELETE:
return DELETED;
default:
@@ -324,12 +551,40 @@ public class RealTimeGetComponent extends SearchComponent
return null;
}
+ /**
+ * Obtains the latest document for a given id from the tlog or index (if not found in the tlog).
+ *
+ * NOTE: This method uses the effective value for avoidRetrievingStoredFields param as false and
+ * for nonStoredDVs as null in the call to @see {@link RealTimeGetComponent#getInputDocument(SolrCore, BytesRef, AtomicLong, boolean, Set, boolean)},
+ * so as to retrieve all stored and non-stored DV fields from all documents. Also, it uses the effective value of
+ * resolveFullDocument param as true, i.e. it resolves any partial documents (in-place updates), in case the
+ * document is fetched from the tlog, to a full document.
+ */
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
+ return getInputDocument (core, idBytes, null, false, null, true);
+ }
+
+ /**
+ * Obtains the latest document for a given id from the tlog or through the realtime searcher (if not found in the tlog).
+ * @param versionReturned If a non-null AtomicLong is passed in, it is set to the version of the update returned from the TLog.
+ * @param avoidRetrievingStoredFields Setting this to true avoids fetching stored fields through the realtime searcher,
+ * however has no effect on documents obtained from the tlog.
+ * Non-stored docValues fields are populated anyway, and are not affected by this parameter. Note that if
+ * the id field is a stored field, it will not be populated if this parameter is true and the document is
+ * obtained from the index.
+ * @param onlyTheseNonStoredDVs If not-null, populate only these DV fields in the document fetched through the realtime searcher.
+ * If this is null, decorate all non-stored DVs (that are not targets of copy fields) from the searcher.
+ * @param resolveFullDocument In case the document is fetched from the tlog, it could only be a partial document if the last update
+ * was an in-place update. In that case, should this partial document be resolved to a full document (by following
+ * back prevPointer/prevVersion)?
+ */
+ public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, AtomicLong versionReturned, boolean avoidRetrievingStoredFields,
+ Set<String> onlyTheseNonStoredDVs, boolean resolveFullDocument) throws IOException {
SolrInputDocument sid = null;
RefCounted<SolrIndexSearcher> searcherHolder = null;
try {
SolrIndexSearcher searcher = null;
- sid = getInputDocumentFromTlog(core, idBytes);
+ sid = getInputDocumentFromTlog(core, idBytes, versionReturned, onlyTheseNonStoredDVs, resolveFullDocument);
if (sid == DELETED) {
return null;
}
@@ -346,9 +601,18 @@ public class RealTimeGetComponent extends SearchComponent
int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes));
if (docid < 0) return null;
- Document luceneDocument = searcher.doc(docid);
- sid = toSolrInputDocument(luceneDocument, core.getLatestSchema());
- searcher.decorateDocValueFields(sid, docid, searcher.getNonStoredDVsWithoutCopyTargets());
+
+ if (avoidRetrievingStoredFields) {
+ sid = new SolrInputDocument();
+ } else {
+ Document luceneDocument = searcher.doc(docid);
+ sid = toSolrInputDocument(luceneDocument, core.getLatestSchema());
+ }
+ if (onlyTheseNonStoredDVs != null) {
+ searcher.decorateDocValueFields(sid, docid, onlyTheseNonStoredDVs);
+ } else {
+ searcher.decorateDocValueFields(sid, docid, searcher.getNonStoredDVsWithoutCopyTargets());
+ }
}
} finally {
if (searcherHolder != null) {
@@ -356,6 +620,11 @@ public class RealTimeGetComponent extends SearchComponent
}
}
+ if (versionReturned != null) {
+ if (sid.containsKey(DistributedUpdateProcessor.VERSION_FIELD)) {
+ versionReturned.set((long)sid.getFieldValue(DistributedUpdateProcessor.VERSION_FIELD));
+ }
+ }
return sid;
}
@@ -381,6 +650,30 @@ public class RealTimeGetComponent extends SearchComponent
return out;
}
+ private static SolrInputDocument toSolrInputDocument(SolrDocument doc, IndexSchema schema) {
+ SolrInputDocument out = new SolrInputDocument();
+ for( String fname : doc.getFieldNames() ) {
+ SchemaField sf = schema.getFieldOrNull(fname);
+ if (sf != null) {
+ if ((!sf.hasDocValues() && !sf.stored()) || schema.isCopyFieldTarget(sf)) continue;
+ }
+ for (Object val: doc.getFieldValues(fname)) {
+ if (val instanceof Field) {
+ Field f = (Field) val;
+ if (sf != null) {
+ val = sf.getType().toObject(f); // object or external string?
+ } else {
+ val = f.stringValue();
+ if (val == null) val = f.numericValue();
+ if (val == null) val = f.binaryValue();
+ if (val == null) val = f;
+ }
+ }
+ out.addField(fname, val);
+ }
+ }
+ return out;
+ }
private static SolrDocument toSolrDoc(Document doc, IndexSchema schema) {
SolrDocument out = new SolrDocument();
@@ -409,9 +702,13 @@ public class RealTimeGetComponent extends SearchComponent
return out;
}
- private static SolrDocument toSolrDoc(SolrInputDocument sdoc, IndexSchema schema) {
+ /**
+ * Converts a SolrInputDocument to SolrDocument, using an IndexSchema instance.
+ * @lucene.experimental
+ */
+ public static SolrDocument toSolrDoc(SolrInputDocument sdoc, IndexSchema schema) {
// TODO: do something more performant than this double conversion
- Document doc = DocumentBuilder.toDocument(sdoc, schema);
+ Document doc = DocumentBuilder.toDocument(sdoc, schema, false);
// copy the stored fields only
Document out = new Document();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index 3f7d511..75d0998 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -809,7 +809,11 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
}
}
} else {
- final DocValuesType dvType = fieldInfos.fieldInfo(fieldName).getDocValuesType();
+ FieldInfo fi = fieldInfos.fieldInfo(fieldName);
+ if (fi == null) {
+ continue; // Searcher doesn't have info about this field, hence ignore it.
+ }
+ final DocValuesType dvType = fi.getDocValuesType();
switch (dvType) {
case NUMERIC:
final NumericDocValues ndv = leafReader.getNumericDocValues(fieldName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index 377cb6b..db1d79b 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -39,10 +39,20 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
// it will be obtained from the doc.
private BytesRef indexedId;
- // Higher level SolrInputDocument, normally used to construct the Lucene Document
- // to index.
+ /**
+ * Higher level SolrInputDocument, normally used to construct the Lucene Document
+ * to index.
+ */
public SolrInputDocument solrDoc;
+ /**
+ * This is the version of a document, previously indexed, on which the current
+ * update depends on. This version could be that of a previous in-place update
+ * or a full update. A negative value here, e.g. -1, indicates that this add
+ * update does not depend on a previous update.
+ */
+ public long prevVersion = -1;
+
public boolean overwrite = true;
public Term updateTerm;
@@ -76,10 +86,19 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
}
/** Creates and returns a lucene Document to index. Any changes made to the returned Document
- * will not be reflected in the SolrInputDocument, or future calls to this method.
+ * will not be reflected in the SolrInputDocument, or future calls to this method. This defaults
+ * to false for the inPlaceUpdate parameter of {@link #getLuceneDocument(boolean)}.
*/
public Document getLuceneDocument() {
- return DocumentBuilder.toDocument(getSolrInputDocument(), req.getSchema());
+ return getLuceneDocument(false);
+ }
+
+ /** Creates and returns a lucene Document to index. Any changes made to the returned Document
+ * will not be reflected in the SolrInputDocument, or future calls to this method.
+ * @param inPlaceUpdate Whether this document will be used for in-place updates.
+ */
+ public Document getLuceneDocument(boolean inPlaceUpdate) {
+ return DocumentBuilder.toDocument(getSolrInputDocument(), req.getSchema(), inPlaceUpdate);
}
/** Returns the indexed ID for this document. The returned BytesRef is retained across multiple calls, and should not be modified. */
@@ -212,7 +231,6 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
unwrappedDocs.add(currentDoc);
}
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder(super.toString());
@@ -223,5 +241,11 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
return sb.toString();
}
-
+ /**
+ * Is this add update an in-place update? An in-place update is one where only docValues are
+ * updated, and a new docment is not indexed.
+ */
+ public boolean isInPlaceUpdate() {
+ return (prevVersion >= 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 035ae8d..9e65ebd 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -27,9 +27,11 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.Term;
@@ -274,9 +276,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
if (cmd.isBlock()) {
writer.updateDocuments(updateTerm, cmd);
} else {
- Document luceneDocument = cmd.getLuceneDocument();
- // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
- writer.updateDocument(updateTerm, luceneDocument);
+ updateDocOrDocValues(cmd, writer, updateTerm);
}
// SolrCore.verbose("updateDocument",updateTerm,"DONE");
@@ -331,7 +331,8 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// see comment in deleteByQuery
synchronized (solrCoreState.getUpdateLock()) {
- writer.updateDocument(idTerm, luceneDocument);
+ updateDocOrDocValues(cmd, writer, idTerm);
+
for (Query q : dbqList) {
writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
}
@@ -450,6 +451,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// as we use around ulog.preCommit... also see comments in ulog.postSoftCommit)
//
synchronized (solrCoreState.getUpdateLock()) {
+
+ // We are reopening a searcher before applying the deletes to overcome LUCENE-7344.
+ // Once LUCENE-7344 is resolved, we can consider removing this.
+ if (ulog != null) ulog.openRealtimeSearcher();
+
if (delAll) {
deleteAll();
} else {
@@ -830,6 +836,44 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
splitter.split();
}
+ /**
+ * Calls either {@link IndexWriter#updateDocValues} or {@link IndexWriter#updateDocument} as
+ * needed based on {@link AddUpdateCommand#isInPlaceUpdate}.
+ * <p>
+ * If the this is an UPDATE_INPLACE cmd, then all fields inclued in
+ * {@link AddUpdateCommand#getLuceneDocument} must either be the uniqueKey field, or be DocValue
+ * only fields.
+ * </p>
+ *
+ * @param cmd - cmd apply to IndexWriter
+ * @param writer - IndexWriter to use
+ * @param updateTerm - used if this cmd results in calling {@link IndexWriter#updateDocument}
+ */
+ private void updateDocOrDocValues(AddUpdateCommand cmd, IndexWriter writer, Term updateTerm) throws IOException {
+ assert null != cmd;
+ final SchemaField uniqueKeyField = cmd.req.getSchema().getUniqueKeyField();
+ final String uniqueKeyFieldName = null == uniqueKeyField ? null : uniqueKeyField.getName();
+
+ if (cmd.isInPlaceUpdate()) {
+ Document luceneDocument = cmd.getLuceneDocument(true);
+
+ final List<IndexableField> origDocFields = luceneDocument.getFields();
+ final List<Field> fieldsToUpdate = new ArrayList<>(origDocFields.size());
+ for (IndexableField field : origDocFields) {
+ if (! field.name().equals(uniqueKeyFieldName) ) {
+ fieldsToUpdate.add((Field)field);
+ }
+ }
+ log.debug("updateDocValues({})", cmd);
+ writer.updateDocValues(updateTerm, fieldsToUpdate.toArray(new Field[fieldsToUpdate.size()]));
+ } else {
+ Document luceneDocument = cmd.getLuceneDocument(false);
+ log.debug("updateDocument({})", cmd);
+ writer.updateDocument(updateTerm, luceneDocument);
+ }
+ }
+
+
/////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info
/////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
index 633a6dc..eb6612e 100644
--- a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
+++ b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexableField;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -37,15 +38,46 @@ import com.google.common.collect.Sets;
*/
public class DocumentBuilder {
- private static void addField(Document doc, SchemaField field, Object val, float boost) {
+ /**
+ * Add a field value to a given document.
+ * @param doc Document that the field needs to be added to
+ * @param field The schema field object for the field
+ * @param val The value for the field to be added
+ * @param boost Boost value for the field
+ * @param forInPlaceUpdate Whether the field is to be added for in-place update. If true,
+ * only numeric docValues based fields are added to the document. This can be true
+ * when constructing a Lucene document for writing an in-place update, and we don't need
+ * presence of non-updatable fields (non NDV) in such a document.
+ */
+ private static void addField(Document doc, SchemaField field, Object val, float boost,
+ boolean forInPlaceUpdate) {
if (val instanceof IndexableField) {
+ if (forInPlaceUpdate) {
+ assert val instanceof NumericDocValuesField: "Expected in-place update to be done on"
+ + " NDV fields only.";
+ }
// set boost to the calculated compound boost
((Field)val).setBoost(boost);
doc.add((Field)val);
return;
}
for (IndexableField f : field.getType().createFields(field, val, boost)) {
- if (f != null) doc.add((Field) f); // null fields are not added
+ if (f != null) { // null fields are not added
+ // HACK: workaround for SOLR-9809
+ // even though at this point in the code we know the field is single valued and DV only
+ // TrieField.createFields() may still return (usless) IndexableField instances that are not
+ // NumericDocValuesField instances.
+ //
+ // once SOLR-9809 is resolved, we should be able to replace this conditional with...
+ // assert f instanceof NumericDocValuesField
+ if (forInPlaceUpdate) {
+ if (f instanceof NumericDocValuesField) {
+ doc.add((Field) f);
+ }
+ } else {
+ doc.add((Field) f);
+ }
+ }
}
}
@@ -60,6 +92,14 @@ public class DocumentBuilder {
}
/**
+ * @see DocumentBuilder#toDocument(SolrInputDocument, IndexSchema, boolean)
+ */
+ public static Document toDocument( SolrInputDocument doc, IndexSchema schema )
+ {
+ return toDocument(doc, schema, false);
+ }
+
+ /**
* Convert a SolrInputDocument to a lucene Document.
*
* This function should go elsewhere. This builds the Document without an
@@ -72,9 +112,19 @@ public class DocumentBuilder {
* moved to an independent function
*
* @since solr 1.3
+ *
+ * @param doc SolrInputDocument from which the document has to be built
+ * @param schema Schema instance
+ * @param forInPlaceUpdate Whether the output document would be used for an in-place update or not. When this is true,
+ * default fields values and copy fields targets are not populated.
+ * @return Built Lucene document
+
*/
- public static Document toDocument( SolrInputDocument doc, IndexSchema schema )
- {
+ public static Document toDocument( SolrInputDocument doc, IndexSchema schema, boolean forInPlaceUpdate )
+ {
+ final SchemaField uniqueKeyField = schema.getUniqueKeyField();
+ final String uniqueKeyFieldName = null == uniqueKeyField ? null : uniqueKeyField.getName();
+
Document out = new Document();
final float docBoost = doc.getDocumentBoost();
Set<String> usedFields = Sets.newHashSet();
@@ -84,7 +134,6 @@ public class DocumentBuilder {
String name = field.getName();
SchemaField sfield = schema.getFieldOrNull(name);
boolean used = false;
-
// Make sure it has the correct number
if( sfield!=null && !sfield.multiValued() && field.getValueCount() > 1 ) {
@@ -119,45 +168,51 @@ public class DocumentBuilder {
hasField = true;
if (sfield != null) {
used = true;
- addField(out, sfield, v, applyBoost ? compoundBoost : 1f);
+ addField(out, sfield, v, applyBoost ? compoundBoost : 1f,
+ name.equals(uniqueKeyFieldName) ? false : forInPlaceUpdate);
// record the field as having a value
usedFields.add(sfield.getName());
}
// Check if we should copy this field value to any other fields.
// This could happen whether it is explicit or not.
- if( copyFields != null ){
- for (CopyField cf : copyFields) {
- SchemaField destinationField = cf.getDestination();
-
- final boolean destHasValues = usedFields.contains(destinationField.getName());
-
- // check if the copy field is a multivalued or not
- if (!destinationField.multiValued() && destHasValues) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "ERROR: "+getID(doc, schema)+"multiple values encountered for non multiValued copy field " +
- destinationField.getName() + ": " + v);
- }
-
- used = true;
-
- // Perhaps trim the length of a copy field
- Object val = v;
- if( val instanceof String && cf.getMaxChars() > 0 ) {
- val = cf.getLimitedValue((String)val);
+ if (copyFields != null) {
+ // Do not copy this field if this document is to be used for an in-place update,
+ // and this is the uniqueKey field (because the uniqueKey can't change so no need to "update" the copyField).
+ if ( ! (forInPlaceUpdate && name.equals(uniqueKeyFieldName)) ) {
+ for (CopyField cf : copyFields) {
+ SchemaField destinationField = cf.getDestination();
+
+ final boolean destHasValues = usedFields.contains(destinationField.getName());
+
+ // check if the copy field is a multivalued or not
+ if (!destinationField.multiValued() && destHasValues) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "ERROR: "+getID(doc, schema)+"multiple values encountered for non multiValued copy field " +
+ destinationField.getName() + ": " + v);
+ }
+
+ used = true;
+
+ // Perhaps trim the length of a copy field
+ Object val = v;
+ if( val instanceof String && cf.getMaxChars() > 0 ) {
+ val = cf.getLimitedValue((String)val);
+ }
+
+ // we can't copy any boost unless the dest field is
+ // indexed & !omitNorms, but which boost we copy depends
+ // on whether the dest field already contains values (we
+ // don't want to apply the compounded docBoost more then once)
+ final float destBoost =
+ (destinationField.indexed() && !destinationField.omitNorms()) ?
+ (destHasValues ? fieldBoost : compoundBoost) : 1.0F;
+
+ addField(out, destinationField, val, destBoost,
+ destinationField.getName().equals(uniqueKeyFieldName) ? false : forInPlaceUpdate);
+ // record the field as having a value
+ usedFields.add(destinationField.getName());
}
-
- // we can't copy any boost unless the dest field is
- // indexed & !omitNorms, but which boost we copy depends
- // on whether the dest field already contains values (we
- // don't want to apply the compounded docBoost more then once)
- final float destBoost =
- (destinationField.indexed() && !destinationField.omitNorms()) ?
- (destHasValues ? fieldBoost : compoundBoost) : 1.0F;
-
- addField(out, destinationField, val, destBoost);
- // record the field as having a value
- usedFields.add(destinationField.getName());
}
}
@@ -187,14 +242,20 @@ public class DocumentBuilder {
// Now validate required fields or add default values
// fields with default values are defacto 'required'
- for (SchemaField field : schema.getRequiredFields()) {
- if (out.getField(field.getName() ) == null) {
- if (field.getDefaultValue() != null) {
- addField(out, field, field.getDefaultValue(), 1.0f);
- }
- else {
- String msg = getID(doc, schema) + "missing required field: " + field.getName();
- throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, msg );
+
+ // Note: We don't need to add default fields if this document is to be used for
+ // in-place updates, since this validation and population of default fields would've happened
+ // during the full indexing initially.
+ if (!forInPlaceUpdate) {
+ for (SchemaField field : schema.getRequiredFields()) {
+ if (out.getField(field.getName() ) == null) {
+ if (field.getDefaultValue() != null) {
+ addField(out, field, field.getDefaultValue(), 1.0f, false);
+ }
+ else {
+ String msg = getID(doc, schema) + "missing required field: " + field.getName();
+ throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, msg );
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 861cbf7..88900aa 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -828,6 +828,16 @@ public class PeerSync implements SolrMetricProducer {
proc.processDelete(cmd);
break;
}
+ case UpdateLog.UPDATE_INPLACE:
+ {
+ AddUpdateCommand cmd = UpdateLog.convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ if (debug) {
+ log.debug(msg() + "inplace update " + cmd + " prevVersion=" + cmd.prevVersion + ", doc=" + cmd.solrDoc);
+ }
+ proc.processAdd(cmd);
+ break;
+ }
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index c161b82..5caf43e 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplicationTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,6 +207,9 @@ public class SolrCmdDistributor {
uReq.lastDocInBatch();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ if (cmd.isInPlaceUpdate()) {
+ params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
+ }
submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 997485a..5037b45 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -342,7 +342,33 @@ public class TransactionLog implements Closeable {
int lastAddSize;
+ /**
+ * Writes an add update command to the transaction log. This is not applicable for
+ * in-place updates; use {@link #write(AddUpdateCommand, long, int)}.
+ * (The previous pointer (applicable for in-place updates) is set to -1 while writing
+ * the command to the transaction log.)
+ * @param cmd The add update command to be written
+ * @param flags Options for writing the command to the transaction log
+ * @return Returns the position pointer of the written update command
+ *
+ * @see #write(AddUpdateCommand, long, int)
+ */
public long write(AddUpdateCommand cmd, int flags) {
+ return write(cmd, -1, flags);
+ }
+
+ /**
+ * Writes an add update command to the transaction log. This should be called only for
+ * writing in-place updates, or else pass -1 as the prevPointer.
+ * @param cmd The add update command to be written
+ * @param prevPointer The pointer in the transaction log which this update depends
+ * on (applicable for in-place updates)
+ * @param flags Options for writing the command to the transaction log
+ * @return Returns the position pointer of the written update command
+ */
+ public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+ assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
+
LogCodec codec = new LogCodec(resolver);
SolrInputDocument sdoc = cmd.getSolrInputDocument();
@@ -355,10 +381,19 @@ public class TransactionLog implements Closeable {
MemOutputStream out = new MemOutputStream(new byte[bufSize]);
codec.init(out);
- codec.writeTag(JavaBinCodec.ARR, 3);
- codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
- codec.writeLong(cmd.getVersion());
- codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ if (cmd.isInPlaceUpdate()) {
+ codec.writeTag(JavaBinCodec.ARR, 5);
+ codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeLong(prevPointer);
+ codec.writeLong(cmd.prevVersion);
+ codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ } else {
+ codec.writeTag(JavaBinCodec.ARR, 3);
+ codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+ codec.writeLong(cmd.getVersion());
+ codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+ }
lastAddSize = (int)out.size();
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 7c2dae6..aaa6b6a 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,6 +35,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
@@ -44,6 +46,7 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
@@ -122,6 +125,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public static final int DELETE = 0x02;
public static final int DELETE_BY_QUERY = 0x03;
public static final int COMMIT = 0x04;
+ public static final int UPDATE_INPLACE = 0x08;
// Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
// for example, if full index replication starts and we are buffering updates, then this flag should
// be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
@@ -129,6 +133,28 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public static final int FLAG_GAP = 0x10;
public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation
+ /**
+ * The index of the flags value in an entry from the transaction log.
+ */
+ public static final int FLAGS_IDX = 0;
+
+ /**
+ * The index of the _version_ value in an entry from the transaction log.
+ */
+public static final int VERSION_IDX = 1;
+
+ /**
+ * The index of the previous pointer in an entry from the transaction log.
+ * This is only relevant if flags (indexed at FLAGS_IDX) includes UPDATE_INPLACE.
+ */
+ public static final int PREV_POINTER_IDX = 2;
+
+ /**
+ * The index of the previous version in an entry from the transaction log.
+ * This is only relevant if flags (indexed at FLAGS_IDX) includes UPDATE_INPLACE.
+ */
+ public static final int PREV_VERSION_IDX = 3;
+
public static class RecoveryInfo {
public long positionOfStart;
@@ -215,10 +241,29 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public static class LogPtr {
final long pointer;
final long version;
-
+ final long previousPointer; // used for entries that are in-place updates and need a pointer to a previous update command
+
+ /**
+ * Creates an object that contains the position and version of an update. In this constructor,
+ * the effective value of the previousPointer is -1.
+ *
+ * @param pointer Position in the transaction log of an update
+ * @param version Version of the update at the given position
+ */
public LogPtr(long pointer, long version) {
+ this(pointer, version, -1);
+ }
+
+ /**
+ *
+ * @param pointer Position in the transaction log of an update
+ * @param version Version of the update at the given position
+ * @param previousPointer Position, in the transaction log, of an update on which the current update depends
+ */
+ public LogPtr(long pointer, long version, long previousPointer) {
this.pointer = pointer;
this.version = version;
+ this.previousPointer = previousPointer;
}
@Override
@@ -476,16 +521,18 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
synchronized (this) {
long pos = -1;
+ long prevPointer = getPrevPointerForUpdate(cmd);
+
// don't log if we are replaying from another log
if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
ensureLog();
- pos = tlog.write(cmd, operationFlags);
+ pos = tlog.write(cmd, prevPointer, operationFlags);
}
if (!clearCaches) {
// TODO: in the future we could support a real position for a REPLAY update.
// Only currently would be useful for RTG while in recovery mode though.
- LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer);
// only update our map if we're not buffering
if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
@@ -506,6 +553,31 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
+ /**
+ * @return If cmd is an in-place update, then returns the pointer (in the tlog) of the previous
+ * update that the given update depends on.
+ * Returns -1 if this is not an in-place update, or if we can't find a previous entry in
+ * the tlog. Upon receiving a -1, it should be clear why it was -1: if the command's
+ * flags|UpdateLog.UPDATE_INPLACE is set, then this command is an in-place update whose
+ * previous update is in the index and not in the tlog; if that flag is not set, it is
+ * not an in-place update at all, and don't bother about the prevPointer value at
+ * all (which is -1 as a dummy value).)
+ */
+ private synchronized long getPrevPointerForUpdate(AddUpdateCommand cmd) {
+ // note: sync required to ensure maps aren't changed out form under us
+ if (cmd.isInPlaceUpdate()) {
+ BytesRef indexedId = cmd.getIndexedId();
+ for (Map<BytesRef, LogPtr> currentMap : Arrays.asList(map, prevMap, prevMap2)) {
+ if (currentMap != null) {
+ LogPtr prevEntry = currentMap.get(indexedId);
+ if (null != prevEntry) {
+ return prevEntry.pointer;
+ }
+ }
+ }
+ }
+ return -1;
+ }
public void delete(DeleteUpdateCommand cmd) {
@@ -755,6 +827,117 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
+ /**
+ * Goes over backwards, following the prevPointer, to merge all partial updates into the passed doc. Stops at either a full
+ * document, or if there are no previous entries to follow in the update log.
+ *
+ * @param id Binary representation of the unique key field
+ * @param prevPointer Pointer to the previous entry in the ulog, based on which the current in-place update was made.
+ * @param prevVersion Version of the previous entry in the ulog, based on which the current in-place update was made.
+ * @param onlyTheseFields When a non-null set of field names is passed in, the resolve process only attempts to populate
+ * the given fields in this set. When this set is null, it resolves all fields.
+ * @param latestPartialDoc Partial document that is to be populated
+ * @return Returns 0 if a full document was found in the log, -1 if no full document was found. If full document was supposed
+ * to be found in the tlogs, but couldn't be found (because the logs were rotated) then the prevPointer is returned.
+ */
+ synchronized public long applyPartialUpdates(BytesRef id, long prevPointer, long prevVersion,
+ Set<String> onlyTheseFields, SolrDocumentBase latestPartialDoc) {
+
+ SolrInputDocument partialUpdateDoc = null;
+
+ List<TransactionLog> lookupLogs = Arrays.asList(tlog, prevMapLog, prevMapLog2);
+ while (prevPointer >= 0) {
+ //go through each partial update and apply it on the incoming doc one after another
+ List entry;
+ entry = getEntryFromTLog(prevPointer, prevVersion, lookupLogs);
+ if (entry == null) {
+ return prevPointer; // a previous update was supposed to be found, but wasn't found (due to log rotation)
+ }
+ int flags = (int) entry.get(UpdateLog.FLAGS_IDX);
+
+ // since updates can depend only upon ADD updates or other UPDATE_INPLACE updates, we assert that we aren't
+ // getting something else
+ if ((flags & UpdateLog.ADD) != UpdateLog.ADD && (flags & UpdateLog.UPDATE_INPLACE) != UpdateLog.UPDATE_INPLACE) {
+ throw new SolrException(ErrorCode.INVALID_STATE, entry + " should've been either ADD or UPDATE_INPLACE update" +
+ ", while looking for id=" + new String(id.bytes, Charset.forName("UTF-8")));
+ }
+ // if this is an ADD (i.e. full document update), stop here
+ if ((flags & UpdateLog.ADD) == UpdateLog.ADD) {
+ partialUpdateDoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ applyOlderUpdates(latestPartialDoc, partialUpdateDoc, onlyTheseFields);
+ return 0; // Full document was found in the tlog itself
+ }
+ if (entry.size() < 5) {
+ throw new SolrException(ErrorCode.INVALID_STATE, entry + " is not a partial doc" +
+ ", while looking for id=" + new String(id.bytes, Charset.forName("UTF-8")));
+ }
+ // This update is an inplace update, get the partial doc. The input doc is always at last position.
+ partialUpdateDoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ applyOlderUpdates(latestPartialDoc, partialUpdateDoc, onlyTheseFields);
+ prevPointer = (long) entry.get(UpdateLog.PREV_POINTER_IDX);
+ prevVersion = (long) entry.get(UpdateLog.PREV_VERSION_IDX);
+
+ if (onlyTheseFields != null && latestPartialDoc.keySet().containsAll(onlyTheseFields)) {
+ return 0; // all the onlyTheseFields have been resolved, safe to abort now.
+ }
+ }
+
+ return -1; // last full document is not supposed to be in tlogs, but it must be in the index
+ }
+
+ /**
+ * Add all fields from olderDoc into newerDoc if not already present in newerDoc
+ */
+ private void applyOlderUpdates(SolrDocumentBase newerDoc, SolrInputDocument olderDoc, Set<String> mergeFields) {
+ for (String fieldName : olderDoc.getFieldNames()) {
+ // if the newerDoc has this field, then this field from olderDoc can be ignored
+ if (!newerDoc.containsKey(fieldName) && (mergeFields == null || mergeFields.contains(fieldName))) {
+ for (Object val : olderDoc.getFieldValues(fieldName)) {
+ newerDoc.addField(fieldName, val);
+ }
+ }
+ }
+ }
+
+
+ /***
+ * Get the entry that has the given lookupVersion in the given lookupLogs at the lookupPointer position.
+ *
+ * @return The entry if found, otherwise null
+ */
+ private synchronized List getEntryFromTLog(long lookupPointer, long lookupVersion, List<TransactionLog> lookupLogs) {
+ for (TransactionLog lookupLog : lookupLogs) {
+ if (lookupLog != null && lookupLog.getLogSize() > lookupPointer) {
+ lookupLog.incref();
+ try {
+ Object obj = null;
+
+ try {
+ obj = lookupLog.lookup(lookupPointer);
+ } catch (Exception | Error ex) {
+ // This can happen when trying to deserialize the entry at position lookupPointer,
+ // but from a different tlog than the one containing the desired entry.
+ // Just ignore the exception, so as to proceed to the next tlog.
+ log.debug("Exception reading the log (this is expected, don't worry)=" + lookupLog + ", for version=" + lookupVersion +
+ ". This can be ignored.");
+ }
+
+ if (obj != null && obj instanceof List) {
+ List tmpEntry = (List) obj;
+ if (tmpEntry.size() >= 2 &&
+ (tmpEntry.get(UpdateLog.VERSION_IDX) instanceof Long) &&
+ ((Long) tmpEntry.get(UpdateLog.VERSION_IDX)).equals(lookupVersion)) {
+ return tmpEntry;
+ }
+ }
+ } finally {
+ lookupLog.decref();
+ }
+ }
+ }
+ return null;
+ }
+
public Object lookup(BytesRef indexedId) {
LogPtr entry;
TransactionLog lookupLog;
@@ -967,6 +1150,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
static class Update {
TransactionLog log;
long version;
+ long previousVersion; // for in-place updates: version of the update this one depends on (0 when not set)
long pointer;
}
@@ -1070,15 +1254,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
List entry = (List)o;
// TODO: refactor this out so we get common error handling
- int opAndFlags = (Integer)entry.get(0);
+ int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX);
if (latestOperation == 0) {
latestOperation = opAndFlags;
}
int oper = opAndFlags & UpdateLog.OPERATION_MASK;
- long version = (Long) entry.get(1);
+ long version = (Long) entry.get(UpdateLog.VERSION_IDX);
switch (oper) {
case UpdateLog.ADD:
+ case UpdateLog.UPDATE_INPLACE:
case UpdateLog.DELETE:
case UpdateLog.DELETE_BY_QUERY:
Update update = new Update();
@@ -1086,13 +1271,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
update.pointer = reader.position();
update.version = version;
+ if (oper == UpdateLog.UPDATE_INPLACE && entry.size() == 5) {
+ update.previousVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+ }
updatesForLog.add(update);
updates.put(version, update);
if (oper == UpdateLog.DELETE_BY_QUERY) {
deleteByQueryList.add(update);
} else if (oper == UpdateLog.DELETE) {
- deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
+ deleteList.add(new DeleteUpdate(version, (byte[])entry.get(entry.size()-1)));
}
break;
@@ -1429,23 +1617,17 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List) o;
-
- operationAndFlags = (Integer) entry.get(0);
+ operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX);
int oper = operationAndFlags & OPERATION_MASK;
- long version = (Long) entry.get(1);
+ long version = (Long) entry.get(UpdateLog.VERSION_IDX);
switch (oper) {
+ case UpdateLog.UPDATE_INPLACE: // fall through to ADD
case UpdateLog.ADD: {
recoveryInfo.adds++;
- // byte[] idBytes = (byte[]) entry.get(2);
- SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
- AddUpdateCommand cmd = new AddUpdateCommand(req);
- // cmd.setIndexedId(new BytesRef(idBytes));
- cmd.solrDoc = sdoc;
- cmd.setVersion(version);
+ AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- if (debug) log.debug("add " + cmd);
-
+ log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
proc.processAdd(cmd);
break;
}
@@ -1472,7 +1654,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
proc.processDelete(cmd);
break;
}
-
case UpdateLog.COMMIT: {
commitVersion = version;
break;
@@ -1552,6 +1733,31 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
+  /**
+   * Builds a new AddUpdateCommand from a transaction log entry that carries a document, so the entry can be
+   * replayed either as a plain ADD or as an UPDATE_INPLACE.
+   *
+   * @param req       the request that will own the returned command
+   * @param entry     an entry from the transaction log; its last element is the document to apply
+   * @param operation the operation taken from the entry's flags; must be ADD or UPDATE_INPLACE. For
+   *                  UPDATE_INPLACE the previous version is additionally read from the entry
+   * @param version   the version already extracted from the entry
+   * @return a command holding the document and version (plus the previous version for in-place updates)
+   */
+  public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req, List entry,
+                                                                    int operation, long version) {
+    assert operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE;
+    final SolrInputDocument document = (SolrInputDocument) entry.get(entry.size() - 1);
+    final AddUpdateCommand command = new AddUpdateCommand(req);
+    command.solrDoc = document;
+    command.setVersion(version);
+
+    if (operation != UPDATE_INPLACE) {
+      return command;
+    }
+    // in-place updates also record the version they were applied on top of
+    final long previous = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+    command.prevVersion = previous;
+    return command;
+  }
+
public void cancelApplyBufferedUpdates() { // only raises the cancelApplyBufferUpdate flag; presumably polled by the buffered-update replay (not visible here)
this.cancelApplyBufferUpdate = true;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/VersionInfo.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 3c55172..07172eb 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -193,6 +193,10 @@ public class VersionInfo {
return ulog.lookupVersion(idBytes);
}
+ /**
+ * Returns the latest version from the index, searched by the given id (bytes) as seen from the realtime searcher.
+ * Returns null if no document can be found in the index for the given id.
+ */
public Long getVersionFromIndex(BytesRef idBytes) {
// TODO: we could cache much of this and invalidate during a commit.
// TODO: most DocValues classes are threadsafe - expose which.
@@ -219,6 +223,9 @@ public class VersionInfo {
}
}
+ /**
+ * Returns the highest version from the index, or 0L if no versions can be found in the index.
+ */
public Long getMaxVersionFromIndex(IndexSearcher searcher) throws IOException {
String versionFieldName = versionField.getName();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
index 452574e..4c843ad 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java
@@ -16,25 +16,34 @@
*/
package org.apache.solr.update.processor;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
-
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.CopyField;
import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.NumericValueFieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,7 +132,178 @@ public class AtomicUpdateDocumentMerger {
return toDoc;
}
+
+  /**
+   * Tells whether a schema field can be changed by an in-place (docValues-only) update. The field must be neither
+   * indexed nor stored, must have docValues, must be single-valued, and its type must be a
+   * {@link NumericValueFieldType}.
+   * Note: an update command is executed in-place only if every field it updates (and _version_) passes this check.
+   */
+  private static boolean isSupportedFieldForInPlaceUpdate(SchemaField schemaField) {
+    if (schemaField.indexed() || schemaField.stored()) {
+      return false;
+    }
+    if (!schemaField.hasDocValues() || schemaField.multiValued()) {
+      return false;
+    }
+    return schemaField.getType() instanceof NumericValueFieldType;
+  }
+
+  /**
+   * Given an add update command, computes the set of fields that can be updated in-place. If even a single field
+   * in the update cannot be updated in-place, the entire update cannot be executed in-place, and an empty set is
+   * returned in that case.
+   * <p>
+   * A field qualifies only if all of the following hold:
+   * <ul>
+   *   <li>its update uses only the "set" and "inc" operations;</li>
+   *   <li>a "set" does not carry a null or empty-collection value, since that means removing the value, which a
+   *       docValues-only update cannot express;</li>
+   *   <li>the field and all of its copyField destinations are supported for in-place updates;</li>
+   *   <li>the field is already known to the IndexWriter;</li>
+   *   <li>the field is not used for index (segment) sorting.</li>
+   * </ul>
+   * The _version_ field must itself be supported for in-place updates. The uniqueKey and _version_ fields of the
+   * document are never part of the returned set.
+   *
+   * @param cmd the add command whose document contains the atomic update operations
+   * @return the set of fields that can be in-place updated, or an empty set if the update cannot be done in-place
+   * @throws IOException propagated from obtaining the IndexWriter
+   */
+  public static Set<String> computeInPlaceUpdatableFields(AddUpdateCommand cmd) throws IOException {
+    SolrInputDocument sdoc = cmd.getSolrInputDocument();
+    IndexSchema schema = cmd.getReq().getSchema();
+
+    final SchemaField uniqueKeyField = schema.getUniqueKeyField();
+    final String uniqueKeyFieldName = null == uniqueKeyField ? null : uniqueKeyField.getName();
+
+    final Set<String> candidateFields = new HashSet<>();
+
+    // if _version_ field is not supported for in-place update, bail out early
+    SchemaField versionField = schema.getFieldOrNull(DistributedUpdateProcessor.VERSION_FIELD);
+    if (versionField == null || !isSupportedFieldForInPlaceUpdate(versionField)) {
+      return Collections.emptySet();
+    }
+
+    // first pass, check the things that are virtually free,
+    // and bail out early if anything is obviously not a valid in-place update
+    for (String fieldName : sdoc.getFieldNames()) {
+      if (fieldName.equals(uniqueKeyFieldName)
+          || fieldName.equals(DistributedUpdateProcessor.VERSION_FIELD)) {
+        continue;
+      }
+      Object fieldValue = sdoc.getField(fieldName).getValue();
+      if (!(fieldValue instanceof Map)) {
+        // not an in-place update if there are fields that are not maps
+        return Collections.emptySet();
+      }
+      // else it's an atomic update map...
+      @SuppressWarnings("unchecked")
+      Map<String, Object> operations = (Map<String, Object>) fieldValue;
+      for (Entry<String, Object> operation : operations.entrySet()) {
+        String op = operation.getKey();
+        if (!op.equals("set") && !op.equals("inc")) {
+          // not a supported in-place update op
+          return Collections.emptySet();
+        }
+        Object opValue = operation.getValue();
+        if (op.equals("set")
+            && (opValue == null || (opValue instanceof Collection && ((Collection<?>) opValue).isEmpty()))) {
+          // "set" to null / an empty list removes the value; a docValues update can't remove a value,
+          // so this must be handled as a regular (full) atomic update instead
+          return Collections.emptySet();
+        }
+      }
+      candidateFields.add(fieldName);
+    }
+
+    if (candidateFields.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    // second pass over the candidates for in-place updates
+    // this time more expensive checks involving schema/config settings
+    for (String fieldName : candidateFields) {
+      SchemaField schemaField = schema.getField(fieldName);
+
+      if (!isSupportedFieldForInPlaceUpdate(schemaField)) {
+        return Collections.emptySet();
+      }
+
+      // if this field has copy target which is not supported for in place, then empty
+      for (CopyField copyField : schema.getCopyFieldsList(fieldName)) {
+        if (!isSupportedFieldForInPlaceUpdate(copyField.getDestination())) {
+          return Collections.emptySet();
+        }
+      }
+    }
+
+    // third pass: requiring checks against the actual IndexWriter due to internal DV update limitations
+    SolrCore core = cmd.getReq().getCore();
+    RefCounted<IndexWriter> holder = core.getSolrCoreState().getIndexWriter(core);
+    Set<String> fieldNamesFromIndexWriter = null;
+    Set<String> segmentSortingFields = null;
+    try {
+      IndexWriter iw = holder.get();
+      fieldNamesFromIndexWriter = iw.getFieldNames();
+      segmentSortingFields = iw.getConfig().getIndexSortFields();
+    } finally {
+      holder.decref();
+    }
+    for (String fieldName : candidateFields) {
+      if (!fieldNamesFromIndexWriter.contains(fieldName)) {
+        return Collections.emptySet(); // if this field doesn't exist, DV update can't work
+      }
+      if (segmentSortingFields.contains(fieldName)) {
+        return Collections.emptySet(); // if this is used for segment sorting, DV updates can't work
+      }
+    }
+
+    return candidateFields;
+  }
+  /**
+   * Given an AddUpdateCommand containing update operations (e.g. set, inc), merges and resolves the operations into
+   * a partial document that can be used for indexing the in-place updates. The AddUpdateCommand is modified to
+   * contain the partial document (instead of the original document which contained the update operations) and also
+   * the prevVersion that this in-place update depends on.
+   * <p>
+   * Note: {@code updatedFields} is modified by this method: the _version_ field is added to it so that the version
+   * is fetched together with the updated fields. It must therefore be non-null and mutable.
+   *
+   * @param cmd           the update command; on success its solrDoc and prevVersion are replaced
+   * @param updatedFields names of the fields being updated in-place
+   * @return false if the in-place update cannot succeed (e.g. the old document was deleted recently), which means
+   *         the update can be re-tried as a full atomic update; true if the in-place merge succeeded
+   * @throws SolrException if the previously indexed document has no _version_
+   * @throws IOException   propagated from the real-time get lookup of the old document
+   */
+  public boolean doInPlaceUpdateMerge(AddUpdateCommand cmd, Set<String> updatedFields) throws IOException {
+    SolrInputDocument inputDoc = cmd.getSolrInputDocument();
+    BytesRef idBytes = cmd.getIndexedId();
+
+    updatedFields.add(DistributedUpdateProcessor.VERSION_FIELD); // add the version field so that it is fetched too
+    SolrInputDocument oldDocument = RealTimeGetComponent.getInputDocument
+      (cmd.getReq().getCore(), idBytes,
+       null, // don't want the version to be returned
+       true, // avoid stored fields from index
+       updatedFields,
+       true); // resolve the full document
+
+    if (oldDocument == RealTimeGetComponent.DELETED || oldDocument == null) {
+      // This doc was deleted recently. In-place update cannot work, hence a full atomic update should be tried.
+      return false;
+    }
+
+    if (oldDocument.containsKey(DistributedUpdateProcessor.VERSION_FIELD) == false) {
+      throw new SolrException (ErrorCode.INVALID_STATE, "There is no _version_ in previous document. id=" +
+          cmd.getPrintableId());
+    }
+    Long oldVersion = (Long) oldDocument.remove(DistributedUpdateProcessor.VERSION_FIELD).getValue();
+
+    final String uniqueKeyField = schema.getUniqueKeyField().getName();
+
+    // If the oldDocument contains any other field apart from updatedFields (or the uniqueKey/version field), then
+    // remove them. This can happen, despite requesting only these fields in the call to RTGC.getInputDocument, if
+    // the document was fetched from the tlog and had all these fields (possibly because it was a full document ADD
+    // operation). The uniqueKey is looked up from the schema rather than assumed to be named "id".
+    Collection<String> names = new HashSet<String>(oldDocument.getFieldNames());
+    for (String fieldName : names) {
+      if (!fieldName.equals(DistributedUpdateProcessor.VERSION_FIELD)
+          && !fieldName.equals(uniqueKeyField)
+          && !updatedFields.contains(fieldName)) {
+        oldDocument.remove(fieldName);
+      }
+    }
+
+    // Copy over all supported DVs from oldDocument to partialDoc
+    //
+    // Assuming multiple updates to the same doc: field 'dv1' in one update, then field 'dv2' in a second
+    // update, and then again 'dv1' in a third update (without commits in between), the last update would
+    // fetch from the tlog the partial doc for the 2nd (dv2) update. If that doc doesn't copy over the
+    // previous updates to dv1 as well, then a full resolution (by following previous pointers) would
+    // need to be done to calculate the dv1 value -- so instead copy all the potentially affected DV fields.
+    SolrInputDocument partialDoc = new SolrInputDocument();
+    for (String fieldName : oldDocument.getFieldNames()) {
+      SchemaField schemaField = schema.getField(fieldName);
+      if (fieldName.equals(uniqueKeyField) || isSupportedFieldForInPlaceUpdate(schemaField)) {
+        partialDoc.addField(fieldName, oldDocument.getFieldValue(fieldName));
+      }
+    }
+
+    merge(inputDoc, partialDoc);
+
+    // Populate the id field if not already populated (this can happen since stored fields were avoided during fetch from RTGC)
+    if (!partialDoc.containsKey(uniqueKeyField)) {
+      partialDoc.addField(uniqueKeyField, inputDoc.getField(uniqueKeyField).getFirstValue());
+    }
+
+    cmd.prevVersion = oldVersion;
+    cmd.solrDoc = partialDoc;
+    return true;
+  }
+
protected void doSet(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
SchemaField sf = schema.getField(sif.getName());
toDoc.setField(sif.getName(), sf.getType().toNativeType(fieldVal), sif.getBoost());