You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2016/02/05 17:14:34 UTC
lucene-solr git commit: SOLR-8586: add index fingerprinting and use
it in peersync (cherry picked from commit 629767b)
Repository: lucene-solr
Updated Branches:
refs/heads/branch_5x 482b40f84 -> ff83a4001
SOLR-8586: add index fingerprinting and use it in peersync
(cherry picked from commit 629767b)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
Branch: refs/heads/branch_5x
Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
Parents: 482b40f
Author: yonik <yo...@apache.org>
Authored: Thu Feb 4 14:54:08 2016 -0500
Committer: yonik <yo...@apache.org>
Committed: Fri Feb 5 11:11:34 2016 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +
.../org/apache/solr/cloud/SyncStrategy.java | 2 +-
.../handler/component/RealTimeGetComponent.java | 8 +
.../apache/solr/update/IndexFingerprint.java | 232 +++++++++++++++++++
.../java/org/apache/solr/update/PeerSync.java | 78 +++++--
.../java/org/apache/solr/update/UpdateLog.java | 85 +++----
.../org/apache/solr/update/PeerSyncTest.java | 23 ++
7 files changed, 374 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f2067c6..2b7b46e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -119,6 +119,10 @@ New Features
* SOLR-8415: Provide command to switch between non/secure mode in ZK
(Mike Drob, Gregory Chanan)
+* SOLR-8586: added index fingerprint, a hash over all versions currently in the index.
+ PeerSync now uses this to check if replicas are in sync. (yonik)
+
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index 7a16598..d811f5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -173,7 +173,7 @@ public class SyncStrategy {
// if we can't reach a replica for sync, we still consider the overall sync a success
// TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
// to recover once more?
- PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
+ PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive, false);
return peerSync.sync();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 2bbf5a2..14a4185 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SolrReturnFields;
import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
@@ -536,6 +537,8 @@ public class RealTimeGetComponent extends SearchComponent
int nVersions = params.getInt("getVersions", -1);
if (nVersions == -1) return;
+ boolean doFingerprint = params.getBool("fingerprint", false);
+
String sync = params.get("sync");
if (sync != null) {
processSync(rb, nVersions, sync);
@@ -548,6 +551,11 @@ public class RealTimeGetComponent extends SearchComponent
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
}
+
+ if (doFingerprint) {
+ IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
+ rb.rsp.add("fingerprint", fingerprint.toObject());
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
new file mode 100644
index 0000000..c73b57b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
@@ -0,0 +1,232 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/** @lucene.internal */
+public class IndexFingerprint {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private long maxVersionSpecified;
+ private long maxVersionEncountered;
+ private long maxInHash;
+ private long versionsHash;
+ private long numVersions;
+ private long numDocs;
+ private long maxDoc;
+
+ public long getMaxVersionSpecified() {
+ return maxVersionSpecified;
+ }
+
+ public long getMaxVersionEncountered() {
+ return maxVersionEncountered;
+ }
+
+ public long getMaxInHash() {
+ return maxInHash;
+ }
+
+ public long getVersionsHash() {
+ return versionsHash;
+ }
+
+ public long getNumVersions() {
+ return numVersions;
+ }
+
+ public long getNumDocs() {
+ return numDocs;
+ }
+
+ public long getMaxDoc() {
+ return maxDoc;
+ }
+
+ /** Opens a new realtime searcher and returns its fingerprint */
+ public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
+ core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
+ RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
+ try {
+ return getFingerprint(newestSearcher.get(), maxVersion);
+ } finally {
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+ }
+
+ public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
+ long start = System.currentTimeMillis();
+
+ SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
+
+ IndexFingerprint f = new IndexFingerprint();
+ f.maxVersionSpecified = maxVersion;
+ f.maxDoc = searcher.maxDoc();
+
+ // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
+ ValueSource vs = versionField.getType().getValueSource(versionField, null);
+ Map funcContext = ValueSource.newContext(searcher);
+ vs.createWeight(funcContext, searcher);
+ for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
+ int maxDoc = ctx.reader().maxDoc();
+ f.numDocs += ctx.reader().numDocs();
+ Bits liveDocs = ctx.reader().getLiveDocs();
+ FunctionValues fv = vs.getValues(funcContext, ctx);
+ for (int doc = 0; doc < maxDoc; doc++) {
+ if (liveDocs != null && !liveDocs.get(doc)) continue;
+ long v = fv.longVal(doc);
+ f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
+ if (v <= f.maxVersionSpecified) {
+ f.maxInHash = Math.max(v, f.maxInHash);
+ f.versionsHash += Hash.fmix64(v);
+ f.numVersions++;
+ }
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
+
+ return f;
+ }
+
+ /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
+ public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
+ int cmp;
+
+ // NOTE: some may want number of docs in index to take precedence over highest version (add-only systems for sure)
+
+ // if we're comparing all of the versions in the index, then go by the highest encountered.
+ if (f1.maxVersionSpecified == Long.MAX_VALUE) {
+ cmp = Long.compare(f1.maxVersionEncountered, f2.maxVersionEncountered);
+ if (cmp != 0) return cmp;
+ }
+
+ // Go by the highest version under the requested max.
+ cmp = Long.compare(f1.maxInHash, f2.maxInHash);
+ if (cmp != 0) return cmp;
+
+ // go by who has the most documents in the index
+ cmp = Long.compare(f1.numVersions, f2.numVersions);
+ if (cmp != 0) return cmp;
+
+ // both have same number of documents, so go by hash
+ cmp = Long.compare(f1.versionsHash, f2.versionsHash);
+ return cmp;
+ }
+
+ /**
+ * Create a generic object suitable for serializing with ResponseWriters
+ */
+ public Object toObject() {
+ Map<String,Object> map = new LinkedHashMap<>();
+ map.put("maxVersionSpecified", maxVersionSpecified);
+ map.put("maxVersionEncountered", maxVersionEncountered);
+ map.put("maxInHash", maxInHash);
+ map.put("versionsHash", versionsHash);
+ map.put("numVersions", numVersions);
+ map.put("numDocs", numDocs);
+ map.put("maxDoc", maxDoc);
+ return map;
+ }
+
+ private static long getLong(Object o, String key, long def) {
+ long v = def;
+
+ Object oval = null;
+ if (o instanceof Map) {
+ oval = ((Map)o).get(key);
+ } else if (o instanceof NamedList) {
+ oval = ((NamedList)o).get(key);
+ }
+
+ return oval != null ? ((Number)oval).longValue() : def;
+ }
+
+ /**
+ * Create an IndexFingerprint object from a deserialized generic object (Map or NamedList)
+ */
+ public static IndexFingerprint fromObject(Object o) {
+ IndexFingerprint f = new IndexFingerprint();
+ f.maxVersionSpecified = getLong(o, "maxVersionSpecified", Long.MAX_VALUE);
+ f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
+ f.maxInHash = getLong(o, "maxInHash", -1);
+ f.versionsHash = getLong(o, "versionsHash", -1);
+ f.numVersions = getLong(o, "numVersions", -1);
+ f.numDocs = getLong(o, "numDocs", -1);
+ f.maxDoc = getLong(o, "maxDoc", -1);
+ return f;
+ }
+
+ @Override
+ public String toString() {
+ return toObject().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index c5ddaf0..dbc0091 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -68,6 +68,7 @@ public class PeerSync {
private UpdateLog ulog;
private HttpShardHandlerFactory shardHandlerFactory;
private ShardHandler shardHandler;
+ private List<SyncShardRequest> requests = new ArrayList<>();
private List<Long> startingVersions;
@@ -76,8 +77,10 @@ public class PeerSync {
private Set<Long> requestedUpdateSet;
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
+ private long ourHighest; // currently just used for logging/debugging purposes
private final boolean cantReachIsSuccess;
private final boolean getNoVersionsIsSuccess;
+ private final boolean doFingerprint;
private final HttpClient client;
private final boolean onlyIfActive;
private SolrCore core;
@@ -116,6 +119,8 @@ public class PeerSync {
private static class SyncShardRequest extends ShardRequest {
List<Long> reportedVersions;
+ IndexFingerprint fingerprint;
+ boolean doFingerprintComparison;
List<Long> requestedUpdates;
Exception updateException;
}
@@ -125,16 +130,17 @@ public class PeerSync {
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
- this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false);
+ this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false, true);
}
- public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive, boolean doFingerprint) {
this.core = core;
this.replicas = replicas;
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
this.cantReachIsSuccess = cantReachIsSuccess;
this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
+ this.doFingerprint = doFingerprint;
this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
this.onlyIfActive = onlyIfActive;
@@ -169,9 +175,8 @@ public class PeerSync {
return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
}
- /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates
- * when considering the last N updates between it and its peers.
- * A commit is not performed.
+ /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
+ * It does not mean that the remote replica is in sync with us.
*/
public boolean sync() {
if (ulog == null) {
@@ -210,7 +215,7 @@ public class PeerSync {
ourLowThreshold = percentile(startingVersions, 0.8f);
ourHighThreshold = percentile(startingVersions, 0.2f);
-
+
// now make sure that the starting updates overlap our updates
// there shouldn't be reorders, so any overlap will do.
@@ -231,6 +236,7 @@ public class PeerSync {
}
ourUpdates = newList;
+ Collections.sort(ourUpdates, absComparator);
} else {
if (ourUpdates.size() > 0) {
@@ -243,9 +249,10 @@ public class PeerSync {
return false;
}
}
-
+
+ ourHighest = ourUpdates.get(0);
ourUpdateSet = new HashSet<>(ourUpdates);
- requestedUpdateSet = new HashSet<>(ourUpdates);
+ requestedUpdateSet = new HashSet<>();
for (;;) {
ShardResponse srsp = shardHandler.takeCompletedOrError();
@@ -257,9 +264,18 @@ public class PeerSync {
return false;
}
}
-
- log.info(msg() + "DONE. sync succeeded");
- return true;
+
+ // finish up any comparisons with other shards that we deferred
+ boolean success = true;
+ for (SyncShardRequest sreq : requests) {
+ if (sreq.doFingerprintComparison) {
+ success = compareFingerprint(sreq);
+ if (!success) break;
+ }
+ }
+
+ log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
+ return success;
} finally {
MDCLoggingContext.clear();
}
@@ -267,6 +283,7 @@ public class PeerSync {
private void requestVersions(String replica) {
SyncShardRequest sreq = new SyncShardRequest();
+ requests.add(sreq);
sreq.purpose = 1;
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
@@ -274,6 +291,7 @@ public class PeerSync {
sreq.params.set("qt","/get");
sreq.params.set("distrib",false);
sreq.params.set("getVersions",nUpdates);
+ sreq.params.set("fingerprint",doFingerprint);
shardHandler.submit(sreq, replica, sreq.params);
}
@@ -355,7 +373,12 @@ public class PeerSync {
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
sreq.reportedVersions = otherVersions;
- log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
+ Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
+
+ log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] + " fingerprint:" + fingerprint );
+ if (fingerprint != null) {
+ sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
+ }
if (otherVersions.size() == 0) {
return getNoVersionsIsSuccess;
@@ -371,13 +394,14 @@ public class PeerSync {
long otherHigh = percentile(otherVersions, .2f);
long otherLow = percentile(otherVersions, .8f);
+ long otherHighest = otherVersions.get(0);
if (ourHighThreshold < otherLow) {
// Small overlap between version windows and ours is older
// This means that we might miss updates if we attempted to use this method.
// Since there exists just one replica that is so much newer, we must
// fail the sync.
- log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
+ log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
return false;
}
@@ -385,7 +409,10 @@ public class PeerSync {
// Small overlap between windows and ours is newer.
// Using this list to sync would result in requesting/replaying results we don't need
// and possibly bringing deleted docs back to life.
- log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+ log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
+
+ // Because our versions are newer, IndexFingerprint with the remote would not match us.
+ // We return true on our side, but the remote peersync with us should fail.
return true;
}
@@ -408,9 +435,15 @@ public class PeerSync {
sreq.requestedUpdates = toRequest;
if (toRequest.isEmpty()) {
- log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+ log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
// we had (or already requested) all the updates referenced by the replica
+
+ // If we requested updates from another replica, we can't compare fingerprints yet with this replica, we need to defer
+ if (doFingerprint) {
+ sreq.doFingerprintComparison = true;
+ }
+
return true;
}
@@ -422,6 +455,19 @@ public class PeerSync {
return requestUpdates(srsp, toRequest);
}
+ private boolean compareFingerprint(SyncShardRequest sreq) {
+ if (sreq.fingerprint == null) return true;
+ try {
+ IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
+ int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
+ log.info("Fingerprint comparison: " + cmp);
+ return cmp == 0; // currently, we only check for equality...
+ } catch(IOException e){
+ log.error(msg() + "Error getting index fingerprint", e);
+ return false;
+ }
+ }
+
private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
String replica = srsp.getShardRequest().shards[0];
@@ -556,7 +602,7 @@ public class PeerSync {
}
}
- return true;
+ return compareFingerprint(sreq);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 7d7d3ef..7e3fc9f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -283,7 +283,7 @@ public class UpdateLog implements PluginInfoInitialized {
if (debug) {
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
}
-
+
TransactionLog oldLog = null;
for (String oldLogName : tlogFiles) {
File f = new File(tlogDir, oldLogName);
@@ -334,11 +334,11 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
-
+
public String getLogDir() {
return tlogDir.getAbsolutePath();
}
-
+
public List<Long> getStartingVersions() {
return startingVersions;
}
@@ -503,32 +503,35 @@ public class UpdateLog implements PluginInfoInitialized {
if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
// given that we just did a delete-by-query, we don't know what documents were
// affected and hence we must purge our caches.
- if (map != null) map.clear();
- if (prevMap != null) prevMap.clear();
- if (prevMap2 != null) prevMap2.clear();
-
+ openRealtimeSearcher();
trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
- // oldDeletes.clear();
-
- // We must cause a new IndexReader to be opened before anything looks at these caches again
- // so that a cache miss will read fresh data.
- //
- // TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands
- try {
- RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
- holder.decref();
- } catch (Exception e) {
- SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+ if (trace) {
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+ log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
-
}
+ }
+ }
- LogPtr ptr = new LogPtr(pos, cmd.getVersion());
-
- if (trace) {
- log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ /** Opens a new realtime searcher and clears the id caches.
+ * This may also be called when updates are being buffered (from PeerSync/IndexFingerprint)
+ */
+ public void openRealtimeSearcher() {
+ synchronized (this) {
+ // We must cause a new IndexReader to be opened before anything looks at these caches again
+ // so that a cache miss will read fresh data.
+ try {
+ RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+ holder.decref();
+ } catch (Exception e) {
+ SolrException.log(log, "Error opening realtime searcher", e);
+ return;
}
+
+ if (map != null) map.clear();
+ if (prevMap != null) prevMap.clear();
+ if (prevMap2 != null) prevMap2.clear();
}
}
@@ -620,7 +623,7 @@ public class UpdateLog implements PluginInfoInitialized {
public boolean hasUncommittedChanges() {
return tlog != null;
}
-
+
public void preCommit(CommitUpdateCommand cmd) {
synchronized (this) {
if (debug) {
@@ -878,11 +881,11 @@ public class UpdateLog implements PluginInfoInitialized {
theLog.forceClose();
}
}
-
+
public void close(boolean committed) {
close(committed, false);
}
-
+
public void close(boolean committed, boolean deleteOnClose) {
synchronized (this) {
recoveryExecutor.shutdown(); // no new tasks
@@ -923,7 +926,7 @@ public class UpdateLog implements PluginInfoInitialized {
this.id = id;
}
}
-
+
public class RecentUpdates implements Closeable {
final Deque<TransactionLog> logList; // newest first
@@ -950,17 +953,17 @@ public class UpdateLog implements PluginInfoInitialized {
public List<Long> getVersions(int n) {
List<Long> ret = new ArrayList<>(n);
-
+
for (List<Update> singleList : updateList) {
for (Update ptr : singleList) {
ret.add(ptr.version);
if (--n <= 0) return ret;
}
}
-
+
return ret;
}
-
+
public Object lookup(long version) {
Update update = updates.get(version);
if (update == null) return null;
@@ -1004,7 +1007,7 @@ public class UpdateLog implements PluginInfoInitialized {
try {
o = reader.next();
if (o==null) break;
-
+
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o;
@@ -1027,13 +1030,13 @@ public class UpdateLog implements PluginInfoInitialized {
updatesForLog.add(update);
updates.put(version, update);
-
+
if (oper == UpdateLog.DELETE_BY_QUERY) {
deleteByQueryList.add(update);
} else if (oper == UpdateLog.DELETE) {
deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
}
-
+
break;
case UpdateLog.COMMIT:
@@ -1063,7 +1066,7 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
-
+
@Override
public void close() {
for (TransactionLog log : logList) {
@@ -1331,10 +1334,10 @@ public class UpdateLog implements PluginInfoInitialized {
"log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
Math.floor(cpos / (double) csize * 100.));
-
+
}
}
-
+
o = null;
o = tlogReader.next();
if (o == null && activeLog) {
@@ -1513,25 +1516,25 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
}
-
+
protected String getTlogDir(SolrCore core, PluginInfo info) {
String dataDir = (String) info.initArgs.get("dir");
-
+
String ulogDir = core.getCoreDescriptor().getUlogDir();
if (ulogDir != null) {
dataDir = ulogDir;
}
-
+
if (dataDir == null || dataDir.length() == 0) {
dataDir = core.getDataDir();
}
return dataDir + "/" + TLOG_NAME;
}
-
+
/**
* Clears the logs on the file system. Only call before init.
- *
+ *
* @param core the SolrCore
* @param ulogPluginInfo the init info for the UpdateHandler
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
index 8083ad0..bcaf846 100644
--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -168,6 +168,29 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+
+ // now let's check that fingerprinting causes appropriate failures
+ v = 4000;
+ add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+ toAdd = numVersions+10;
+ for (int i=0; i<toAdd; i++) {
+ add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+ add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+ }
+
+ // client0 now has an additional add beyond our window and the fingerprint should cause this to fail
+ assertSync(client1, numVersions, false, shardsArr[0]);
+
+ // lets add the missing document and verify that order doesn't matter
+ add(client1, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
+ // lets do some overwrites to ensure that repeated updates and maxDoc don't matter
+ for (int i=0; i<10; i++) {
+ add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
+ }
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
}
Re: lucene-solr git commit: SOLR-8586: add index fingerprinting and
use it in peersync (cherry picked from commit 629767b)
Posted by Yonik Seeley <ys...@gmail.com>.
On Fri, Feb 5, 2016 at 11:23 AM, Uwe Schindler <uw...@thetaphi.de> wrote:
> Hi Yonik,
>
> I committed a change yesterday. Could you backport this, too? It broke the
> forbidden-apis tests because it used currentTimeMillis instead of RTimer.
Ah, I hadn't noticed (I got busy between the commit to trunk yesterday
and the backport to 5x today).
Will do.
-Yonik
> Uwe
>
> Am 5. Februar 2016 17:14:34 MEZ, schrieb yonik@apache.org:
>>
>> Repository: lucene-solr
>> Updated Branches:
>> refs/heads/branch_5x 482b40f84 -> ff83a4001
>>
>>
>> SOLR-8586: add index fingerprinting and use it in peersync
>> (cherry picked from commit 629767b)
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
>> Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
>> Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
>>
>> Branch: refs/heads/branch_5x
>> Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
>> Parents: 482b40f
>> Author: yonik
>> <yo...@apache.org>
>> Authored: Thu Feb 4 14:54:08 2016 -0500
>> Committer: yonik <yo...@apache.org>
>> Committed: Fri Feb 5 11:11:34 2016 -0500
>>
>> ________________________________
>>
>> solr/CHANGES.txt | 4 +
>> .../org/apache/solr/cloud/SyncStrategy.java | 2 +-
>> .../handler/component/RealTimeGetComponent.java | 8 +
>> .../apache/solr/update/IndexFingerprint.java | 232 +++++++++++++++++++
>> .../java/org/apache/solr/update/PeerSync.java | 78 +++++--
>> .../java/org/apache/solr/update/UpdateLog.java | 85 +++----
>> .../org/apache/solr/update/PeerSyncTest.java | 23 ++
>> 7 files changed, 374 insertions(+), 58 deletions(-)
>> ________________________________
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
>> ________________________________
>>
>> diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
>> index
>> f2067c6..2b7b46e 100644
>> --- a/solr/CHANGES.txt
>> +++ b/solr/CHANGES.txt
>> @@ -119,6 +119,10 @@ New Features
>> * SOLR-8415: Provide command to switch between non/secure mode in ZK
>> (Mike Drob, Gregory Chanan)
>>
>> +* SOLR-8586: added index fingerprint, a hash over all versions currently
>> in the index.
>> + PeerSync now uses this to check if replicas are in sync. (yonik)
>> +
>> +
>> Bug Fixes
>> ----------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> ________________________________
>>
>> diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> index 7a16598..d811f5c 100644
>> ---
>> a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> @@ -173,7 +173,7 @@ public class SyncStrategy {
>> // if we can't reach a replica for sync, we still consider the
>> overall sync a success
>> // TODO: as an assurance, we should still try and tell the sync nodes
>> that we couldn't reach
>> // to recover once more?
>> - PeerSync peerSync = new PeerSync(core, syncWith,
>> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true,
>> peerSyncOnlyWithActive);
>> + PeerSync peerSync = new PeerSync(core, syncWith,
>> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true,
>> peerSyncOnlyWithActive, false);
>> return peerSync.sync();
>> }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> index 2bbf5a2..14a4185 100644
>> ---
>> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> +++
>> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> @@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
>> import org.apache.solr.search.SolrIndexSearcher;
>> import org.apache.solr.search.SolrReturnFields;
>> import org.apache.solr.update.DocumentBuilder;
>> +import org.apache.solr.update.IndexFingerprint;
>>
>> import org.apache.solr.update.PeerSync;
>> import org.apache.solr.update.UpdateLog;
>> import org.apache.solr.util.RefCounted;
>> @@ -536,6 +537,8 @@ public class RealTimeGetComponent extends
>> SearchComponent
>> int nVersions = params.getInt("getVersions", -1);
>> if (nVersions == -1) return;
>>
>> + boolean doFingerprint = params.getBool("fingerprint", false);
>> +
>> String sync = params.get("sync");
>> if (sync != null) {
>> processSync(rb, nVersions, sync);
>> @@ -548,6 +551,11 @@ public class RealTimeGetComponent extends
>> SearchComponent
>> try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates())
>> {
>> rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
>> }
>> +
>> + if (doFingerprint) {
>> + IndexFingerprint fingerprint =
>> IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
>> + rb.rsp.add("fingerprint", fingerprint.toObject());
>> + }
>> }
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> new file mode 100644
>> index 0000000..c73b57b
>> --- /dev/null
>> +++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> @@ -0,0 +1,232 @@
>> +package org.apache.solr.update;
>> +
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements. See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License,
>> Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +import java.io.IOException;
>> +import java.lang.invoke.MethodHandles;
>> +import java.net.ConnectException;
>> +import java.net.SocketException;
>> +import java.util.ArrayList;
>> +import java.util.Collections;
>> +import
>> java.util.Comparator;
>> +import java.util.HashSet;
>> +import java.util.LinkedHashMap;
>> +import java.util.List;
>> +import java.util.Map;
>> +import java.util.Set;
>> +
>> +import org.apache.http.NoHttpResponseException;
>> +import org.apache.http.client.HttpClient;
>> +import org.apache.http.conn.ConnectTimeoutException;
>> +import org.apache.lucene.index.LeafReader;
>> +import org.apache.lucene.index.LeafReaderContext;
>> +import org.apache.lucene.queries.function.FunctionValues;
>> +import org.apache.lucene.queries.function.ValueSource;
>> +import org.apache.lucene.util.Bits;
>> +import org.apache.lucene.util.BytesRef;
>> +import org.apache.solr.client.solrj.SolrServerException;
>> +import org.apache.solr.cloud.ZkController;
>> +import org.apache.solr.common.SolrException;
>> +import org.apache.solr.common.SolrInputDocument;
>> +import org.apache.solr.common.params.ModifiableSolrParams;
>> +import org.apache.solr.common.util.Hash;
>> +import org.apache.solr.common.util.NamedList;
>> +import org.apache.solr.common.util.StrUtils;
>> +import org.apache.solr.core.SolrCore;
>> +import org.apache.solr.handler.component.HttpShardHandlerFactory;
>> +import org.apache.solr.handler.component.ShardHandler;
>> +import org.apache.solr.handler.component.ShardHandlerFactory;
>> +import org.apache.solr.handler.component.ShardRequest;
>> +import org.apache.solr.handler.component.ShardResponse;
>> +import org.apache.solr.logging.MDCLoggingContext;
>> +import org.apache.solr.request.LocalSolrQueryRequest;
>> +import org.apache.solr.request.SolrQueryRequest;
>> +import org.apache.solr.response.SolrQueryResponse;
>> +import org.apache.solr.schema.SchemaField;
>> +import org.apache.solr.search.SolrIndexSearcher;
>> +import org.apache.solr.update.processor.UpdateRequestProcessor;
>> +import org.apache.solr.update.processor.UpdateRequestProcessorChain;
>> +import org.apache.solr.util.RefCounted;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import static
>> org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
>> +import static
>> org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
>> +
>> +/** @lucene.internal */
>> +public class IndexFingerprint {
>> + private static final Logger log =
>> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>> +
>> + private long maxVersionSpecified;
>> + private long maxVersionEncountered;
>> + private long maxInHash;
>> + private long versionsHash;
>> + private long numVersions;
>> + private long numDocs;
>> + private long maxDoc;
>> +
>> + public long getMaxVersionSpecified() {
>> + return maxVersionSpecified;
>> + }
>> +
>> + public long getMaxVersionEncountered() {
>> + return maxVersionEncountered;
>> + }
>> +
>> + public long getMaxInHash() {
>> + return
>> maxInHash;
>> + }
>> +
>> + public long getVersionsHash() {
>> + return versionsHash;
>> + }
>> +
>> + public long getNumVersions() {
>> + return numVersions;
>> + }
>> +
>> + public long getNumDocs() {
>> + return numDocs;
>> + }
>> +
>> + public long getMaxDoc() {
>> + return maxDoc;
>> + }
>> +
>> + /** Opens a new realtime searcher and returns its fingerprint */
>> + public static IndexFingerprint getFingerprint(SolrCore core, long
>> maxVersion) throws IOException {
>> + core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
>> + RefCounted<SolrIndexSearcher> newestSearcher =
>> core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
>> + try {
>> + return getFingerprint(newestSearcher.get(), maxVersion);
>> + } finally {
>> + if (newestSearcher != null) {
>> + newestSearcher.decref();
>> + }
>> + }
>> + }
>> +
>> + public
>> static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long
>> maxVersion) throws IOException {
>> + long start = System.currentTimeMillis();
>> +
>> + SchemaField versionField =
>> VersionInfo.getAndCheckVersionField(searcher.getSchema());
>> +
>> + IndexFingerprint f = new IndexFingerprint();
>> + f.maxVersionSpecified = maxVersion;
>> + f.maxDoc = searcher.maxDoc();
>> +
>> + // TODO: this could be parallelized, or even cached per-segment if
>> performance becomes an issue
>> + ValueSource vs = versionField.getType().getValueSource(versionField,
>> null);
>> + Map funcContext = ValueSource.newContext(searcher);
>> + vs.createWeight(funcContext, searcher);
>> + for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves())
>> {
>> + int maxDoc = ctx.reader().maxDoc();
>> + f.numDocs += ctx.reader().numDocs();
>> + Bits liveDocs = ctx.reader().getLiveDocs();
>> + FunctionValues fv =
>> vs.getValues(funcContext, ctx);
>> + for (int doc = 0; doc < maxDoc; doc++) {
>> + if (liveDocs != null && !liveDocs.get(doc)) continue;
>> + long v = fv.longVal(doc);
>> + f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
>> + if (v <= f.maxVersionSpecified) {
>> + f.maxInHash = Math.max(v, f.maxInHash);
>> + f.versionsHash += Hash.fmix64(v);
>> + f.numVersions++;
>> + }
>> + }
>> + }
>> +
>> + long end = System.currentTimeMillis();
>> + log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
>> +
>> + return f;
>> + }
>> +
>> + /** returns 0 for equal, negative if f1 is less recent than f2,
>> positive if more recent */
>> + public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
>> + int cmp;
>> +
>> + // NOTE: some may want number of docs in index to take
>> precedence over highest version (add-only systems for sure)
>> +
>> + // if we're comparing all of the versions in the index, then go by
>> the highest encountered.
>> + if (f1.maxVersionSpecified == Long.MAX_VALUE) {
>> + cmp = Long.compare(f1.maxVersionEncountered,
>> f2.maxVersionEncountered);
>> + if (cmp != 0) return cmp;
>> + }
>> +
>> + // Go by the highest version under the requested max.
>> + cmp = Long.compare(f1.maxInHash, f2.maxInHash);
>> + if (cmp != 0) return cmp;
>> +
>> + // go by who has the most documents in the index
>> + cmp = Long.compare(f1.numVersions, f2.numVersions);
>> + if (cmp != 0) return cmp;
>> +
>> + // both have same number of documents, so go by hash
>> + cmp = Long.compare(f1.versionsHash, f2.versionsHash);
>> + return cmp;
>> + }
>> +
>> + /**
>> + * Create a generic object suitable for serializing with
>> ResponseWriters
>> + */
>> + public Object
>> toObject() {
>> + Map<String,Object> map = new LinkedHashMap<>();
>> + map.put("maxVersionSpecified", maxVersionSpecified);
>> + map.put("maxVersionEncountered", maxVersionEncountered);
>> + map.put("maxInHash", maxInHash);
>> + map.put("versionsHash", versionsHash);
>> + map.put("numVersions", numVersions);
>> + map.put("numDocs", numDocs);
>> + map.put("maxDoc", maxDoc);
>> + return map;
>> + }
>> +
>> + private static long getLong(Object o, String key, long def) {
>> + long v = def;
>> +
>> + Object oval = null;
>> + if (o instanceof Map) {
>> + oval = ((Map)o).get(key);
>> + } else if (o instanceof NamedList) {
>> + oval = ((NamedList)o).get(key);
>> + }
>> +
>> + return oval != null ? ((Number)oval).longValue() : def;
>> + }
>> +
>> + /**
>> + * Create an IndexFingerprint object from a deserialized generic object
>> (Map or NamedList)
>> + */
>> +
>> public static IndexFingerprint fromObject(Object o) {
>> + IndexFingerprint f = new IndexFingerprint();
>> + f.maxVersionSpecified = getLong(o, "maxVersionSpecified",
>> Long.MAX_VALUE);
>> + f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
>> + f.maxInHash = getLong(o, "maxInHash", -1);
>> + f.versionsHash = getLong(o, "versionsHash", -1);
>> + f.numVersions = getLong(o, "numVersions", -1);
>> + f.numDocs = getLong(o, "numDocs", -1);
>> + f.maxDoc = getLong(o, "maxDoc", -1);
>> + return f;
>> + }
>> +
>> + @Override
>> + public String toString() {
>> + return toObject().toString();
>> + }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> index c5ddaf0..dbc0091 100644
>> --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> @@ -68,6 +68,7 @@ public class PeerSync {
>> private UpdateLog ulog;
>> private HttpShardHandlerFactory shardHandlerFactory;
>> private ShardHandler shardHandler;
>> + private List<SyncShardRequest> requests = new ArrayList<>();
>>
>> private List<Long> startingVersions;
>>
>> @@ -76,8 +77,10 @@ public class PeerSync {
>> private Set<Long> requestedUpdateSet;
>> private long ourLowThreshold; // 20th percentile
>> private long ourHighThreshold; // 80th percentile
>> + private long ourHighest; // currently just used for logging/debugging
>> purposes
>> private final boolean cantReachIsSuccess;
>> private final
>> boolean getNoVersionsIsSuccess;
>> + private final boolean doFingerprint;
>> private final HttpClient client;
>> private final boolean onlyIfActive;
>> private SolrCore core;
>> @@ -116,6 +119,8 @@ public class PeerSync {
>>
>> private static class SyncShardRequest extends ShardRequest {
>> List<Long> reportedVersions;
>> + IndexFingerprint fingerprint;
>> + boolean doFingerprintComparison;
>> List<Long> requestedUpdates;
>> Exception updateException;
>> }
>> @@ -125,16 +130,17 @@ public class PeerSync {
>> }
>>
>> public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
>> - this(core, replicas, nUpdates, cantReachIsSuccess,
>> getNoVersionsIsSuccess, false);
>> + this(core, replicas, nUpdates, cantReachIsSuccess,
>> getNoVersionsIsSuccess, false, true);
>> }
>>
>> - public PeerSync(SolrCore
>> core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess,
>> boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
>> + public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>> onlyIfActive, boolean doFingerprint) {
>> this.core = core;
>> this.replicas = replicas;
>> this.nUpdates = nUpdates;
>> this.maxUpdates = nUpdates;
>> this.cantReachIsSuccess = cantReachIsSuccess;
>> this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
>> + this.doFingerprint = doFingerprint;
>> this.client =
>> core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
>> this.onlyIfActive = onlyIfActive;
>>
>> @@ -169,9 +175,8 @@ public class PeerSync {
>> return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
>> }
>>
>> - /** Returns true if peer sync was successful,
>> meaning that this core may not be considered to have the latest updates
>> - * when considering the last N updates between it and its peers.
>> - * A commit is not performed.
>> + /** Returns true if peer sync was successful, meaning that this core
>> may be considered to have the latest updates.
>> + * It does not mean that the remote replica is in sync with us.
>> */
>> public boolean sync() {
>> if (ulog == null) {
>> @@ -210,7 +215,7 @@ public class PeerSync {
>>
>> ourLowThreshold = percentile(startingVersions, 0.8f);
>> ourHighThreshold = percentile(startingVersions, 0.2f);
>> -
>> +
>> // now make sure that the starting updates overlap our updates
>> // there shouldn't be reorders, so any overlap will do.
>>
>> @@ -231,6 +236,7 @@ public class PeerSync {
>> }
>>
>> ourUpdates = newList;
>> +
>> Collections.sort(ourUpdates, absComparator);
>> } else {
>>
>> if (ourUpdates.size() > 0) {
>> @@ -243,9 +249,10 @@ public class PeerSync {
>> return false;
>> }
>> }
>> -
>> +
>> + ourHighest = ourUpdates.get(0);
>> ourUpdateSet = new HashSet<>(ourUpdates);
>> - requestedUpdateSet = new HashSet<>(ourUpdates);
>> + requestedUpdateSet = new HashSet<>();
>>
>> for (;;) {
>> ShardResponse srsp = shardHandler.takeCompletedOrError();
>> @@ -257,9 +264,18 @@ public class PeerSync {
>> return false;
>> }
>> }
>> -
>> - log.info(msg() + "DONE. sync succeeded");
>> - return true;
>> +
>> + // finish up any comparisons with other shards that we deferred
>> + boolean success = true;
>> + for (SyncShardRequest sreq : requests)
>> {
>> + if (sreq.doFingerprintComparison) {
>> + success = compareFingerprint(sreq);
>> + if (!success) break;
>> + }
>> + }
>> +
>> + log.info(msg() + "DONE. sync " + (success ? "succeeded" :
>> "failed"));
>> + return success;
>> } finally {
>> MDCLoggingContext.clear();
>> }
>> @@ -267,6 +283,7 @@ public class PeerSync {
>>
>> private void requestVersions(String replica) {
>> SyncShardRequest sreq = new SyncShardRequest();
>> + requests.add(sreq);
>> sreq.purpose = 1;
>> sreq.shards = new String[]{replica};
>> sreq.actualShards = sreq.shards;
>> @@ -274,6 +291,7 @@ public class PeerSync {
>> sreq.params.set("qt","/get");
>> sreq.params.set("distrib",false);
>> sreq.params.set("getVersions",nUpdates);
>> + sreq.params.set("fingerprint",doFingerprint);
>> shardHandler.submit(sreq, replica,
>> sreq.params);
>> }
>>
>> @@ -355,7 +373,12 @@ public class PeerSync {
>> SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
>> sreq.reportedVersions = otherVersions;
>>
>> - log.info(msg() + " Received " + otherVersions.size() + " versions
>> from " + sreq.shards[0] );
>> + Object fingerprint =
>> srsp.getSolrResponse().getResponse().get("fingerprint");
>> +
>> + log.info(msg() + " Received " + otherVersions.size() + " versions
>> from " + sreq.shards[0] + " fingerprint:" + fingerprint );
>> + if (fingerprint != null) {
>> + sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
>> + }
>>
>> if (otherVersions.size() == 0) {
>> return getNoVersionsIsSuccess;
>> @@ -371,13 +394,14 @@ public class PeerSync {
>>
>> long otherHigh = percentile(otherVersions, .2f);
>> long otherLow =
>> percentile(otherVersions, .8f);
>> + long otherHighest = otherVersions.get(0);
>>
>> if (ourHighThreshold < otherLow) {
>> // Small overlap between version windows and ours is older
>> // This means that we might miss updates if we attempted to use
>> this method.
>> // Since there exists just one replica that is so much newer, we
>> must
>> // fail the sync.
>> - log.info(msg() + " Our versions are too old.
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
>> + log.info(msg() + " Our versions are too old.
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + "
>> ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>> return false;
>> }
>>
>> @@ -385,7 +409,10 @@ public class PeerSync {
>> // Small overlap between windows and ours is newer.
>> // Using this list to sync would
>> result in requesting/replaying results we don't need
>> // and possibly bringing deleted docs back to life.
>> - log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>> + log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest="
>> + ourHighest + " otherHighest=" + otherHighest);
>> +
>> + // Because our versions are newer, IndexFingerprint with the remote
>> would not match us.
>> + // We return true on our side, but the remote peersync with us
>> should fail.
>> return true;
>> }
>>
>> @@ -408,9 +435,15 @@ public class PeerSync {
>> sreq.requestedUpdates = toRequest;
>>
>> if (toRequest.isEmpty()) {
>> - log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + "
>> otherHigh="+otherHigh);
>> + log.info(msg() + " No additional versions requested.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest="
>> + ourHighest + " otherHighest=" + otherHighest);
>>
>> // we had (or already requested) all the updates referenced by the
>> replica
>> +
>> + // If we requested updates from another replica, we can't compare
>> fingerprints yet with this replica, we need to defer
>> + if (doFingerprint) {
>> + sreq.doFingerprintComparison = true;
>> + }
>> +
>> return true;
>> }
>>
>> @@ -422,6 +455,19 @@ public class PeerSync {
>> return requestUpdates(srsp, toRequest);
>> }
>>
>> + private boolean compareFingerprint(SyncShardRequest sreq) {
>> + if (sreq.fingerprint == null) return true;
>> + try {
>> + IndexFingerprint ourFingerprint =
>> IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
>> +
>> int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
>> + log.info("Fingerprint comparison: " + cmp);
>> + return cmp == 0; // currently, we only check for equality...
>> + } catch(IOException e){
>> + log.error(msg() + "Error getting index fingerprint", e);
>> + return false;
>> + }
>> + }
>> +
>> private boolean requestUpdates(ShardResponse srsp, List<Long>
>> toRequest) {
>> String replica = srsp.getShardRequest().shards[0];
>>
>> @@ -556,7 +602,7 @@ public class PeerSync {
>> }
>> }
>>
>> - return true;
>> + return compareFingerprint(sreq);
>> }
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> ________________________________
>>
>> diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> index 7d7d3ef..7e3fc9f 100644
>> --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> @@ -283,7 +283,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> if (debug) {
>> log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing
>> tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
>> }
>> -
>> +
>> TransactionLog oldLog = null;
>> for (String oldLogName : tlogFiles) {
>> File f = new File(tlogDir, oldLogName);
>> @@ -334,11 +334,11 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> }
>>
>> }
>> -
>> +
>> public String getLogDir() {
>> return tlogDir.getAbsolutePath();
>> }
>> -
>> +
>> public List<Long>
>> getStartingVersions() {
>> return startingVersions;
>> }
>> @@ -503,32 +503,35 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
>> // given that we just did a delete-by-query, we don't know what
>> documents were
>> // affected and hence we must purge our caches.
>> - if (map != null) map.clear();
>> - if (prevMap != null) prevMap.clear();
>> - if (prevMap2 != null) prevMap2.clear();
>> -
>> + openRealtimeSearcher();
>> trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
>>
>> - // oldDeletes.clear();
>> -
>> - // We must cause a new IndexReader to be opened before anything
>> looks at these caches again
>> - // so that a cache miss will read fresh data.
>> - //
>> - // TODO: FUTURE: open a new searcher lazily for better throughput
>> with delete-by-query commands
>> -
>> try {
>> - RefCounted<SolrIndexSearcher> holder =
>> uhandler.core.openNewSearcher(true, true);
>> - holder.decref();
>> - } catch (Exception e) {
>> - SolrException.log(log, "Error opening realtime searcher for
>> deleteByQuery", e);
>> + if (trace) {
>> + LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>> + log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>> tlog + " " + ptr + " map=" + System.identityHashCode(map));
>> }
>> -
>> }
>> + }
>> + }
>>
>> - LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>> -
>> - if (trace) {
>> - log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>> tlog + " " + ptr + " map=" + System.identityHashCode(map));
>> + /** Opens a new realtime searcher and clears the id caches.
>> + * This may also be called when updates are being buffered (from
>> PeerSync/IndexFingerprint)
>> + */
>> +
>> public void openRealtimeSearcher() {
>> + synchronized (this) {
>> + // We must cause a new IndexReader to be opened before anything
>> looks at these caches again
>> + // so that a cache miss will read fresh data.
>> + try {
>> + RefCounted<SolrIndexSearcher> holder =
>> uhandler.core.openNewSearcher(true, true);
>> + holder.decref();
>> + } catch (Exception e) {
>> + SolrException.log(log, "Error opening realtime searcher", e);
>> + return;
>> }
>> +
>> + if (map != null) map.clear();
>> + if (prevMap != null) prevMap.clear();
>> + if (prevMap2 != null) prevMap2.clear();
>> }
>> }
>>
>> @@ -620,7 +623,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> public boolean hasUncommittedChanges() {
>> return tlog != null;
>> }
>> -
>> +
>> public void preCommit(CommitUpdateCommand cmd) {
>> synchronized (this) {
>>
>> if (debug) {
>> @@ -878,11 +881,11 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> theLog.forceClose();
>> }
>> }
>> -
>> +
>> public void close(boolean committed) {
>> close(committed, false);
>> }
>> -
>> +
>> public void close(boolean committed, boolean deleteOnClose) {
>> synchronized (this) {
>> recoveryExecutor.shutdown(); // no new tasks
>> @@ -923,7 +926,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> this.id = id;
>> }
>> }
>> -
>> +
>> public class RecentUpdates implements Closeable {
>>
>> final Deque<TransactionLog> logList; // newest first
>> @@ -950,17 +953,17 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>
>> public List<Long> getVersions(int n) {
>> List<Long> ret = new ArrayList<>(n);
>> -
>> +
>> for
>> (List<Update> singleList : updateList) {
>> for (Update ptr : singleList) {
>> ret.add(ptr.version);
>> if (--n <= 0) return ret;
>> }
>> }
>> -
>> +
>> return ret;
>> }
>> -
>> +
>> public Object lookup(long version) {
>> Update update = updates.get(version);
>> if (update == null) return null;
>> @@ -1004,7 +1007,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> try {
>> o = reader.next();
>> if (o==null) break;
>> -
>> +
>> // should currently be a List<Oper,Ver,Doc/Id>
>> List entry = (List)o;
>>
>> @@ -1027,13 +1030,13 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>
>> updatesForLog.add(update);
>> updates.put(version, update);
>> -
>> +
>>
>> if (oper == UpdateLog.DELETE_BY_QUERY) {
>> deleteByQueryList.add(update);
>> } else if (oper == UpdateLog.DELETE) {
>> deleteList.add(new DeleteUpdate(version,
>> (byte[])entry.get(2)));
>> }
>> -
>> +
>> break;
>>
>> case UpdateLog.COMMIT:
>> @@ -1063,7 +1066,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> }
>>
>> }
>> -
>> +
>> @Override
>> public void close() {
>> for (TransactionLog log : logList) {
>> @@ -1331,10 +1334,10 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> "log replay status {} active={} starting pos={}
>> current pos={} current size={} % read={}",
>> translog, activeLog,
>> recoveryInfo.positionOfStart, cpos, csize,
>> Math.floor(cpos /
>> (double) csize * 100.));
>> -
>> +
>> }
>> }
>> -
>> +
>> o = null;
>> o = tlogReader.next();
>> if (o == null && activeLog) {
>> @@ -1513,25 +1516,25 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>> }
>> }
>> }
>> -
>> +
>> protected String getTlogDir(SolrCore core, PluginInfo info) {
>> String dataDir = (String) info.initArgs.get("dir");
>> -
>> +
>> String ulogDir = core.getCoreDescriptor().getUlogDir();
>> if (ulogDir != null) {
>> dataDir = ulogDir;
>> }
>> -
>> +
>> if (dataDir == null || dataDir.length() == 0) {
>> dataDir = core.getDataDir();
>> }
>>
>> return dataDir + "/" + TLOG_NAME;
>> }
>> -
>> +
>> /**
>> * Clears the logs on the file system. Only call before init.
>> - *
>> + *
>>
>> * @param core the SolrCore
>> * @param ulogPluginInfo the init info for the UpdateHandler
>> */
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> ________________________________
>>
>> diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> index 8083ad0..bcaf846 100644
>> --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> @@ -168,6 +168,29 @@ public class PeerSyncTest extends
>> BaseDistributedSearchTestCase {
>>
>> assertSync(client1, numVersions, true, shardsArr[0]);
>> client0.commit(); client1.commit(); queryAndCompare(params("q",
>> "*:*", "sort","_version_ desc"), client0,
>> client1);
>> +
>> + // now lets check fingerprinting causes appropriate fails
>> + v = 4000;
>> + add(client0, seenLeader,
>> sdoc("id",Integer.toString((int)v),"_version_",v));
>> + toAdd = numVersions+10;
>> + for (int i=0; i<toAdd; i++) {
>> + add(client0, seenLeader,
>> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>> + add(client1, seenLeader,
>> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>> + }
>> +
>> + // client0 now has an additional add beyond our window and the
>> fingerprint should cause this to fail
>> + assertSync(client1, numVersions, false, shardsArr[0]);
>> +
>> + // lets add the missing document and verify that order doesn't matter
>> + add(client1, seenLeader,
>> sdoc("id",Integer.toString((int)v),"_version_",v));
>> + assertSync(client1, numVersions, true, shardsArr[0]);
>> +
>> + // lets do some overwrites to ensure that repeated updates and maxDoc
>> don't
>> matter
>> + for (int i=0; i<10; i++) {
>> + add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i +
>> 1), "_version_", v + i + 1));
>> + }
>> + assertSync(client1, numVersions, true, shardsArr[0]);
>> +
>> }
>>
>>
>>
>
> --
> Uwe Schindler
> H.-H.-Meier-Allee 63, 28213 Bremen
> http://www.thetaphi.de
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org
Re: lucene-solr git commit: SOLR-8586: add index fingerprinting and use it in peersync (cherry picked from commit 629767b)
Posted by Uwe Schindler <uw...@thetaphi.de>.
Hi Yonik,
I committed a change yesterday. Could you backport this, too? It broke the forbidden-apis check because it used currentTimeMillis instead of RTimer.
Uwe
Am 5. Februar 2016 17:14:34 MEZ, schrieb yonik@apache.org:
>Repository: lucene-solr
>Updated Branches:
> refs/heads/branch_5x 482b40f84 -> ff83a4001
>
>
>SOLR-8586: add index fingerprinting and use it in peersync
>(cherry picked from commit 629767b)
>
>
>Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
>Commit:
>http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
>Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
>Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
>
>Branch: refs/heads/branch_5x
>Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
>Parents: 482b40f
>Author: yonik <yo...@apache.org>
>Authored: Thu Feb 4 14:54:08 2016 -0500
>Committer: yonik <yo...@apache.org>
>Committed: Fri Feb 5 11:11:34 2016 -0500
>
>----------------------------------------------------------------------
> solr/CHANGES.txt | 4 +
> .../org/apache/solr/cloud/SyncStrategy.java | 2 +-
> .../handler/component/RealTimeGetComponent.java | 8 +
>.../apache/solr/update/IndexFingerprint.java | 232
>+++++++++++++++++++
> .../java/org/apache/solr/update/PeerSync.java | 78 +++++--
> .../java/org/apache/solr/update/UpdateLog.java | 85 +++----
> .../org/apache/solr/update/PeerSyncTest.java | 23 ++
> 7 files changed, 374 insertions(+), 58 deletions(-)
>----------------------------------------------------------------------
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
>----------------------------------------------------------------------
>diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
>index f2067c6..2b7b46e 100644
>--- a/solr/CHANGES.txt
>+++ b/solr/CHANGES.txt
>@@ -119,6 +119,10 @@ New Features
> * SOLR-8415: Provide command to switch between non/secure mode in ZK
> (Mike Drob, Gregory Chanan)
>
>+* SOLR-8586: added index fingerprint, a hash over all versions
>currently in the index.
>+ PeerSync now uses this to check if replicas are in sync. (yonik)
>+
>+
> Bug Fixes
> ----------------------
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>index 7a16598..d811f5c 100644
>--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>@@ -173,7 +173,7 @@ public class SyncStrategy {
>// if we can't reach a replica for sync, we still consider the overall
>sync a success
>// TODO: as an assurance, we should still try and tell the sync nodes
>that we couldn't reach
> // to recover once more?
>- PeerSync peerSync = new PeerSync(core, syncWith,
>core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true,
>true, peerSyncOnlyWithActive);
>+ PeerSync peerSync = new PeerSync(core, syncWith,
>core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true,
>true, peerSyncOnlyWithActive, false);
> return peerSync.sync();
> }
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>index 2bbf5a2..14a4185 100644
>---
>a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>+++
>b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>@@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
> import org.apache.solr.search.SolrIndexSearcher;
> import org.apache.solr.search.SolrReturnFields;
> import org.apache.solr.update.DocumentBuilder;
>+import org.apache.solr.update.IndexFingerprint;
> import org.apache.solr.update.PeerSync;
> import org.apache.solr.update.UpdateLog;
> import org.apache.solr.util.RefCounted;
>@@ -536,6 +537,8 @@ public class RealTimeGetComponent extends
>SearchComponent
> int nVersions = params.getInt("getVersions", -1);
> if (nVersions == -1) return;
>
>+ boolean doFingerprint = params.getBool("fingerprint", false);
>+
> String sync = params.get("sync");
> if (sync != null) {
> processSync(rb, nVersions, sync);
>@@ -548,6 +551,11 @@ public class RealTimeGetComponent extends
>SearchComponent
>try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
> rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
> }
>+
>+ if (doFingerprint) {
>+ IndexFingerprint fingerprint =
>IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
>+ rb.rsp.add("fingerprint", fingerprint.toObject());
>+ }
> }
>
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>new file mode 100644
>index 0000000..c73b57b
>--- /dev/null
>+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>@@ -0,0 +1,232 @@
>+package org.apache.solr.update;
>+
>+/*
>+ * Licensed to the Apache Software Foundation (ASF) under one or more
>+ * contributor license agreements. See the NOTICE file distributed
>with
>+ * this work for additional information regarding copyright ownership.
>+ * The ASF licenses this file to You under the Apache License, Version
>2.0
>+ * (the "License"); you may not use this file except in compliance
>with
>+ * the License. You may obtain a copy of the License at
>+ *
>+ * http://www.apache.org/licenses/LICENSE-2.0
>+ *
>+ * Unless required by applicable law or agreed to in writing, software
>+ * distributed under the License is distributed on an "AS IS" BASIS,
>+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>implied.
>+ * See the License for the specific language governing permissions and
>+ * limitations under the License.
>+ */
>+
>+import java.io.IOException;
>+import java.lang.invoke.MethodHandles;
>+import java.net.ConnectException;
>+import java.net.SocketException;
>+import java.util.ArrayList;
>+import java.util.Collections;
>+import java.util.Comparator;
>+import java.util.HashSet;
>+import java.util.LinkedHashMap;
>+import java.util.List;
>+import java.util.Map;
>+import java.util.Set;
>+
>+import org.apache.http.NoHttpResponseException;
>+import org.apache.http.client.HttpClient;
>+import org.apache.http.conn.ConnectTimeoutException;
>+import org.apache.lucene.index.LeafReader;
>+import org.apache.lucene.index.LeafReaderContext;
>+import org.apache.lucene.queries.function.FunctionValues;
>+import org.apache.lucene.queries.function.ValueSource;
>+import org.apache.lucene.util.Bits;
>+import org.apache.lucene.util.BytesRef;
>+import org.apache.solr.client.solrj.SolrServerException;
>+import org.apache.solr.cloud.ZkController;
>+import org.apache.solr.common.SolrException;
>+import org.apache.solr.common.SolrInputDocument;
>+import org.apache.solr.common.params.ModifiableSolrParams;
>+import org.apache.solr.common.util.Hash;
>+import org.apache.solr.common.util.NamedList;
>+import org.apache.solr.common.util.StrUtils;
>+import org.apache.solr.core.SolrCore;
>+import org.apache.solr.handler.component.HttpShardHandlerFactory;
>+import org.apache.solr.handler.component.ShardHandler;
>+import org.apache.solr.handler.component.ShardHandlerFactory;
>+import org.apache.solr.handler.component.ShardRequest;
>+import org.apache.solr.handler.component.ShardResponse;
>+import org.apache.solr.logging.MDCLoggingContext;
>+import org.apache.solr.request.LocalSolrQueryRequest;
>+import org.apache.solr.request.SolrQueryRequest;
>+import org.apache.solr.response.SolrQueryResponse;
>+import org.apache.solr.schema.SchemaField;
>+import org.apache.solr.search.SolrIndexSearcher;
>+import org.apache.solr.update.processor.UpdateRequestProcessor;
>+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
>+import org.apache.solr.util.RefCounted;
>+import org.slf4j.Logger;
>+import org.slf4j.LoggerFactory;
>+
>+import static
>org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
>+import static
>org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
>+
>+/** @lucene.internal */
>+public class IndexFingerprint {
>+ private static final Logger log =
>LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>+
>+ private long maxVersionSpecified;
>+ private long maxVersionEncountered;
>+ private long maxInHash;
>+ private long versionsHash;
>+ private long numVersions;
>+ private long numDocs;
>+ private long maxDoc;
>+
>+ public long getMaxVersionSpecified() {
>+ return maxVersionSpecified;
>+ }
>+
>+ public long getMaxVersionEncountered() {
>+ return maxVersionEncountered;
>+ }
>+
>+ public long getMaxInHash() {
>+ return maxInHash;
>+ }
>+
>+ public long getVersionsHash() {
>+ return versionsHash;
>+ }
>+
>+ public long getNumVersions() {
>+ return numVersions;
>+ }
>+
>+ public long getNumDocs() {
>+ return numDocs;
>+ }
>+
>+ public long getMaxDoc() {
>+ return maxDoc;
>+ }
>+
>+ /** Opens a new realtime searcher and returns it's fingerprint */
>+ public static IndexFingerprint getFingerprint(SolrCore core, long
>maxVersion) throws IOException {
>+ core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
>+ RefCounted<SolrIndexSearcher> newestSearcher =
>core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
>+ try {
>+ return getFingerprint(newestSearcher.get(), maxVersion);
>+ } finally {
>+ if (newestSearcher != null) {
>+ newestSearcher.decref();
>+ }
>+ }
>+ }
>+
>+ public static IndexFingerprint getFingerprint(SolrIndexSearcher
>searcher, long maxVersion) throws IOException {
>+ long start = System.currentTimeMillis();
>+
>+ SchemaField versionField =
>VersionInfo.getAndCheckVersionField(searcher.getSchema());
>+
>+ IndexFingerprint f = new IndexFingerprint();
>+ f.maxVersionSpecified = maxVersion;
>+ f.maxDoc = searcher.maxDoc();
>+
>+ // TODO: this could be parallelized, or even cached per-segment if
>performance becomes an issue
>+ ValueSource vs =
>versionField.getType().getValueSource(versionField, null);
>+ Map funcContext = ValueSource.newContext(searcher);
>+ vs.createWeight(funcContext, searcher);
>+ for (LeafReaderContext ctx :
>searcher.getTopReaderContext().leaves()) {
>+ int maxDoc = ctx.reader().maxDoc();
>+ f.numDocs += ctx.reader().numDocs();
>+ Bits liveDocs = ctx.reader().getLiveDocs();
>+ FunctionValues fv = vs.getValues(funcContext, ctx);
>+ for (int doc = 0; doc < maxDoc; doc++) {
>+ if (liveDocs != null && !liveDocs.get(doc)) continue;
>+ long v = fv.longVal(doc);
>+ f.maxVersionEncountered = Math.max(v,
>f.maxVersionEncountered);
>+ if (v <= f.maxVersionSpecified) {
>+ f.maxInHash = Math.max(v, f.maxInHash);
>+ f.versionsHash += Hash.fmix64(v);
>+ f.numVersions++;
>+ }
>+ }
>+ }
>+
>+ long end = System.currentTimeMillis();
>+ log.info("IndexFingerprint millis:" + (end-start) + " result:" +
>f);
>+
>+ return f;
>+ }
>+
>+ /** returns 0 for equal, negative if f1 is less recent than f2,
>positive if more recent */
>+ public static int compare(IndexFingerprint f1, IndexFingerprint f2)
>{
>+ int cmp;
>+
>+ // NOTE: some way want number of docs in index to take precedence
>over highest version (add-only systems for sure)
>+
>+ // if we're comparing all of the versions in the index, then go by
>the highest encountered.
>+ if (f1.maxVersionSpecified == Long.MAX_VALUE) {
>+ cmp = Long.compare(f1.maxVersionEncountered,
>f2.maxVersionEncountered);
>+ if (cmp != 0) return cmp;
>+ }
>+
>+ // Go by the highest version under the requested max.
>+ cmp = Long.compare(f1.maxInHash, f2.maxInHash);
>+ if (cmp != 0) return cmp;
>+
>+ // go by who has the most documents in the index
>+ cmp = Long.compare(f1.numVersions, f2.numVersions);
>+ if (cmp != 0) return cmp;
>+
>+ // both have same number of documents, so go by hash
>+ cmp = Long.compare(f1.versionsHash, f2.versionsHash);
>+ return cmp;
>+ }
>+
>+ /**
>+ * Create a generic object suitable for serializing with
>ResponseWriters
>+ */
>+ public Object toObject() {
>+ Map<String,Object> map = new LinkedHashMap<>();
>+ map.put("maxVersionSpecified", maxVersionSpecified);
>+ map.put("maxVersionEncountered", maxVersionEncountered);
>+ map.put("maxInHash", maxInHash);
>+ map.put("versionsHash", versionsHash);
>+ map.put("numVersions", numVersions);
>+ map.put("numDocs", numDocs);
>+ map.put("maxDoc", maxDoc);
>+ return map;
>+ }
>+
>+ private static long getLong(Object o, String key, long def) {
>+ long v = def;
>+
>+ Object oval = null;
>+ if (o instanceof Map) {
>+ oval = ((Map)o).get(key);
>+ } else if (o instanceof NamedList) {
>+ oval = ((NamedList)o).get(key);
>+ }
>+
>+ return oval != null ? ((Number)oval).longValue() : def;
>+ }
>+
>+ /**
>+ * Create an IndexFingerprint object from a deserialized generic
>object (Map or NamedList)
>+ */
>+ public static IndexFingerprint fromObject(Object o) {
>+ IndexFingerprint f = new IndexFingerprint();
>+ f.maxVersionSpecified = getLong(o, "maxVersionSpecified",
>Long.MAX_VALUE);
>+ f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
>+ f.maxInHash = getLong(o, "maxInHash", -1);
>+ f.versionsHash = getLong(o, "versionsHash", -1);
>+ f.numVersions = getLong(o, "numVersions", -1);
>+ f.numDocs = getLong(o, "numDocs", -1);
>+ f.maxDoc = getLong(o, "maxDoc", -1);
>+ return f;
>+ }
>+
>+ @Override
>+ public String toString() {
>+ return toObject().toString();
>+ }
>+}
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>index c5ddaf0..dbc0091 100644
>--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>@@ -68,6 +68,7 @@ public class PeerSync {
> private UpdateLog ulog;
> private HttpShardHandlerFactory shardHandlerFactory;
> private ShardHandler shardHandler;
>+ private List<SyncShardRequest> requests = new ArrayList<>();
>
> private List<Long> startingVersions;
>
>@@ -76,8 +77,10 @@ public class PeerSync {
> private Set<Long> requestedUpdateSet;
> private long ourLowThreshold; // 20th percentile
> private long ourHighThreshold; // 80th percentile
>+ private long ourHighest; // currently just used for
>logging/debugging purposes
> private final boolean cantReachIsSuccess;
> private final boolean getNoVersionsIsSuccess;
>+ private final boolean doFingerprint;
> private final HttpClient client;
> private final boolean onlyIfActive;
> private SolrCore core;
>@@ -116,6 +119,8 @@ public class PeerSync {
>
> private static class SyncShardRequest extends ShardRequest {
> List<Long> reportedVersions;
>+ IndexFingerprint fingerprint;
>+ boolean doFingerprintComparison;
> List<Long> requestedUpdates;
> Exception updateException;
> }
>@@ -125,16 +130,17 @@ public class PeerSync {
> }
>
>public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
>- this(core, replicas, nUpdates, cantReachIsSuccess,
>getNoVersionsIsSuccess, false);
>+ this(core, replicas, nUpdates, cantReachIsSuccess,
>getNoVersionsIsSuccess, false, true);
> }
>
>- public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>onlyIfActive) {
>+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>onlyIfActive, boolean doFingerprint) {
> this.core = core;
> this.replicas = replicas;
> this.nUpdates = nUpdates;
> this.maxUpdates = nUpdates;
> this.cantReachIsSuccess = cantReachIsSuccess;
> this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
>+ this.doFingerprint = doFingerprint;
>this.client =
>core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
> this.onlyIfActive = onlyIfActive;
>
>@@ -169,9 +175,8 @@ public class PeerSync {
> return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
> }
>
>- /** Returns true if peer sync was successful, meaning that this core
>may not be considered to have the latest updates
>- * when considering the last N updates between it and its peers.
>- * A commit is not performed.
>+ /** Returns true if peer sync was successful, meaning that this core
>may be considered to have the latest updates.
>+ * It does not mean that the remote replica is in sync with us.
> */
> public boolean sync() {
> if (ulog == null) {
>@@ -210,7 +215,7 @@ public class PeerSync {
>
> ourLowThreshold = percentile(startingVersions, 0.8f);
> ourHighThreshold = percentile(startingVersions, 0.2f);
>-
>+
> // now make sure that the starting updates overlap our updates
> // there shouldn't be reorders, so any overlap will do.
>
>@@ -231,6 +236,7 @@ public class PeerSync {
> }
>
> ourUpdates = newList;
>+ Collections.sort(ourUpdates, absComparator);
> } else {
>
> if (ourUpdates.size() > 0) {
>@@ -243,9 +249,10 @@ public class PeerSync {
> return false;
> }
> }
>-
>+
>+ ourHighest = ourUpdates.get(0);
> ourUpdateSet = new HashSet<>(ourUpdates);
>- requestedUpdateSet = new HashSet<>(ourUpdates);
>+ requestedUpdateSet = new HashSet<>();
>
> for (;;) {
> ShardResponse srsp = shardHandler.takeCompletedOrError();
>@@ -257,9 +264,18 @@ public class PeerSync {
> return false;
> }
> }
>-
>- log.info(msg() + "DONE. sync succeeded");
>- return true;
>+
>+ // finish up any comparisons with other shards that we deferred
>+ boolean success = true;
>+ for (SyncShardRequest sreq : requests) {
>+ if (sreq.doFingerprintComparison) {
>+ success = compareFingerprint(sreq);
>+ if (!success) break;
>+ }
>+ }
>+
>+ log.info(msg() + "DONE. sync " + (success ? "succeeded" :
>"failed"));
>+ return success;
> } finally {
> MDCLoggingContext.clear();
> }
>@@ -267,6 +283,7 @@ public class PeerSync {
>
> private void requestVersions(String replica) {
> SyncShardRequest sreq = new SyncShardRequest();
>+ requests.add(sreq);
> sreq.purpose = 1;
> sreq.shards = new String[]{replica};
> sreq.actualShards = sreq.shards;
>@@ -274,6 +291,7 @@ public class PeerSync {
> sreq.params.set("qt","/get");
> sreq.params.set("distrib",false);
> sreq.params.set("getVersions",nUpdates);
>+ sreq.params.set("fingerprint",doFingerprint);
> shardHandler.submit(sreq, replica, sreq.params);
> }
>
>@@ -355,7 +373,12 @@ public class PeerSync {
> SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
> sreq.reportedVersions = otherVersions;
>
>- log.info(msg() + " Received " + otherVersions.size() + " versions
>from " + sreq.shards[0] );
>+ Object fingerprint =
>srsp.getSolrResponse().getResponse().get("fingerprint");
>+
>+ log.info(msg() + " Received " + otherVersions.size() + " versions
>from " + sreq.shards[0] + " fingerprint:" + fingerprint );
>+ if (fingerprint != null) {
>+ sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
>+ }
>
> if (otherVersions.size() == 0) {
> return getNoVersionsIsSuccess;
>@@ -371,13 +394,14 @@ public class PeerSync {
>
> long otherHigh = percentile(otherVersions, .2f);
> long otherLow = percentile(otherVersions, .8f);
>+ long otherHighest = otherVersions.get(0);
>
> if (ourHighThreshold < otherLow) {
> // Small overlap between version windows and ours is older
>// This means that we might miss updates if we attempted to use this
>method.
> // Since there exists just one replica that is so much newer, we must
> // fail the sync.
>- log.info(msg() + " Our versions are too old.
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
>+ log.info(msg() + " Our versions are too old.
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow +
>" ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
> return false;
> }
>
>@@ -385,7 +409,10 @@ public class PeerSync {
> // Small overlap between windows and ours is newer.
>// Using this list to sync would result in requesting/replaying results
>we don't need
> // and possibly bringing deleted docs back to life.
>- log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>+ log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ "
>ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>+
>+ // Because our versions are newer, IndexFingerprint with the
>remote would not match us.
>+ // We return true on our side, but the remote peersync with us
>should fail.
> return true;
> }
>
>@@ -408,9 +435,15 @@ public class PeerSync {
> sreq.requestedUpdates = toRequest;
>
> if (toRequest.isEmpty()) {
>- log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>+ log.info(msg() + " No additional versions requested.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ "
>ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>
>// we had (or already requested) all the updates referenced by the
>replica
>+
>+ // If we requested updates from another replica, we can't
>compare fingerprints yet with this replica, we need to defer
>+ if (doFingerprint) {
>+ sreq.doFingerprintComparison = true;
>+ }
>+
> return true;
> }
>
>@@ -422,6 +455,19 @@ public class PeerSync {
> return requestUpdates(srsp, toRequest);
> }
>
>+ private boolean compareFingerprint(SyncShardRequest sreq) {
>+ if (sreq.fingerprint == null) return true;
>+ try {
>+ IndexFingerprint ourFingerprint =
>IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
>+ int cmp = IndexFingerprint.compare(ourFingerprint,
>sreq.fingerprint);
>+ log.info("Fingerprint comparison: " + cmp);
>+ return cmp == 0; // currently, we only check for equality...
>+ } catch(IOException e){
>+ log.error(msg() + "Error getting index fingerprint", e);
>+ return false;
>+ }
>+ }
>+
>private boolean requestUpdates(ShardResponse srsp, List<Long>
>toRequest) {
> String replica = srsp.getShardRequest().shards[0];
>
>@@ -556,7 +602,7 @@ public class PeerSync {
> }
> }
>
>- return true;
>+ return compareFingerprint(sreq);
> }
>
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>index 7d7d3ef..7e3fc9f 100644
>--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>@@ -283,7 +283,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
> if (debug) {
>log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing
>tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
> }
>-
>+
> TransactionLog oldLog = null;
> for (String oldLogName : tlogFiles) {
> File f = new File(tlogDir, oldLogName);
>@@ -334,11 +334,11 @@ public class UpdateLog implements
>PluginInfoInitialized {
> }
>
> }
>-
>+
> public String getLogDir() {
> return tlogDir.getAbsolutePath();
> }
>-
>+
> public List<Long> getStartingVersions() {
> return startingVersions;
> }
>@@ -503,32 +503,35 @@ public class UpdateLog implements
>PluginInfoInitialized {
> if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
>// given that we just did a delete-by-query, we don't know what
>documents were
> // affected and hence we must purge our caches.
>- if (map != null) map.clear();
>- if (prevMap != null) prevMap.clear();
>- if (prevMap2 != null) prevMap2.clear();
>-
>+ openRealtimeSearcher();
> trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
>
>- // oldDeletes.clear();
>-
>- // We must cause a new IndexReader to be opened before
>anything looks at these caches again
>- // so that a cache miss will read fresh data.
>- //
>- // TODO: FUTURE: open a new searcher lazily for better
>throughput with delete-by-query commands
>- try {
>- RefCounted<SolrIndexSearcher> holder =
>uhandler.core.openNewSearcher(true, true);
>- holder.decref();
>- } catch (Exception e) {
>- SolrException.log(log, "Error opening realtime searcher for
>deleteByQuery", e);
>+ if (trace) {
>+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>+ log.trace("TLOG: added deleteByQuery " + cmd.query + " to "
>+ tlog + " " + ptr + " map=" + System.identityHashCode(map));
> }
>-
> }
>+ }
>+ }
>
>- LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>-
>- if (trace) {
>- log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>tlog + " " + ptr + " map=" + System.identityHashCode(map));
>+ /** Opens a new realtime searcher and clears the id caches.
>+ * This may also be called when we updates are being buffered (from
>PeerSync/IndexFingerprint)
>+ */
>+ public void openRealtimeSearcher() {
>+ synchronized (this) {
>+ // We must cause a new IndexReader to be opened before anything
>looks at these caches again
>+ // so that a cache miss will read fresh data.
>+ try {
>+ RefCounted<SolrIndexSearcher> holder =
>uhandler.core.openNewSearcher(true, true);
>+ holder.decref();
>+ } catch (Exception e) {
>+ SolrException.log(log, "Error opening realtime searcher", e);
>+ return;
> }
>+
>+ if (map != null) map.clear();
>+ if (prevMap != null) prevMap.clear();
>+ if (prevMap2 != null) prevMap2.clear();
> }
> }
>
>@@ -620,7 +623,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
> public boolean hasUncommittedChanges() {
> return tlog != null;
> }
>-
>+
> public void preCommit(CommitUpdateCommand cmd) {
> synchronized (this) {
> if (debug) {
>@@ -878,11 +881,11 @@ public class UpdateLog implements
>PluginInfoInitialized {
> theLog.forceClose();
> }
> }
>-
>+
> public void close(boolean committed) {
> close(committed, false);
> }
>-
>+
> public void close(boolean committed, boolean deleteOnClose) {
> synchronized (this) {
> recoveryExecutor.shutdown(); // no new tasks
>@@ -923,7 +926,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
> this.id = id;
> }
> }
>-
>+
> public class RecentUpdates implements Closeable {
>
> final Deque<TransactionLog> logList; // newest first
>@@ -950,17 +953,17 @@ public class UpdateLog implements
>PluginInfoInitialized {
>
> public List<Long> getVersions(int n) {
> List<Long> ret = new ArrayList<>(n);
>-
>+
> for (List<Update> singleList : updateList) {
> for (Update ptr : singleList) {
> ret.add(ptr.version);
> if (--n <= 0) return ret;
> }
> }
>-
>+
> return ret;
> }
>-
>+
> public Object lookup(long version) {
> Update update = updates.get(version);
> if (update == null) return null;
>@@ -1004,7 +1007,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
> try {
> o = reader.next();
> if (o==null) break;
>-
>+
> // should currently be a List<Oper,Ver,Doc/Id>
> List entry = (List)o;
>
>@@ -1027,13 +1030,13 @@ public class UpdateLog implements
>PluginInfoInitialized {
>
> updatesForLog.add(update);
> updates.put(version, update);
>-
>+
> if (oper == UpdateLog.DELETE_BY_QUERY) {
> deleteByQueryList.add(update);
> } else if (oper == UpdateLog.DELETE) {
> deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
> }
>-
>+
> break;
>
> case UpdateLog.COMMIT:
>@@ -1063,7 +1066,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
> }
>
> }
>-
>+
> @Override
> public void close() {
> for (TransactionLog log : logList) {
>@@ -1331,10 +1334,10 @@ public class UpdateLog implements
>PluginInfoInitialized {
>"log replay status {} active={} starting pos={} current pos={} current
>size={} % read={}",
> translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
> Math.floor(cpos / (double) csize * 100.));
>-
>+
> }
> }
>-
>+
> o = null;
> o = tlogReader.next();
> if (o == null && activeLog) {
>@@ -1513,25 +1516,25 @@ public class UpdateLog implements
>PluginInfoInitialized {
> }
> }
> }
>-
>+
> protected String getTlogDir(SolrCore core, PluginInfo info) {
> String dataDir = (String) info.initArgs.get("dir");
>-
>+
> String ulogDir = core.getCoreDescriptor().getUlogDir();
> if (ulogDir != null) {
> dataDir = ulogDir;
> }
>-
>+
> if (dataDir == null || dataDir.length() == 0) {
> dataDir = core.getDataDir();
> }
>
> return dataDir + "/" + TLOG_NAME;
> }
>-
>+
> /**
> * Clears the logs on the file system. Only call before init.
>- *
>+ *
> * @param core the SolrCore
> * @param ulogPluginInfo the init info for the UpdateHandler
> */
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>index 8083ad0..bcaf846 100644
>--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>@@ -168,6 +168,29 @@ public class PeerSyncTest extends
>BaseDistributedSearchTestCase {
>
> assertSync(client1, numVersions, true, shardsArr[0]);
>client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*",
>"sort","_version_ desc"), client0, client1);
>+
>+ // now lets check fingerprinting causes appropriate fails
>+ v = 4000;
>+ add(client0, seenLeader,
>sdoc("id",Integer.toString((int)v),"_version_",v));
>+ toAdd = numVersions+10;
>+ for (int i=0; i<toAdd; i++) {
>+ add(client0, seenLeader,
>sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>+ add(client1, seenLeader,
>sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>+ }
>+
>+ // client0 now has an additional add beyond our window and the
>fingerprint should cause this to fail
>+ assertSync(client1, numVersions, false, shardsArr[0]);
>+
>+ // lets add the missing document and verify that order doesn't
>matter
>+ add(client1, seenLeader,
>sdoc("id",Integer.toString((int)v),"_version_",v));
>+ assertSync(client1, numVersions, true, shardsArr[0]);
>+
>+ // lets do some overwrites to ensure that repeated updates and
>maxDoc don't matter
>+ for (int i=0; i<10; i++) {
>+ add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i
>+ 1), "_version_", v + i + 1));
>+ }
>+ assertSync(client1, numVersions, true, shardsArr[0]);
>+
> }
>
>
--
Uwe Schindler
H.-H.-Meier-Allee 63, 28213 Bremen
http://www.thetaphi.de