You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2016/02/05 17:14:34 UTC

lucene-solr git commit: SOLR-8586: add index fingerprinting and use it in peersync (cherry picked from commit 629767b)

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_5x 482b40f84 -> ff83a4001


SOLR-8586: add index fingerprinting and use it in peersync
(cherry picked from commit 629767b)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400

Branch: refs/heads/branch_5x
Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
Parents: 482b40f
Author: yonik <yo...@apache.org>
Authored: Thu Feb 4 14:54:08 2016 -0500
Committer: yonik <yo...@apache.org>
Committed: Fri Feb 5 11:11:34 2016 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +
 .../org/apache/solr/cloud/SyncStrategy.java     |   2 +-
 .../handler/component/RealTimeGetComponent.java |   8 +
 .../apache/solr/update/IndexFingerprint.java    | 232 +++++++++++++++++++
 .../java/org/apache/solr/update/PeerSync.java   |  78 +++++--
 .../java/org/apache/solr/update/UpdateLog.java  |  85 +++----
 .../org/apache/solr/update/PeerSyncTest.java    |  23 ++
 7 files changed, 374 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f2067c6..2b7b46e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -119,6 +119,10 @@ New Features
 * SOLR-8415: Provide command to switch between non/secure mode in ZK
   (Mike Drob, Gregory Chanan)
 
+* SOLR-8586: added index fingerprint, a hash over all versions currently in the index.
+  PeerSync now uses this to check if replicas are in sync. (yonik)
+
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index 7a16598..d811f5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -173,7 +173,7 @@ public class SyncStrategy {
     // if we can't reach a replica for sync, we still consider the overall sync a success
     // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
     // to recover once more?
-    PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
+    PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive, false);
     return peerSync.sync();
   }
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 2bbf5a2..14a4185 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.search.SolrReturnFields;
 import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.IndexFingerprint;
 import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.RefCounted;
@@ -536,6 +537,8 @@ public class RealTimeGetComponent extends SearchComponent
     int nVersions = params.getInt("getVersions", -1);
     if (nVersions == -1) return;
 
+    boolean doFingerprint = params.getBool("fingerprint", false);
+
     String sync = params.get("sync");
     if (sync != null) {
       processSync(rb, nVersions, sync);
@@ -548,6 +551,11 @@ public class RealTimeGetComponent extends SearchComponent
     try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
       rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
     }
+
+    if (doFingerprint) {
+      IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
+      rb.rsp.add("fingerprint", fingerprint.toObject());
+    }
   }
 
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
new file mode 100644
index 0000000..c73b57b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
@@ -0,0 +1,232 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/** @lucene.internal */
+public class IndexFingerprint {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private long maxVersionSpecified;
+  private long maxVersionEncountered;
+  private long maxInHash;
+  private long versionsHash;
+  private long numVersions;
+  private long numDocs;
+  private long maxDoc;
+
+  public long getMaxVersionSpecified() {
+    return maxVersionSpecified;
+  }
+
+  public long getMaxVersionEncountered() {
+    return maxVersionEncountered;
+  }
+
+  public long getMaxInHash() {
+    return maxInHash;
+  }
+
+  public long getVersionsHash() {
+    return versionsHash;
+  }
+
+  public long getNumVersions() {
+    return numVersions;
+  }
+
+  public long getNumDocs() {
+    return numDocs;
+  }
+
+  public long getMaxDoc() {
+    return maxDoc;
+  }
+
+  /** Opens a new realtime searcher and returns its fingerprint */
+  public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
+    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
+    RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
+    try {
+      return getFingerprint(newestSearcher.get(), maxVersion);
+    } finally {
+      if (newestSearcher != null) {
+        newestSearcher.decref();
+      }
+    }
+  }
+
+  public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
+    long start = System.currentTimeMillis();
+
+    SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
+
+    IndexFingerprint f = new IndexFingerprint();
+    f.maxVersionSpecified = maxVersion;
+    f.maxDoc = searcher.maxDoc();
+
+    // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
+    ValueSource vs = versionField.getType().getValueSource(versionField, null);
+    Map funcContext = ValueSource.newContext(searcher);
+    vs.createWeight(funcContext, searcher);
+    for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
+      int maxDoc = ctx.reader().maxDoc();
+      f.numDocs += ctx.reader().numDocs();
+      Bits liveDocs = ctx.reader().getLiveDocs();
+      FunctionValues fv = vs.getValues(funcContext, ctx);
+      for (int doc = 0; doc < maxDoc; doc++) {
+        if (liveDocs != null && !liveDocs.get(doc)) continue;
+        long v = fv.longVal(doc);
+        f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
+        if (v <= f.maxVersionSpecified) {
+          f.maxInHash = Math.max(v, f.maxInHash);
+          f.versionsHash += Hash.fmix64(v);
+          f.numVersions++;
+        }
+      }
+    }
+
+    long end = System.currentTimeMillis();
+    log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
+
+    return f;
+  }
+
+  /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
+  public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
+    int cmp;
+
+    // NOTE: some may want the number of docs in the index to take precedence over the highest version (add-only systems for sure)
+
+    // if we're comparing all of the versions in the index, then go by the highest encountered.
+    if (f1.maxVersionSpecified == Long.MAX_VALUE) {
+      cmp = Long.compare(f1.maxVersionEncountered, f2.maxVersionEncountered);
+      if (cmp != 0) return cmp;
+    }
+
+    // Go by the highest version under the requested max.
+    cmp = Long.compare(f1.maxInHash, f2.maxInHash);
+    if (cmp != 0) return cmp;
+
+    // go by who has the most documents in the index
+    cmp = Long.compare(f1.numVersions, f2.numVersions);
+    if (cmp != 0) return cmp;
+
+    // both have same number of documents, so go by hash
+    cmp = Long.compare(f1.versionsHash, f2.versionsHash);
+    return cmp;
+  }
+
+  /**
+   * Create a generic object suitable for serializing with ResponseWriters
+   */
+  public Object toObject() {
+    Map<String,Object> map = new LinkedHashMap<>();
+    map.put("maxVersionSpecified", maxVersionSpecified);
+    map.put("maxVersionEncountered", maxVersionEncountered);
+    map.put("maxInHash", maxInHash);
+    map.put("versionsHash", versionsHash);
+    map.put("numVersions", numVersions);
+    map.put("numDocs", numDocs);
+    map.put("maxDoc", maxDoc);
+    return map;
+  }
+
+  private static long getLong(Object o, String key, long def) {
+    long v = def;
+
+    Object oval = null;
+    if (o instanceof Map) {
+      oval = ((Map)o).get(key);
+    } else if (o instanceof NamedList) {
+      oval = ((NamedList)o).get(key);
+    }
+
+    return oval != null ? ((Number)oval).longValue() : def;
+  }
+
+  /**
+   * Create an IndexFingerprint object from a deserialized generic object (Map or NamedList)
+   */
+  public static IndexFingerprint fromObject(Object o) {
+    IndexFingerprint f = new IndexFingerprint();
+    f.maxVersionSpecified = getLong(o, "maxVersionSpecified", Long.MAX_VALUE);
+    f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
+    f.maxInHash = getLong(o, "maxInHash", -1);
+    f.versionsHash = getLong(o, "versionsHash", -1);
+    f.numVersions = getLong(o, "numVersions", -1);
+    f.numDocs = getLong(o, "numDocs", -1);
+    f.maxDoc = getLong(o, "maxDoc", -1);
+    return f;
+  }
+
+  @Override
+  public String toString() {
+    return toObject().toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index c5ddaf0..dbc0091 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -68,6 +68,7 @@ public class PeerSync  {
   private UpdateLog ulog;
   private HttpShardHandlerFactory shardHandlerFactory;
   private ShardHandler shardHandler;
+  private List<SyncShardRequest> requests = new ArrayList<>();
 
   private List<Long> startingVersions;
 
@@ -76,8 +77,10 @@ public class PeerSync  {
   private Set<Long> requestedUpdateSet;
   private long ourLowThreshold;  // 20th percentile
   private long ourHighThreshold; // 80th percentile
+  private long ourHighest;  // currently just used for logging/debugging purposes
   private final boolean cantReachIsSuccess;
   private final boolean getNoVersionsIsSuccess;
+  private final boolean doFingerprint;
   private final HttpClient client;
   private final boolean onlyIfActive;
   private SolrCore core;
@@ -116,6 +119,8 @@ public class PeerSync  {
 
   private static class SyncShardRequest extends ShardRequest {
     List<Long> reportedVersions;
+    IndexFingerprint fingerprint;
+    boolean doFingerprintComparison;
     List<Long> requestedUpdates;
     Exception updateException;
   }
@@ -125,16 +130,17 @@ public class PeerSync  {
   }
   
   public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
-    this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false);
+    this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false, true);
   }
   
-  public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
+  public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive, boolean doFingerprint) {
     this.core = core;
     this.replicas = replicas;
     this.nUpdates = nUpdates;
     this.maxUpdates = nUpdates;
     this.cantReachIsSuccess = cantReachIsSuccess;
     this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
+    this.doFingerprint = doFingerprint;
     this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
     this.onlyIfActive = onlyIfActive;
     
@@ -169,9 +175,8 @@ public class PeerSync  {
     return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
   }
 
-  /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates
-   *  when considering the last N updates between it and its peers.
-   *  A commit is not performed.
+  /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
+   * It does not mean that the remote replica is in sync with us.
    */
   public boolean sync() {
     if (ulog == null) {
@@ -210,7 +215,7 @@ public class PeerSync  {
         
         ourLowThreshold = percentile(startingVersions, 0.8f);
         ourHighThreshold = percentile(startingVersions, 0.2f);
-        
+
         // now make sure that the starting updates overlap our updates
         // there shouldn't be reorders, so any overlap will do.
         
@@ -231,6 +236,7 @@ public class PeerSync  {
         }
         
         ourUpdates = newList;
+        Collections.sort(ourUpdates, absComparator);
       } else {
         
         if (ourUpdates.size() > 0) {
@@ -243,9 +249,10 @@ public class PeerSync  {
           return false;
         }
       }
-      
+
+      ourHighest = ourUpdates.get(0);
       ourUpdateSet = new HashSet<>(ourUpdates);
-      requestedUpdateSet = new HashSet<>(ourUpdates);
+      requestedUpdateSet = new HashSet<>();
       
       for (;;) {
         ShardResponse srsp = shardHandler.takeCompletedOrError();
@@ -257,9 +264,18 @@ public class PeerSync  {
           return false;
         }
       }
-      
-      log.info(msg() + "DONE. sync succeeded");
-      return true;
+
+      // finish up any comparisons with other shards that we deferred
+      boolean success = true;
+      for (SyncShardRequest sreq : requests) {
+        if (sreq.doFingerprintComparison) {
+          success = compareFingerprint(sreq);
+          if (!success) break;
+        }
+      }
+
+      log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
+      return success;
     } finally {
       MDCLoggingContext.clear();
     }
@@ -267,6 +283,7 @@ public class PeerSync  {
   
   private void requestVersions(String replica) {
     SyncShardRequest sreq = new SyncShardRequest();
+    requests.add(sreq);
     sreq.purpose = 1;
     sreq.shards = new String[]{replica};
     sreq.actualShards = sreq.shards;
@@ -274,6 +291,7 @@ public class PeerSync  {
     sreq.params.set("qt","/get");
     sreq.params.set("distrib",false);
     sreq.params.set("getVersions",nUpdates);
+    sreq.params.set("fingerprint",doFingerprint);
     shardHandler.submit(sreq, replica, sreq.params);
   }
 
@@ -355,7 +373,12 @@ public class PeerSync  {
     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
     sreq.reportedVersions =  otherVersions;
 
-    log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
+    Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
+
+    log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] + " fingerprint:" + fingerprint );
+    if (fingerprint != null) {
+      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
+    }
 
     if (otherVersions.size() == 0) {
       return getNoVersionsIsSuccess; 
@@ -371,13 +394,14 @@ public class PeerSync  {
     
     long otherHigh = percentile(otherVersions, .2f);
     long otherLow = percentile(otherVersions, .8f);
+    long otherHighest = otherVersions.get(0);
 
     if (ourHighThreshold < otherLow) {
       // Small overlap between version windows and ours is older
       // This means that we might miss updates if we attempted to use this method.
       // Since there exists just one replica that is so much newer, we must
       // fail the sync.
-      log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
+      log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
       return false;
     }
 
@@ -385,7 +409,10 @@ public class PeerSync  {
       // Small overlap between windows and ours is newer.
       // Using this list to sync would result in requesting/replaying results we don't need
       // and possibly bringing deleted docs back to life.
-      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
+
+      // Because our versions are newer, IndexFingerprint with the remote would not match us.
+      // We return true on our side, but the remote peersync with us should fail.
       return true;
     }
     
@@ -408,9 +435,15 @@ public class PeerSync  {
     sreq.requestedUpdates = toRequest;
     
     if (toRequest.isEmpty()) {
-      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+      log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
 
       // we had (or already requested) all the updates referenced by the replica
+
+      // If we requested updates from another replica, we can't compare fingerprints with this replica yet; we need to defer
+      if (doFingerprint) {
+        sreq.doFingerprintComparison = true;
+      }
+
       return true;
     }
     
@@ -422,6 +455,19 @@ public class PeerSync  {
     return requestUpdates(srsp, toRequest);
   }
 
+  private boolean compareFingerprint(SyncShardRequest sreq) {
+    if (sreq.fingerprint == null) return true;
+    try {
+      IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
+      int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
+      log.info("Fingerprint comparison: " + cmp);
+      return cmp == 0;  // currently, we only check for equality...
+    } catch(IOException e){
+      log.error(msg() + "Error getting index fingerprint", e);
+      return false;
+    }
+  }
+
   private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
     String replica = srsp.getShardRequest().shards[0];
 
@@ -556,7 +602,7 @@ public class PeerSync  {
       }
     }
 
-    return true;
+    return compareFingerprint(sreq);
   }
 
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 7d7d3ef..7e3fc9f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -283,7 +283,7 @@ public class UpdateLog implements PluginInfoInitialized {
     if (debug) {
       log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
     }
-    
+
     TransactionLog oldLog = null;
     for (String oldLogName : tlogFiles) {
       File f = new File(tlogDir, oldLogName);
@@ -334,11 +334,11 @@ public class UpdateLog implements PluginInfoInitialized {
     }
 
   }
-  
+
   public String getLogDir() {
     return tlogDir.getAbsolutePath();
   }
-  
+
   public List<Long> getStartingVersions() {
     return startingVersions;
   }
@@ -503,32 +503,35 @@ public class UpdateLog implements PluginInfoInitialized {
       if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
         // given that we just did a delete-by-query, we don't know what documents were
         // affected and hence we must purge our caches.
-        if (map != null) map.clear();
-        if (prevMap != null) prevMap.clear();
-        if (prevMap2 != null) prevMap2.clear();
-
+        openRealtimeSearcher();
         trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
 
-        // oldDeletes.clear();
-
-        // We must cause a new IndexReader to be opened before anything looks at these caches again
-        // so that a cache miss will read fresh data.
-        //
-        // TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands
-        try {
-          RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
-          holder.decref();
-        } catch (Exception e) {
-          SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+        if (trace) {
+          LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+          log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
         }
-
       }
+    }
+  }
 
-      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
-
-      if (trace) {
-        log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+  /** Opens a new realtime searcher and clears the id caches.
+   * This may also be called when updates are being buffered (from PeerSync/IndexFingerprint)
+   */
+  public void openRealtimeSearcher() {
+    synchronized (this) {
+      // We must cause a new IndexReader to be opened before anything looks at these caches again
+      // so that a cache miss will read fresh data.
+      try {
+        RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+        holder.decref();
+      } catch (Exception e) {
+        SolrException.log(log, "Error opening realtime searcher", e);
+        return;
       }
+
+      if (map != null) map.clear();
+      if (prevMap != null) prevMap.clear();
+      if (prevMap2 != null) prevMap2.clear();
     }
   }
 
@@ -620,7 +623,7 @@ public class UpdateLog implements PluginInfoInitialized {
   public boolean hasUncommittedChanges() {
     return tlog != null;
   }
-  
+
   public void preCommit(CommitUpdateCommand cmd) {
     synchronized (this) {
       if (debug) {
@@ -878,11 +881,11 @@ public class UpdateLog implements PluginInfoInitialized {
       theLog.forceClose();
     }
   }
-  
+
   public void close(boolean committed) {
     close(committed, false);
   }
-  
+
   public void close(boolean committed, boolean deleteOnClose) {
     synchronized (this) {
       recoveryExecutor.shutdown(); // no new tasks
@@ -923,7 +926,7 @@ public class UpdateLog implements PluginInfoInitialized {
       this.id = id;
     }
   }
-  
+
   public class RecentUpdates implements Closeable {
 
     final Deque<TransactionLog> logList;    // newest first
@@ -950,17 +953,17 @@ public class UpdateLog implements PluginInfoInitialized {
 
     public List<Long> getVersions(int n) {
       List<Long> ret = new ArrayList<>(n);
-      
+
       for (List<Update> singleList : updateList) {
         for (Update ptr : singleList) {
           ret.add(ptr.version);
           if (--n <= 0) return ret;
         }
       }
-      
+
       return ret;
     }
-    
+
     public Object lookup(long version) {
       Update update = updates.get(version);
       if (update == null) return null;
@@ -1004,7 +1007,7 @@ public class UpdateLog implements PluginInfoInitialized {
             try {
               o = reader.next();
               if (o==null) break;
-              
+
               // should currently be a List<Oper,Ver,Doc/Id>
               List entry = (List)o;
 
@@ -1027,13 +1030,13 @@ public class UpdateLog implements PluginInfoInitialized {
 
                   updatesForLog.add(update);
                   updates.put(version, update);
-                  
+
                   if (oper == UpdateLog.DELETE_BY_QUERY) {
                     deleteByQueryList.add(update);
                   } else if (oper == UpdateLog.DELETE) {
                     deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
                   }
-                  
+
                   break;
 
                 case UpdateLog.COMMIT:
@@ -1063,7 +1066,7 @@ public class UpdateLog implements PluginInfoInitialized {
       }
 
     }
-    
+
     @Override
     public void close() {
       for (TransactionLog log : logList) {
@@ -1331,10 +1334,10 @@ public class UpdateLog implements PluginInfoInitialized {
                         "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
                         translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
                         Math.floor(cpos / (double) csize * 100.));
-                
+
               }
             }
-            
+
             o = null;
             o = tlogReader.next();
             if (o == null && activeLog) {
@@ -1513,25 +1516,25 @@ public class UpdateLog implements PluginInfoInitialized {
       }
     }
   }
-  
+
   protected String getTlogDir(SolrCore core, PluginInfo info) {
     String dataDir = (String) info.initArgs.get("dir");
-    
+
     String ulogDir = core.getCoreDescriptor().getUlogDir();
     if (ulogDir != null) {
       dataDir = ulogDir;
     }
-    
+
     if (dataDir == null || dataDir.length() == 0) {
       dataDir = core.getDataDir();
     }
 
     return dataDir + "/" + TLOG_NAME;
   }
-  
+
   /**
    * Clears the logs on the file system. Only call before init.
-   * 
+   *
    * @param core the SolrCore
    * @param ulogPluginInfo the init info for the UpdateHandler
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
index 8083ad0..bcaf846 100644
--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -168,6 +168,29 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
 
     assertSync(client1, numVersions, true, shardsArr[0]);
     client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+
+    // now lets check fingerprinting causes appropriate fails
+    v = 4000;
+    add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+    toAdd = numVersions+10;
+    for (int i=0; i<toAdd; i++) {
+      add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+      add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+    }
+
+    // client0 now has an additional add beyond our window and the fingerprint should cause this to fail
+    assertSync(client1, numVersions, false, shardsArr[0]);
+
+    // lets add the missing document and verify that order doesn't matter
+    add(client1, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+    assertSync(client1, numVersions, true, shardsArr[0]);
+
+    // lets do some overwrites to ensure that repeated updates and maxDoc don't matter
+    for (int i=0; i<10; i++) {
+      add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
+    }
+    assertSync(client1, numVersions, true, shardsArr[0]);
+
   }
 
 


Re: lucene-solr git commit: SOLR-8586: add index fingerprinting and use it in peersync (cherry picked from commit 629767b)

Posted by Yonik Seeley <ys...@gmail.com>.
On Fri, Feb 5, 2016 at 11:23 AM, Uwe Schindler <uw...@thetaphi.de> wrote:
> Hi Yonik,
>
> I committed a change yesterday. Could you backport this, too? It broke the
> forbidden-apis checks because it used currentTimeMillis instead of RTimer.

Ah, I hadn't noticed (I got busy between the commit to trunk yesterday
and the backport to 5x today).
Will do.

-Yonik

> Uwe
>
> Am 5. Februar 2016 17:14:34 MEZ, schrieb yonik@apache.org:
>>
>> Repository: lucene-solr
>> Updated Branches:
>>   refs/heads/branch_5x 482b40f84 -> ff83a4001
>>
>>
>> SOLR-8586: add index fingerprinting and use it in peersync
>> (cherry picked from commit 629767b)
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
>> Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
>> Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
>>
>> Branch: refs/heads/branch_5x
>> Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
>> Parents: 482b40f
>> Author: yonik
>> <yo...@apache.org>
>> Authored: Thu Feb 4 14:54:08 2016 -0500
>> Committer: yonik <yo...@apache.org>
>> Committed: Fri Feb 5 11:11:34 2016 -0500
>>
>> ________________________________
>>
>>  solr/CHANGES.txt                                |   4 +
>>  .../org/apache/solr/cloud/SyncStrategy.java     |   2 +-
>>  .../handler/component/RealTimeGetComponent.java |   8 +
>>  .../apache/solr/update/IndexFingerprint.java    | 232 +++++++++++++++++++
>>  .../java/org/apache/solr/update/PeerSync.java   |  78 +++++--
>>  .../java/org/apache/solr/update/UpdateLog.java  |  85 +++----
>>  .../org/apache/solr/update/PeerSyncTest.java    |  23 ++
>>  7 files changed, 374 insertions(+), 58 deletions(-)
>> ________________________________
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
>> ________________________________
>>
>> diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
>> index
>> f2067c6..2b7b46e 100644
>> --- a/solr/CHANGES.txt
>> +++ b/solr/CHANGES.txt
>> @@ -119,6 +119,10 @@ New Features
>>  * SOLR-8415: Provide command to switch between non/secure mode in ZK
>>    (Mike Drob, Gregory Chanan)
>>
>> +* SOLR-8586: added index fingerprint, a hash over all versions currently
>> in the index.
>> +  PeerSync now uses this to check if replicas are in sync. (yonik)
>> +
>> +
>>  Bug Fixes
>>  ----------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> ________________________________
>>
>> diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> index 7a16598..d811f5c 100644
>> ---
>> a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> @@ -173,7 +173,7 @@ public class SyncStrategy {
>>      // if we can't reach a replica for sync, we still consider the
>> overall sync a success
>>      // TODO: as an assurance, we should still try and tell the sync nodes
>> that we couldn't reach
>>      // to recover once more?
>> -    PeerSync peerSync = new PeerSync(core, syncWith,
>> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true,
>> peerSyncOnlyWithActive);
>> +    PeerSync peerSync = new PeerSync(core, syncWith,
>> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true,
>> peerSyncOnlyWithActive, false);
>>      return peerSync.sync();
>>    }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> index 2bbf5a2..14a4185 100644
>> ---
>> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> +++
>> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> @@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
>>  import org.apache.solr.search.SolrIndexSearcher;
>>  import org.apache.solr.search.SolrReturnFields;
>>  import org.apache.solr.update.DocumentBuilder;
>> +import org.apache.solr.update.IndexFingerprint;
>>
>> import org.apache.solr.update.PeerSync;
>>  import org.apache.solr.update.UpdateLog;
>>  import org.apache.solr.util.RefCounted;
>> @@ -536,6 +537,8 @@ public class RealTimeGetComponent extends
>> SearchComponent
>>      int nVersions = params.getInt("getVersions", -1);
>>      if (nVersions == -1) return;
>>
>> +    boolean doFingerprint = params.getBool("fingerprint", false);
>> +
>>      String sync = params.get("sync");
>>      if (sync != null) {
>>        processSync(rb, nVersions, sync);
>> @@ -548,6 +551,11 @@ public class RealTimeGetComponent extends
>> SearchComponent
>>      try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates())
>> {
>>        rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
>>      }
>> +
>> +    if (doFingerprint) {
>> +      IndexFingerprint fingerprint =
>> IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
>> +      rb.rsp.add("fingerprint", fingerprint.toObject());
>> +    }
>>    }
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> new file mode 100644
>> index 0000000..c73b57b
>> --- /dev/null
>> +++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> @@ -0,0 +1,232 @@
>> +package org.apache.solr.update;
>> +
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License,
>> Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +import java.io.IOException;
>> +import java.lang.invoke.MethodHandles;
>> +import java.net.ConnectException;
>> +import java.net.SocketException;
>> +import java.util.ArrayList;
>> +import java.util.Collections;
>> +import
>> java.util.Comparator;
>> +import java.util.HashSet;
>> +import java.util.LinkedHashMap;
>> +import java.util.List;
>> +import java.util.Map;
>> +import java.util.Set;
>> +
>> +import org.apache.http.NoHttpResponseException;
>> +import org.apache.http.client.HttpClient;
>> +import org.apache.http.conn.ConnectTimeoutException;
>> +import org.apache.lucene.index.LeafReader;
>> +import org.apache.lucene.index.LeafReaderContext;
>> +import org.apache.lucene.queries.function.FunctionValues;
>> +import org.apache.lucene.queries.function.ValueSource;
>> +import org.apache.lucene.util.Bits;
>> +import org.apache.lucene.util.BytesRef;
>> +import org.apache.solr.client.solrj.SolrServerException;
>> +import org.apache.solr.cloud.ZkController;
>> +import org.apache.solr.common.SolrException;
>> +import org.apache.solr.common.SolrInputDocument;
>> +import org.apache.solr.common.params.ModifiableSolrParams;
>> +import org.apache.solr.common.util.Hash;
>> +import org.apache.solr.common.util.NamedList;
>> +import org.apache.solr.common.util.StrUtils;
>> +import org.apache.solr.core.SolrCore;
>> +import org.apache.solr.handler.component.HttpShardHandlerFactory;
>> +import org.apache.solr.handler.component.ShardHandler;
>> +import org.apache.solr.handler.component.ShardHandlerFactory;
>> +import org.apache.solr.handler.component.ShardRequest;
>> +import org.apache.solr.handler.component.ShardResponse;
>> +import org.apache.solr.logging.MDCLoggingContext;
>> +import org.apache.solr.request.LocalSolrQueryRequest;
>> +import org.apache.solr.request.SolrQueryRequest;
>> +import org.apache.solr.response.SolrQueryResponse;
>> +import org.apache.solr.schema.SchemaField;
>> +import org.apache.solr.search.SolrIndexSearcher;
>> +import org.apache.solr.update.processor.UpdateRequestProcessor;
>> +import org.apache.solr.update.processor.UpdateRequestProcessorChain;
>> +import org.apache.solr.util.RefCounted;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import static
>> org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
>> +import static
>> org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
>> +
>> +/** @lucene.internal */
>> +public class IndexFingerprint {
>> +  private static final Logger log =
>> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>> +
>> +  private long maxVersionSpecified;
>> +  private long maxVersionEncountered;
>> +  private long maxInHash;
>> +  private long versionsHash;
>> +  private long numVersions;
>> +  private long numDocs;
>> +  private long maxDoc;
>> +
>> +  public long getMaxVersionSpecified() {
>> +    return maxVersionSpecified;
>> +  }
>> +
>> +  public long getMaxVersionEncountered() {
>> +    return maxVersionEncountered;
>> +  }
>> +
>> +  public long getMaxInHash() {
>> +    return
>> maxInHash;
>> +  }
>> +
>> +  public long getVersionsHash() {
>> +    return versionsHash;
>> +  }
>> +
>> +  public long getNumVersions() {
>> +    return numVersions;
>> +  }
>> +
>> +  public long getNumDocs() {
>> +    return numDocs;
>> +  }
>> +
>> +  public long getMaxDoc() {
>> +    return maxDoc;
>> +  }
>> +
>> +  /** Opens a new realtime searcher and returns it's fingerprint */
>> +  public static IndexFingerprint getFingerprint(SolrCore core, long
>> maxVersion) throws IOException {
>> +    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
>> +    RefCounted<SolrIndexSearcher> newestSearcher =
>> core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
>> +    try {
>> +      return getFingerprint(newestSearcher.get(), maxVersion);
>> +    } finally {
>> +      if (newestSearcher != null) {
>> +        newestSearcher.decref();
>> +      }
>> +    }
>> +  }
>> +
>> +  public
>> static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long
>> maxVersion) throws IOException {
>> +    long start = System.currentTimeMillis();
>> +
>> +    SchemaField versionField =
>> VersionInfo.getAndCheckVersionField(searcher.getSchema());
>> +
>> +    IndexFingerprint f = new IndexFingerprint();
>> +    f.maxVersionSpecified = maxVersion;
>> +    f.maxDoc = searcher.maxDoc();
>> +
>> +    // TODO: this could be parallelized, or even cached per-segment if
>> performance becomes an issue
>> +    ValueSource vs = versionField.getType().getValueSource(versionField,
>> null);
>> +    Map funcContext = ValueSource.newContext(searcher);
>> +    vs.createWeight(funcContext, searcher);
>> +    for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves())
>> {
>> +      int maxDoc = ctx.reader().maxDoc();
>> +      f.numDocs += ctx.reader().numDocs();
>> +      Bits liveDocs = ctx.reader().getLiveDocs();
>> +      FunctionValues fv =
>> vs.getValues(funcContext, ctx);
>> +      for (int doc = 0; doc < maxDoc; doc++) {
>> +        if (liveDocs != null && !liveDocs.get(doc)) continue;
>> +        long v = fv.longVal(doc);
>> +        f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
>> +        if (v <= f.maxVersionSpecified) {
>> +          f.maxInHash = Math.max(v, f.maxInHash);
>> +          f.versionsHash += Hash.fmix64(v);
>> +          f.numVersions++;
>> +        }
>> +      }
>> +    }
>> +
>> +    long end = System.currentTimeMillis();
>> +    log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
>> +
>> +    return f;
>> +  }
>> +
>> +  /** returns 0 for equal, negative if f1 is less recent than f2,
>> positive if more recent */
>> +  public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
>> +    int cmp;
>> +
>> +    // NOTE: some way want number of docs in index to take
>> precedence over highest version (add-only systems for sure)
>> +
>> +    // if we're comparing all of the versions in the index, then go by
>> the highest encountered.
>> +    if (f1.maxVersionSpecified == Long.MAX_VALUE) {
>> +      cmp = Long.compare(f1.maxVersionEncountered,
>> f2.maxVersionEncountered);
>> +      if (cmp != 0) return cmp;
>> +    }
>> +
>> +    // Go by the highest version under the requested max.
>> +    cmp = Long.compare(f1.maxInHash, f2.maxInHash);
>> +    if (cmp != 0) return cmp;
>> +
>> +    // go by who has the most documents in the index
>> +    cmp = Long.compare(f1.numVersions, f2.numVersions);
>> +    if (cmp != 0) return cmp;
>> +
>> +    // both have same number of documents, so go by hash
>> +    cmp = Long.compare(f1.versionsHash, f2.versionsHash);
>> +    return cmp;
>> +  }
>> +
>> +  /**
>> +   * Create a generic object suitable for serializing with
>> ResponseWriters
>> +   */
>> +  public Object
>> toObject() {
>> +    Map<String,Object> map = new LinkedHashMap<>();
>> +    map.put("maxVersionSpecified", maxVersionSpecified);
>> +    map.put("maxVersionEncountered", maxVersionEncountered);
>> +    map.put("maxInHash", maxInHash);
>> +    map.put("versionsHash", versionsHash);
>> +    map.put("numVersions", numVersions);
>> +    map.put("numDocs", numDocs);
>> +    map.put("maxDoc", maxDoc);
>> +    return map;
>> +  }
>> +
>> +  private static long getLong(Object o, String key, long def) {
>> +    long v = def;
>> +
>> +    Object oval = null;
>> +    if (o instanceof Map) {
>> +      oval = ((Map)o).get(key);
>> +    } else if (o instanceof NamedList) {
>> +      oval = ((NamedList)o).get(key);
>> +    }
>> +
>> +    return oval != null ? ((Number)oval).longValue() : def;
>> +  }
>> +
>> +  /**
>> +   * Create an IndexFingerprint object from a deserialized generic object
>> (Map or NamedList)
>> +   */
>> +
>> public static IndexFingerprint fromObject(Object o) {
>> +    IndexFingerprint f = new IndexFingerprint();
>> +    f.maxVersionSpecified = getLong(o, "maxVersionSpecified",
>> Long.MAX_VALUE);
>> +    f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
>> +    f.maxInHash = getLong(o, "maxInHash", -1);
>> +    f.versionsHash = getLong(o, "versionsHash", -1);
>> +    f.numVersions = getLong(o, "numVersions", -1);
>> +    f.numDocs = getLong(o, "numDocs", -1);
>> +    f.maxDoc = getLong(o, "maxDoc", -1);
>> +    return f;
>> +  }
>> +
>> +  @Override
>> +  public String toString() {
>> +    return toObject().toString();
>> +  }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> index c5ddaf0..dbc0091 100644
>> --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> @@ -68,6 +68,7 @@ public class PeerSync  {
>>    private UpdateLog ulog;
>>    private HttpShardHandlerFactory shardHandlerFactory;
>>    private ShardHandler shardHandler;
>> +  private List<SyncShardRequest> requests = new ArrayList<>();
>>
>>    private List<Long> startingVersions;
>>
>> @@ -76,8 +77,10 @@ public class PeerSync  {
>>    private Set<Long> requestedUpdateSet;
>>    private long ourLowThreshold;  // 20th percentile
>>    private long ourHighThreshold; // 80th percentile
>> +  private long ourHighest;  // currently just used for logging/debugging
>> purposes
>>    private final boolean cantReachIsSuccess;
>>    private final
>> boolean getNoVersionsIsSuccess;
>> +  private final boolean doFingerprint;
>>    private final HttpClient client;
>>    private final boolean onlyIfActive;
>>    private SolrCore core;
>> @@ -116,6 +119,8 @@ public class PeerSync  {
>>
>>    private static class SyncShardRequest extends ShardRequest {
>>      List<Long> reportedVersions;
>> +    IndexFingerprint fingerprint;
>> +    boolean doFingerprintComparison;
>>      List<Long> requestedUpdates;
>>      Exception updateException;
>>    }
>> @@ -125,16 +130,17 @@ public class PeerSync  {
>>    }
>>
>>    public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
>> -    this(core, replicas, nUpdates, cantReachIsSuccess,
>> getNoVersionsIsSuccess, false);
>> +    this(core, replicas, nUpdates, cantReachIsSuccess,
>> getNoVersionsIsSuccess, false, true);
>>    }
>>
>> -  public PeerSync(SolrCore
>> core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess,
>> boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
>> +  public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>> onlyIfActive, boolean doFingerprint) {
>>      this.core = core;
>>      this.replicas = replicas;
>>      this.nUpdates = nUpdates;
>>      this.maxUpdates = nUpdates;
>>      this.cantReachIsSuccess = cantReachIsSuccess;
>>      this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
>> +    this.doFingerprint = doFingerprint;
>>      this.client =
>> core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
>>      this.onlyIfActive = onlyIfActive;
>>
>> @@ -169,9 +175,8 @@ public class PeerSync  {
>>      return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
>>    }
>>
>> -  /** Returns true if peer sync was successful,
>> meaning that this core may not be considered to have the latest updates
>> -   *  when considering the last N updates between it and its peers.
>> -   *  A commit is not performed.
>> +  /** Returns true if peer sync was successful, meaning that this core
>> may be considered to have the latest updates.
>> +   * It does not mean that the remote replica is in sync with us.
>>     */
>>    public boolean sync() {
>>      if (ulog == null) {
>> @@ -210,7 +215,7 @@ public class PeerSync  {
>>
>>          ourLowThreshold = percentile(startingVersions, 0.8f);
>>          ourHighThreshold = percentile(startingVersions, 0.2f);
>> -
>> +
>>          // now make sure that the starting updates overlap our updates
>>          // there shouldn't be reorders, so any overlap will do.
>>
>> @@ -231,6 +236,7 @@ public class PeerSync  {
>>          }
>>
>>          ourUpdates = newList;
>> +
>> Collections.sort(ourUpdates, absComparator);
>>        } else {
>>
>>          if (ourUpdates.size() > 0) {
>> @@ -243,9 +249,10 @@ public class PeerSync  {
>>            return false;
>>          }
>>        }
>> -
>> +
>> +      ourHighest = ourUpdates.get(0);
>>        ourUpdateSet = new HashSet<>(ourUpdates);
>> -      requestedUpdateSet = new HashSet<>(ourUpdates);
>> +      requestedUpdateSet = new HashSet<>();
>>
>>        for (;;) {
>>          ShardResponse srsp = shardHandler.takeCompletedOrError();
>> @@ -257,9 +264,18 @@ public class PeerSync  {
>>            return false;
>>          }
>>        }
>> -
>> -      log.info(msg() + "DONE. sync succeeded");
>> -      return true;
>> +
>> +      // finish up any comparisons with other shards that we deferred
>> +      boolean success = true;
>> +      for (SyncShardRequest sreq : requests)
>> {
>> +        if (sreq.doFingerprintComparison) {
>> +          success = compareFingerprint(sreq);
>> +          if (!success) break;
>> +        }
>> +      }
>> +
>> +      log.info(msg() + "DONE. sync " + (success ? "succeeded" :
>> "failed"));
>> +      return success;
>>      } finally {
>>        MDCLoggingContext.clear();
>>      }
>> @@ -267,6 +283,7 @@ public class PeerSync  {
>>
>>    private void requestVersions(String replica) {
>>      SyncShardRequest sreq = new SyncShardRequest();
>> +    requests.add(sreq);
>>      sreq.purpose = 1;
>>      sreq.shards = new String[]{replica};
>>      sreq.actualShards = sreq.shards;
>> @@ -274,6 +291,7 @@ public class PeerSync  {
>>      sreq.params.set("qt","/get");
>>      sreq.params.set("distrib",false);
>>      sreq.params.set("getVersions",nUpdates);
>> +    sreq.params.set("fingerprint",doFingerprint);
>>      shardHandler.submit(sreq, replica,
>> sreq.params);
>>    }
>>
>> @@ -355,7 +373,12 @@ public class PeerSync  {
>>      SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
>>      sreq.reportedVersions =  otherVersions;
>>
>> -    log.info(msg() + " Received " + otherVersions.size() + " versions
>> from " + sreq.shards[0] );
>> +    Object fingerprint =
>> srsp.getSolrResponse().getResponse().get("fingerprint");
>> +
>> +    log.info(msg() + " Received " + otherVersions.size() + " versions
>> from " + sreq.shards[0] + " fingerprint:" + fingerprint );
>> +    if (fingerprint != null) {
>> +      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
>> +    }
>>
>>      if (otherVersions.size() == 0) {
>>        return getNoVersionsIsSuccess;
>> @@ -371,13 +394,14 @@ public class PeerSync  {
>>
>>      long otherHigh = percentile(otherVersions, .2f);
>>      long otherLow =
>> percentile(otherVersions, .8f);
>> +    long otherHighest = otherVersions.get(0);
>>
>>      if (ourHighThreshold < otherLow) {
>>        // Small overlap between version windows and ours is older
>>        // This means that we might miss updates if we attempted to use
>> this method.
>>        // Since there exists just one replica that is so much newer, we
>> must
>>        // fail the sync.
>> -      log.info(msg() + " Our versions are too old.
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
>> +      log.info(msg() + " Our versions are too old.
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + "
>> ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>>        return false;
>>      }
>>
>> @@ -385,7 +409,10 @@ public class PeerSync  {
>>        // Small overlap between windows and ours is newer.
>>        // Using this list to sync would
>> result in requesting/replaying results we don't need
>>        // and possibly bringing deleted docs back to life.
>> -      log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>> +      log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest="
>> + ourHighest + " otherHighest=" + otherHighest);
>> +
>> +      // Because our versions are newer, IndexFingerprint with the remote
>> would not match us.
>> +      // We return true on our side, but the remote peersync with us
>> should fail.
>>        return true;
>>      }
>>
>> @@ -408,9 +435,15 @@ public class PeerSync  {
>>      sreq.requestedUpdates = toRequest;
>>
>>      if (toRequest.isEmpty()) {
>> -      log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + "
>> otherHigh="+otherHigh);
>> +      log.info(msg() + " No additional versions requested.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest="
>> + ourHighest + " otherHighest=" + otherHighest);
>>
>>        // we had (or already requested) all the updates referenced by the
>> replica
>> +
>> +      // If we requested updates from another replica, we can't compare
>> fingerprints yet with this replica, we need to defer
>> +      if (doFingerprint) {
>> +        sreq.doFingerprintComparison = true;
>> +      }
>> +
>>        return true;
>>      }
>>
>> @@ -422,6 +455,19 @@ public class PeerSync  {
>>      return requestUpdates(srsp, toRequest);
>>    }
>>
>> +  private boolean compareFingerprint(SyncShardRequest sreq) {
>> +    if (sreq.fingerprint == null) return true;
>> +    try {
>> +      IndexFingerprint ourFingerprint =
>> IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
>> +
>> int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
>> +      log.info("Fingerprint comparison: " + cmp);
>> +      return cmp == 0;  // currently, we only check for equality...
>> +    } catch(IOException e){
>> +      log.error(msg() + "Error getting index fingerprint", e);
>> +      return false;
>> +    }
>> +  }
>> +
>>    private boolean requestUpdates(ShardResponse srsp, List<Long>
>> toRequest) {
>>      String replica = srsp.getShardRequest().shards[0];
>>
>> @@ -556,7 +602,7 @@ public class PeerSync  {
>>        }
>>      }
>>
>> -    return true;
>> +    return compareFingerprint(sreq);
>>    }
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> ________________________________
>>
>> diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> index 7d7d3ef..7e3fc9f 100644
>> --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> @@ -283,7 +283,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>      if (debug) {
>>        log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing
>> tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
>>      }
>> -
>> +
>>      TransactionLog oldLog = null;
>>      for (String oldLogName : tlogFiles) {
>>        File f = new File(tlogDir, oldLogName);
>> @@ -334,11 +334,11 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>      }
>>
>>    }
>> -
>> +
>>    public String getLogDir() {
>>      return tlogDir.getAbsolutePath();
>>    }
>> -
>> +
>>    public List<Long>
>> getStartingVersions() {
>>      return startingVersions;
>>    }
>> @@ -503,32 +503,35 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
>>          // given that we just did a delete-by-query, we don't know what
>> documents were
>>          // affected and hence we must purge our caches.
>> -        if (map != null) map.clear();
>> -        if (prevMap != null) prevMap.clear();
>> -        if (prevMap2 != null) prevMap2.clear();
>> -
>> +        openRealtimeSearcher();
>>          trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
>>
>> -        // oldDeletes.clear();
>> -
>> -        // We must cause a new IndexReader to be opened before anything
>> looks at these caches again
>> -        // so that a cache miss will read fresh data.
>> -        //
>> -        // TODO: FUTURE: open a new searcher lazily for better throughput
>> with delete-by-query commands
>> -
>>    try {
>> -          RefCounted<SolrIndexSearcher> holder =
>> uhandler.core.openNewSearcher(true, true);
>> -          holder.decref();
>> -        } catch (Exception e) {
>> -          SolrException.log(log, "Error opening realtime searcher for
>> deleteByQuery", e);
>> +        if (trace) {
>> +          LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>> +          log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>> tlog + " " + ptr + " map=" + System.identityHashCode(map));
>>          }
>> -
>>        }
>> +    }
>> +  }
>>
>> -      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>> -
>> -      if (trace) {
>> -        log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>> tlog + " " + ptr + " map=" + System.identityHashCode(map));
>> +  /** Opens a new realtime searcher and clears the id caches.
>> +   * This may also be called when we updates are being buffered (from
>> PeerSync/IndexFingerprint)
>> +   */
>> +
>> public void openRealtimeSearcher() {
>> +    synchronized (this) {
>> +      // We must cause a new IndexReader to be opened before anything
>> looks at these caches again
>> +      // so that a cache miss will read fresh data.
>> +      try {
>> +        RefCounted<SolrIndexSearcher> holder =
>> uhandler.core.openNewSearcher(true, true);
>> +        holder.decref();
>> +      } catch (Exception e) {
>> +        SolrException.log(log, "Error opening realtime searcher", e);
>> +        return;
>>        }
>> +
>> +      if (map != null) map.clear();
>> +      if (prevMap != null) prevMap.clear();
>> +      if (prevMap2 != null) prevMap2.clear();
>>      }
>>    }
>>
>> @@ -620,7 +623,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>    public boolean hasUncommittedChanges() {
>>      return tlog != null;
>>    }
>> -
>> +
>>    public void preCommit(CommitUpdateCommand cmd) {
>>      synchronized (this) {
>>
>>   if (debug) {
>> @@ -878,11 +881,11 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        theLog.forceClose();
>>      }
>>    }
>> -
>> +
>>    public void close(boolean committed) {
>>      close(committed, false);
>>    }
>> -
>> +
>>    public void close(boolean committed, boolean deleteOnClose) {
>>      synchronized (this) {
>>        recoveryExecutor.shutdown(); // no new tasks
>> @@ -923,7 +926,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        this.id = id;
>>      }
>>    }
>> -
>> +
>>    public class RecentUpdates implements Closeable {
>>
>>      final Deque<TransactionLog> logList;    // newest first
>> @@ -950,17 +953,17 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>
>>      public List<Long> getVersions(int n) {
>>        List<Long> ret = new ArrayList<>(n);
>> -
>> +
>>        for
>> (List<Update> singleList : updateList) {
>>          for (Update ptr : singleList) {
>>            ret.add(ptr.version);
>>            if (--n <= 0) return ret;
>>          }
>>        }
>> -
>> +
>>        return ret;
>>      }
>> -
>> +
>>      public Object lookup(long version) {
>>        Update update = updates.get(version);
>>        if (update == null) return null;
>> @@ -1004,7 +1007,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>              try {
>>                o = reader.next();
>>                if (o==null) break;
>> -
>> +
>>                // should currently be a List<Oper,Ver,Doc/Id>
>>                List entry = (List)o;
>>
>> @@ -1027,13 +1030,13 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>
>>                    updatesForLog.add(update);
>>                    updates.put(version, update);
>> -
>> +
>>
>>                  if (oper == UpdateLog.DELETE_BY_QUERY) {
>>                      deleteByQueryList.add(update);
>>                    } else if (oper == UpdateLog.DELETE) {
>>                      deleteList.add(new DeleteUpdate(version,
>> (byte[])entry.get(2)));
>>                    }
>> -
>> +
>>                    break;
>>
>>                  case UpdateLog.COMMIT:
>> @@ -1063,7 +1066,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        }
>>
>>      }
>> -
>> +
>>      @Override
>>      public void close() {
>>        for (TransactionLog log : logList) {
>> @@ -1331,10 +1334,10 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>                          "log replay status {} active={} starting pos={}
>> current pos={} current size={} % read={}",
>>                          translog, activeLog,
>> recoveryInfo.positionOfStart, cpos, csize,
>>                          Math.floor(cpos /
>> (double) csize * 100.));
>> -
>> +
>>                }
>>              }
>> -
>> +
>>              o = null;
>>              o = tlogReader.next();
>>              if (o == null && activeLog) {
>> @@ -1513,25 +1516,25 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        }
>>      }
>>    }
>> -
>> +
>>    protected String getTlogDir(SolrCore core, PluginInfo info) {
>>      String dataDir = (String) info.initArgs.get("dir");
>> -
>> +
>>      String ulogDir = core.getCoreDescriptor().getUlogDir();
>>      if (ulogDir != null) {
>>        dataDir = ulogDir;
>>      }
>> -
>> +
>>      if (dataDir == null || dataDir.length() == 0) {
>>        dataDir = core.getDataDir();
>>      }
>>
>>      return dataDir + "/" + TLOG_NAME;
>>    }
>> -
>> +
>>    /**
>>     * Clears the logs on the file system. Only call before init.
>> -   *
>> +   *
>>
>>  * @param core the SolrCore
>>     * @param ulogPluginInfo the init info for the UpdateHandler
>>     */
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> ________________________________
>>
>> diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> index 8083ad0..bcaf846 100644
>> --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> @@ -168,6 +168,29 @@ public class PeerSyncTest extends
>> BaseDistributedSearchTestCase {
>>
>>      assertSync(client1, numVersions, true, shardsArr[0]);
>>      client0.commit(); client1.commit(); queryAndCompare(params("q",
>> "*:*", "sort","_version_ desc"), client0,
>> client1);
>> +
>> +    // now lets check fingerprinting causes appropriate fails
>> +    v = 4000;
>> +    add(client0, seenLeader,
>> sdoc("id",Integer.toString((int)v),"_version_",v));
>> +    toAdd = numVersions+10;
>> +    for (int i=0; i<toAdd; i++) {
>> +      add(client0, seenLeader,
>> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>> +      add(client1, seenLeader,
>> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>> +    }
>> +
>> +    // client0 now has an additional add beyond our window and the
>> fingerprint should cause this to fail
>> +    assertSync(client1, numVersions, false, shardsArr[0]);
>> +
>> +    // lets add the missing document and verify that order doesn't matter
>> +    add(client1, seenLeader,
>> sdoc("id",Integer.toString((int)v),"_version_",v));
>> +    assertSync(client1, numVersions, true, shardsArr[0]);
>> +
>> +    // lets do some overwrites to ensure that repeated updates and maxDoc
>> don't
>> matter
>> +    for (int i=0; i<10; i++) {
>> +      add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i +
>> 1), "_version_", v + i + 1));
>> +    }
>> +    assertSync(client1, numVersions, true, shardsArr[0]);
>> +
>>    }
>>
>>
>>
>
> --
> Uwe Schindler
> H.-H.-Meier-Allee 63, 28213 Bremen
> http://www.thetaphi.de

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org


Re: lucene-solr git commit: SOLR-8586: add index fingerprinting and use it in peersync (cherry picked from commit 629767b)

Posted by Uwe Schindler <uw...@thetaphi.de>.
Hi Yonik,

I committed a change yesterday. Could you backport it, too? The original commit broke the forbidden-apis checks because it used System.currentTimeMillis() instead of RTimer.

Uwe

On 5 February 2016 17:14:34 CET, yonik@apache.org wrote:
>Repository: lucene-solr
>Updated Branches:
>  refs/heads/branch_5x 482b40f84 -> ff83a4001
>
>
>SOLR-8586: add index fingerprinting and use it in peersync
>(cherry picked from commit 629767b)
>
>
>Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
>Commit:
>http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
>Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
>Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
>
>Branch: refs/heads/branch_5x
>Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
>Parents: 482b40f
>Author: yonik <yo...@apache.org>
>Authored: Thu Feb 4 14:54:08 2016 -0500
>Committer: yonik <yo...@apache.org>
>Committed: Fri Feb 5 11:11:34 2016 -0500
>
>----------------------------------------------------------------------
> solr/CHANGES.txt                                |   4 +
> .../org/apache/solr/cloud/SyncStrategy.java     |   2 +-
> .../handler/component/RealTimeGetComponent.java |   8 +
>.../apache/solr/update/IndexFingerprint.java    | 232
>+++++++++++++++++++
> .../java/org/apache/solr/update/PeerSync.java   |  78 +++++--
> .../java/org/apache/solr/update/UpdateLog.java  |  85 +++----
> .../org/apache/solr/update/PeerSyncTest.java    |  23 ++
> 7 files changed, 374 insertions(+), 58 deletions(-)
>----------------------------------------------------------------------
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
>----------------------------------------------------------------------
>diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
>index f2067c6..2b7b46e 100644
>--- a/solr/CHANGES.txt
>+++ b/solr/CHANGES.txt
>@@ -119,6 +119,10 @@ New Features
> * SOLR-8415: Provide command to switch between non/secure mode in ZK
>   (Mike Drob, Gregory Chanan)
> 
>+* SOLR-8586: added index fingerprint, a hash over all versions
>currently in the index.
>+  PeerSync now uses this to check if replicas are in sync. (yonik)
>+
>+
> Bug Fixes
> ----------------------
> 
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>index 7a16598..d811f5c 100644
>--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>@@ -173,7 +173,7 @@ public class SyncStrategy {
>// if we can't reach a replica for sync, we still consider the overall
>sync a success
>// TODO: as an assurance, we should still try and tell the sync nodes
>that we couldn't reach
>     // to recover once more?
>-    PeerSync peerSync = new PeerSync(core, syncWith,
>core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true,
>true, peerSyncOnlyWithActive);
>+    PeerSync peerSync = new PeerSync(core, syncWith,
>core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true,
>true, peerSyncOnlyWithActive, false);
>     return peerSync.sync();
>   }
>   
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>index 2bbf5a2..14a4185 100644
>---
>a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>+++
>b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>@@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
> import org.apache.solr.search.SolrIndexSearcher;
> import org.apache.solr.search.SolrReturnFields;
> import org.apache.solr.update.DocumentBuilder;
>+import org.apache.solr.update.IndexFingerprint;
> import org.apache.solr.update.PeerSync;
> import org.apache.solr.update.UpdateLog;
> import org.apache.solr.util.RefCounted;
>@@ -536,6 +537,8 @@ public class RealTimeGetComponent extends
>SearchComponent
>     int nVersions = params.getInt("getVersions", -1);
>     if (nVersions == -1) return;
> 
>+    boolean doFingerprint = params.getBool("fingerprint", false);
>+
>     String sync = params.get("sync");
>     if (sync != null) {
>       processSync(rb, nVersions, sync);
>@@ -548,6 +551,11 @@ public class RealTimeGetComponent extends
>SearchComponent
>try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
>       rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
>     }
>+
>+    if (doFingerprint) {
>+      IndexFingerprint fingerprint =
>IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
>+      rb.rsp.add("fingerprint", fingerprint.toObject());
>+    }
>   }
> 
>   
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>new file mode 100644
>index 0000000..c73b57b
>--- /dev/null
>+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>@@ -0,0 +1,232 @@
>+package org.apache.solr.update;
>+
>+/*
>+ * Licensed to the Apache Software Foundation (ASF) under one or more
>+ * contributor license agreements.  See the NOTICE file distributed
>with
>+ * this work for additional information regarding copyright ownership.
>+ * The ASF licenses this file to You under the Apache License, Version
>2.0
>+ * (the "License"); you may not use this file except in compliance
>with
>+ * the License.  You may obtain a copy of the License at
>+ *
>+ *     http://www.apache.org/licenses/LICENSE-2.0
>+ *
>+ * Unless required by applicable law or agreed to in writing, software
>+ * distributed under the License is distributed on an "AS IS" BASIS,
>+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>implied.
>+ * See the License for the specific language governing permissions and
>+ * limitations under the License.
>+ */
>+
>+import java.io.IOException;
>+import java.lang.invoke.MethodHandles;
>+import java.net.ConnectException;
>+import java.net.SocketException;
>+import java.util.ArrayList;
>+import java.util.Collections;
>+import java.util.Comparator;
>+import java.util.HashSet;
>+import java.util.LinkedHashMap;
>+import java.util.List;
>+import java.util.Map;
>+import java.util.Set;
>+
>+import org.apache.http.NoHttpResponseException;
>+import org.apache.http.client.HttpClient;
>+import org.apache.http.conn.ConnectTimeoutException;
>+import org.apache.lucene.index.LeafReader;
>+import org.apache.lucene.index.LeafReaderContext;
>+import org.apache.lucene.queries.function.FunctionValues;
>+import org.apache.lucene.queries.function.ValueSource;
>+import org.apache.lucene.util.Bits;
>+import org.apache.lucene.util.BytesRef;
>+import org.apache.solr.client.solrj.SolrServerException;
>+import org.apache.solr.cloud.ZkController;
>+import org.apache.solr.common.SolrException;
>+import org.apache.solr.common.SolrInputDocument;
>+import org.apache.solr.common.params.ModifiableSolrParams;
>+import org.apache.solr.common.util.Hash;
>+import org.apache.solr.common.util.NamedList;
>+import org.apache.solr.common.util.StrUtils;
>+import org.apache.solr.core.SolrCore;
>+import org.apache.solr.handler.component.HttpShardHandlerFactory;
>+import org.apache.solr.handler.component.ShardHandler;
>+import org.apache.solr.handler.component.ShardHandlerFactory;
>+import org.apache.solr.handler.component.ShardRequest;
>+import org.apache.solr.handler.component.ShardResponse;
>+import org.apache.solr.logging.MDCLoggingContext;
>+import org.apache.solr.request.LocalSolrQueryRequest;
>+import org.apache.solr.request.SolrQueryRequest;
>+import org.apache.solr.response.SolrQueryResponse;
>+import org.apache.solr.schema.SchemaField;
>+import org.apache.solr.search.SolrIndexSearcher;
>+import org.apache.solr.update.processor.UpdateRequestProcessor;
>+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
>+import org.apache.solr.util.RefCounted;
>+import org.slf4j.Logger;
>+import org.slf4j.LoggerFactory;
>+
>+import static
>org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
>+import static
>org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
>+
>+/** @lucene.internal */
>+public class IndexFingerprint {
>+  private static final Logger log =
>LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>+
>+  private long maxVersionSpecified;
>+  private long maxVersionEncountered;
>+  private long maxInHash;
>+  private long versionsHash;
>+  private long numVersions;
>+  private long numDocs;
>+  private long maxDoc;
>+
>+  public long getMaxVersionSpecified() {
>+    return maxVersionSpecified;
>+  }
>+
>+  public long getMaxVersionEncountered() {
>+    return maxVersionEncountered;
>+  }
>+
>+  public long getMaxInHash() {
>+    return maxInHash;
>+  }
>+
>+  public long getVersionsHash() {
>+    return versionsHash;
>+  }
>+
>+  public long getNumVersions() {
>+    return numVersions;
>+  }
>+
>+  public long getNumDocs() {
>+    return numDocs;
>+  }
>+
>+  public long getMaxDoc() {
>+    return maxDoc;
>+  }
>+
>+  /** Opens a new realtime searcher and returns it's fingerprint */
>+  public static IndexFingerprint getFingerprint(SolrCore core, long
>maxVersion) throws IOException {
>+    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
>+    RefCounted<SolrIndexSearcher> newestSearcher =
>core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
>+    try {
>+      return getFingerprint(newestSearcher.get(), maxVersion);
>+    } finally {
>+      if (newestSearcher != null) {
>+        newestSearcher.decref();
>+      }
>+    }
>+  }
>+
>+  public static IndexFingerprint getFingerprint(SolrIndexSearcher
>searcher, long maxVersion) throws IOException {
>+    long start = System.currentTimeMillis();
>+
>+    SchemaField versionField =
>VersionInfo.getAndCheckVersionField(searcher.getSchema());
>+
>+    IndexFingerprint f = new IndexFingerprint();
>+    f.maxVersionSpecified = maxVersion;
>+    f.maxDoc = searcher.maxDoc();
>+
>+    // TODO: this could be parallelized, or even cached per-segment if
>performance becomes an issue
>+    ValueSource vs =
>versionField.getType().getValueSource(versionField, null);
>+    Map funcContext = ValueSource.newContext(searcher);
>+    vs.createWeight(funcContext, searcher);
>+    for (LeafReaderContext ctx :
>searcher.getTopReaderContext().leaves()) {
>+      int maxDoc = ctx.reader().maxDoc();
>+      f.numDocs += ctx.reader().numDocs();
>+      Bits liveDocs = ctx.reader().getLiveDocs();
>+      FunctionValues fv = vs.getValues(funcContext, ctx);
>+      for (int doc = 0; doc < maxDoc; doc++) {
>+        if (liveDocs != null && !liveDocs.get(doc)) continue;
>+        long v = fv.longVal(doc);
>+        f.maxVersionEncountered = Math.max(v,
>f.maxVersionEncountered);
>+        if (v <= f.maxVersionSpecified) {
>+          f.maxInHash = Math.max(v, f.maxInHash);
>+          f.versionsHash += Hash.fmix64(v);
>+          f.numVersions++;
>+        }
>+      }
>+    }
>+
>+    long end = System.currentTimeMillis();
>+    log.info("IndexFingerprint millis:" + (end-start) + " result:" +
>f);
>+
>+    return f;
>+  }
>+
>+  /** returns 0 for equal, negative if f1 is less recent than f2,
>positive if more recent */
>+  public static int compare(IndexFingerprint f1, IndexFingerprint f2)
>{
>+    int cmp;
>+
>+    // NOTE: some way want number of docs in index to take precedence
>over highest version (add-only systems for sure)
>+
>+    // if we're comparing all of the versions in the index, then go by
>the highest encountered.
>+    if (f1.maxVersionSpecified == Long.MAX_VALUE) {
>+      cmp = Long.compare(f1.maxVersionEncountered,
>f2.maxVersionEncountered);
>+      if (cmp != 0) return cmp;
>+    }
>+
>+    // Go by the highest version under the requested max.
>+    cmp = Long.compare(f1.maxInHash, f2.maxInHash);
>+    if (cmp != 0) return cmp;
>+
>+    // go by who has the most documents in the index
>+    cmp = Long.compare(f1.numVersions, f2.numVersions);
>+    if (cmp != 0) return cmp;
>+
>+    // both have same number of documents, so go by hash
>+    cmp = Long.compare(f1.versionsHash, f2.versionsHash);
>+    return cmp;
>+  }
>+
>+  /**
>+   * Create a generic object suitable for serializing with
>ResponseWriters
>+   */
>+  public Object toObject() {
>+    Map<String,Object> map = new LinkedHashMap<>();
>+    map.put("maxVersionSpecified", maxVersionSpecified);
>+    map.put("maxVersionEncountered", maxVersionEncountered);
>+    map.put("maxInHash", maxInHash);
>+    map.put("versionsHash", versionsHash);
>+    map.put("numVersions", numVersions);
>+    map.put("numDocs", numDocs);
>+    map.put("maxDoc", maxDoc);
>+    return map;
>+  }
>+
>+  private static long getLong(Object o, String key, long def) {
>+    long v = def;
>+
>+    Object oval = null;
>+    if (o instanceof Map) {
>+      oval = ((Map)o).get(key);
>+    } else if (o instanceof NamedList) {
>+      oval = ((NamedList)o).get(key);
>+    }
>+
>+    return oval != null ? ((Number)oval).longValue() : def;
>+  }
>+
>+  /**
>+   * Create an IndexFingerprint object from a deserialized generic
>object (Map or NamedList)
>+   */
>+  public static IndexFingerprint fromObject(Object o) {
>+    IndexFingerprint f = new IndexFingerprint();
>+    f.maxVersionSpecified = getLong(o, "maxVersionSpecified",
>Long.MAX_VALUE);
>+    f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
>+    f.maxInHash = getLong(o, "maxInHash", -1);
>+    f.versionsHash = getLong(o, "versionsHash", -1);
>+    f.numVersions = getLong(o, "numVersions", -1);
>+    f.numDocs = getLong(o, "numDocs", -1);
>+    f.maxDoc = getLong(o, "maxDoc", -1);
>+    return f;
>+  }
>+
>+  @Override
>+  public String toString() {
>+    return toObject().toString();
>+  }
>+}
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>index c5ddaf0..dbc0091 100644
>--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>@@ -68,6 +68,7 @@ public class PeerSync  {
>   private UpdateLog ulog;
>   private HttpShardHandlerFactory shardHandlerFactory;
>   private ShardHandler shardHandler;
>+  private List<SyncShardRequest> requests = new ArrayList<>();
> 
>   private List<Long> startingVersions;
> 
>@@ -76,8 +77,10 @@ public class PeerSync  {
>   private Set<Long> requestedUpdateSet;
>   private long ourLowThreshold;  // 20th percentile
>   private long ourHighThreshold; // 80th percentile
>+  private long ourHighest;  // currently just used for
>logging/debugging purposes
>   private final boolean cantReachIsSuccess;
>   private final boolean getNoVersionsIsSuccess;
>+  private final boolean doFingerprint;
>   private final HttpClient client;
>   private final boolean onlyIfActive;
>   private SolrCore core;
>@@ -116,6 +119,8 @@ public class PeerSync  {
> 
>   private static class SyncShardRequest extends ShardRequest {
>     List<Long> reportedVersions;
>+    IndexFingerprint fingerprint;
>+    boolean doFingerprintComparison;
>     List<Long> requestedUpdates;
>     Exception updateException;
>   }
>@@ -125,16 +130,17 @@ public class PeerSync  {
>   }
>   
>public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
>-    this(core, replicas, nUpdates, cantReachIsSuccess,
>getNoVersionsIsSuccess, false);
>+    this(core, replicas, nUpdates, cantReachIsSuccess,
>getNoVersionsIsSuccess, false, true);
>   }
>   
>-  public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>onlyIfActive) {
>+  public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>onlyIfActive, boolean doFingerprint) {
>     this.core = core;
>     this.replicas = replicas;
>     this.nUpdates = nUpdates;
>     this.maxUpdates = nUpdates;
>     this.cantReachIsSuccess = cantReachIsSuccess;
>     this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
>+    this.doFingerprint = doFingerprint;
>this.client =
>core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
>     this.onlyIfActive = onlyIfActive;
>     
>@@ -169,9 +175,8 @@ public class PeerSync  {
>  return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
>   }
> 
>-  /** Returns true if peer sync was successful, meaning that this core
>may not be considered to have the latest updates
>-   *  when considering the last N updates between it and its peers.
>-   *  A commit is not performed.
>+  /** Returns true if peer sync was successful, meaning that this core
>may be considered to have the latest updates.
>+   * It does not mean that the remote replica is in sync with us.
>    */
>   public boolean sync() {
>     if (ulog == null) {
>@@ -210,7 +215,7 @@ public class PeerSync  {
>         
>         ourLowThreshold = percentile(startingVersions, 0.8f);
>         ourHighThreshold = percentile(startingVersions, 0.2f);
>-        
>+
>         // now make sure that the starting updates overlap our updates
>         // there shouldn't be reorders, so any overlap will do.
>         
>@@ -231,6 +236,7 @@ public class PeerSync  {
>         }
>         
>         ourUpdates = newList;
>+        Collections.sort(ourUpdates, absComparator);
>       } else {
>         
>         if (ourUpdates.size() > 0) {
>@@ -243,9 +249,10 @@ public class PeerSync  {
>           return false;
>         }
>       }
>-      
>+
>+      ourHighest = ourUpdates.get(0);
>       ourUpdateSet = new HashSet<>(ourUpdates);
>-      requestedUpdateSet = new HashSet<>(ourUpdates);
>+      requestedUpdateSet = new HashSet<>();
>       
>       for (;;) {
>         ShardResponse srsp = shardHandler.takeCompletedOrError();
>@@ -257,9 +264,18 @@ public class PeerSync  {
>           return false;
>         }
>       }
>-      
>-      log.info(msg() + "DONE. sync succeeded");
>-      return true;
>+
>+      // finish up any comparisons with other shards that we deferred
>+      boolean success = true;
>+      for (SyncShardRequest sreq : requests) {
>+        if (sreq.doFingerprintComparison) {
>+          success = compareFingerprint(sreq);
>+          if (!success) break;
>+        }
>+      }
>+
>+      log.info(msg() + "DONE. sync " + (success ? "succeeded" :
>"failed"));
>+      return success;
>     } finally {
>       MDCLoggingContext.clear();
>     }
>@@ -267,6 +283,7 @@ public class PeerSync  {
>   
>   private void requestVersions(String replica) {
>     SyncShardRequest sreq = new SyncShardRequest();
>+    requests.add(sreq);
>     sreq.purpose = 1;
>     sreq.shards = new String[]{replica};
>     sreq.actualShards = sreq.shards;
>@@ -274,6 +291,7 @@ public class PeerSync  {
>     sreq.params.set("qt","/get");
>     sreq.params.set("distrib",false);
>     sreq.params.set("getVersions",nUpdates);
>+    sreq.params.set("fingerprint",doFingerprint);
>     shardHandler.submit(sreq, replica, sreq.params);
>   }
> 
>@@ -355,7 +373,12 @@ public class PeerSync  {
>     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
>     sreq.reportedVersions =  otherVersions;
> 
>-    log.info(msg() + " Received " + otherVersions.size() + " versions
>from " + sreq.shards[0] );
>+    Object fingerprint =
>srsp.getSolrResponse().getResponse().get("fingerprint");
>+
>+    log.info(msg() + " Received " + otherVersions.size() + " versions
>from " + sreq.shards[0] + " fingerprint:" + fingerprint );
>+    if (fingerprint != null) {
>+      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
>+    }
> 
>     if (otherVersions.size() == 0) {
>       return getNoVersionsIsSuccess; 
>@@ -371,13 +394,14 @@ public class PeerSync  {
>     
>     long otherHigh = percentile(otherVersions, .2f);
>     long otherLow = percentile(otherVersions, .8f);
>+    long otherHighest = otherVersions.get(0);
> 
>     if (ourHighThreshold < otherLow) {
>       // Small overlap between version windows and ours is older
>// This means that we might miss updates if we attempted to use this
>method.
>  // Since there exists just one replica that is so much newer, we must
>       // fail the sync.
>-      log.info(msg() + " Our versions are too old.
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
>+      log.info(msg() + " Our versions are too old.
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow +
>" ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>       return false;
>     }
> 
>@@ -385,7 +409,10 @@ public class PeerSync  {
>       // Small overlap between windows and ours is newer.
>// Using this list to sync would result in requesting/replaying results
>we don't need
>       // and possibly bringing deleted docs back to life.
>-      log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>+      log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ "
>ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>+
>+      // Because our versions are newer, IndexFingerprint with the
>remote would not match us.
>+      // We return true on our side, but the remote peersync with us
>should fail.
>       return true;
>     }
>     
>@@ -408,9 +435,15 @@ public class PeerSync  {
>     sreq.requestedUpdates = toRequest;
>     
>     if (toRequest.isEmpty()) {
>-      log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>+      log.info(msg() + " No additional versions requested.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ "
>ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
> 
>// we had (or already requested) all the updates referenced by the
>replica
>+
>+      // If we requested updates from another replica, we can't
>compare fingerprints yet with this replica, we need to defer
>+      if (doFingerprint) {
>+        sreq.doFingerprintComparison = true;
>+      }
>+
>       return true;
>     }
>     
>@@ -422,6 +455,19 @@ public class PeerSync  {
>     return requestUpdates(srsp, toRequest);
>   }
> 
>+  private boolean compareFingerprint(SyncShardRequest sreq) {
>+    if (sreq.fingerprint == null) return true;
>+    try {
>+      IndexFingerprint ourFingerprint =
>IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
>+      int cmp = IndexFingerprint.compare(ourFingerprint,
>sreq.fingerprint);
>+      log.info("Fingerprint comparison: " + cmp);
>+      return cmp == 0;  // currently, we only check for equality...
>+    } catch(IOException e){
>+      log.error(msg() + "Error getting index fingerprint", e);
>+      return false;
>+    }
>+  }
>+
>private boolean requestUpdates(ShardResponse srsp, List<Long>
>toRequest) {
>     String replica = srsp.getShardRequest().shards[0];
> 
>@@ -556,7 +602,7 @@ public class PeerSync  {
>       }
>     }
> 
>-    return true;
>+    return compareFingerprint(sreq);
>   }
> 
> 
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>index 7d7d3ef..7e3fc9f 100644
>--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>@@ -283,7 +283,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>     if (debug) {
>log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing
>tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
>     }
>-    
>+
>     TransactionLog oldLog = null;
>     for (String oldLogName : tlogFiles) {
>       File f = new File(tlogDir, oldLogName);
>@@ -334,11 +334,11 @@ public class UpdateLog implements
>PluginInfoInitialized {
>     }
> 
>   }
>-  
>+
>   public String getLogDir() {
>     return tlogDir.getAbsolutePath();
>   }
>-  
>+
>   public List<Long> getStartingVersions() {
>     return startingVersions;
>   }
>@@ -503,32 +503,35 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
>// given that we just did a delete-by-query, we don't know what
>documents were
>         // affected and hence we must purge our caches.
>-        if (map != null) map.clear();
>-        if (prevMap != null) prevMap.clear();
>-        if (prevMap2 != null) prevMap2.clear();
>-
>+        openRealtimeSearcher();
>         trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
> 
>-        // oldDeletes.clear();
>-
>-        // We must cause a new IndexReader to be opened before
>anything looks at these caches again
>-        // so that a cache miss will read fresh data.
>-        //
>-        // TODO: FUTURE: open a new searcher lazily for better
>throughput with delete-by-query commands
>-        try {
>-          RefCounted<SolrIndexSearcher> holder =
>uhandler.core.openNewSearcher(true, true);
>-          holder.decref();
>-        } catch (Exception e) {
>-          SolrException.log(log, "Error opening realtime searcher for
>deleteByQuery", e);
>+        if (trace) {
>+          LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>+          log.trace("TLOG: added deleteByQuery " + cmd.query + " to "
>+ tlog + " " + ptr + " map=" + System.identityHashCode(map));
>         }
>-
>       }
>+    }
>+  }
> 
>-      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>-
>-      if (trace) {
>-        log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>tlog + " " + ptr + " map=" + System.identityHashCode(map));
>+  /** Opens a new realtime searcher and clears the id caches.
>+   * This may also be called when we updates are being buffered (from
>PeerSync/IndexFingerprint)
>+   */
>+  public void openRealtimeSearcher() {
>+    synchronized (this) {
>+      // We must cause a new IndexReader to be opened before anything
>looks at these caches again
>+      // so that a cache miss will read fresh data.
>+      try {
>+        RefCounted<SolrIndexSearcher> holder =
>uhandler.core.openNewSearcher(true, true);
>+        holder.decref();
>+      } catch (Exception e) {
>+        SolrException.log(log, "Error opening realtime searcher", e);
>+        return;
>       }
>+
>+      if (map != null) map.clear();
>+      if (prevMap != null) prevMap.clear();
>+      if (prevMap2 != null) prevMap2.clear();
>     }
>   }
> 
>@@ -620,7 +623,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>   public boolean hasUncommittedChanges() {
>     return tlog != null;
>   }
>-  
>+
>   public void preCommit(CommitUpdateCommand cmd) {
>     synchronized (this) {
>       if (debug) {
>@@ -878,11 +881,11 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       theLog.forceClose();
>     }
>   }
>-  
>+
>   public void close(boolean committed) {
>     close(committed, false);
>   }
>-  
>+
>   public void close(boolean committed, boolean deleteOnClose) {
>     synchronized (this) {
>       recoveryExecutor.shutdown(); // no new tasks
>@@ -923,7 +926,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       this.id = id;
>     }
>   }
>-  
>+
>   public class RecentUpdates implements Closeable {
> 
>     final Deque<TransactionLog> logList;    // newest first
>@@ -950,17 +953,17 @@ public class UpdateLog implements
>PluginInfoInitialized {
> 
>     public List<Long> getVersions(int n) {
>       List<Long> ret = new ArrayList<>(n);
>-      
>+
>       for (List<Update> singleList : updateList) {
>         for (Update ptr : singleList) {
>           ret.add(ptr.version);
>           if (--n <= 0) return ret;
>         }
>       }
>-      
>+
>       return ret;
>     }
>-    
>+
>     public Object lookup(long version) {
>       Update update = updates.get(version);
>       if (update == null) return null;
>@@ -1004,7 +1007,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>             try {
>               o = reader.next();
>               if (o==null) break;
>-              
>+
>               // should currently be a List<Oper,Ver,Doc/Id>
>               List entry = (List)o;
> 
>@@ -1027,13 +1030,13 @@ public class UpdateLog implements
>PluginInfoInitialized {
> 
>                   updatesForLog.add(update);
>                   updates.put(version, update);
>-                  
>+
>                   if (oper == UpdateLog.DELETE_BY_QUERY) {
>                     deleteByQueryList.add(update);
>                   } else if (oper == UpdateLog.DELETE) {
>       deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
>                   }
>-                  
>+
>                   break;
> 
>                 case UpdateLog.COMMIT:
>@@ -1063,7 +1066,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       }
> 
>     }
>-    
>+
>     @Override
>     public void close() {
>       for (TransactionLog log : logList) {
>@@ -1331,10 +1334,10 @@ public class UpdateLog implements
>PluginInfoInitialized {
>"log replay status {} active={} starting pos={} current pos={} current
>size={} % read={}",
>        translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
>                         Math.floor(cpos / (double) csize * 100.));
>-                
>+
>               }
>             }
>-            
>+
>             o = null;
>             o = tlogReader.next();
>             if (o == null && activeLog) {
>@@ -1513,25 +1516,25 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       }
>     }
>   }
>-  
>+
>   protected String getTlogDir(SolrCore core, PluginInfo info) {
>     String dataDir = (String) info.initArgs.get("dir");
>-    
>+
>     String ulogDir = core.getCoreDescriptor().getUlogDir();
>     if (ulogDir != null) {
>       dataDir = ulogDir;
>     }
>-    
>+
>     if (dataDir == null || dataDir.length() == 0) {
>       dataDir = core.getDataDir();
>     }
> 
>     return dataDir + "/" + TLOG_NAME;
>   }
>-  
>+
>   /**
>    * Clears the logs on the file system. Only call before init.
>-   * 
>+   *
>    * @param core the SolrCore
>    * @param ulogPluginInfo the init info for the UpdateHandler
>    */
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>index 8083ad0..bcaf846 100644
>--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>@@ -168,6 +168,29 @@ public class PeerSyncTest extends
>BaseDistributedSearchTestCase {
> 
>     assertSync(client1, numVersions, true, shardsArr[0]);
>client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*",
>"sort","_version_ desc"), client0, client1);
>+
>+    // now lets check fingerprinting causes appropriate fails
>+    v = 4000;
>+    add(client0, seenLeader,
>sdoc("id",Integer.toString((int)v),"_version_",v));
>+    toAdd = numVersions+10;
>+    for (int i=0; i<toAdd; i++) {
>+      add(client0, seenLeader,
>sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>+      add(client1, seenLeader,
>sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>+    }
>+
>+    // client0 now has an additional add beyond our window and the
>fingerprint should cause this to fail
>+    assertSync(client1, numVersions, false, shardsArr[0]);
>+
>+    // lets add the missing document and verify that order doesn't
>matter
>+    add(client1, seenLeader,
>sdoc("id",Integer.toString((int)v),"_version_",v));
>+    assertSync(client1, numVersions, true, shardsArr[0]);
>+
>+    // lets do some overwrites to ensure that repeated updates and
>maxDoc don't matter
>+    for (int i=0; i<10; i++) {
>+      add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i
>+ 1), "_version_", v + i + 1));
>+    }
>+    assertSync(client1, numVersions, true, shardsArr[0]);
>+
>   }
> 
> 

--
Uwe Schindler
H.-H.-Meier-Allee 63, 28213 Bremen
http://www.thetaphi.de