Posted to commits@lucene.apache.org by ho...@apache.org on 2016/03/01 18:07:17 UTC

[34/50] [abbrv] lucene-solr git commit: SOLR-445: have to dedup remote DBQ errors because it's a broadcast to all leaders.

SOLR-445: have to dedup remote DBQ errors because it's a broadcast to all leaders.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/cffca39c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/cffca39c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/cffca39c

Branch: refs/heads/jira/SOLR-445
Commit: cffca39cb8c84c24065b724d46806bf800834431
Parents: 7fd9d2e
Author: Chris Hostetter <ho...@apache.org>
Authored: Mon Feb 29 16:10:11 2016 -0700
Committer: Chris Hostetter <ho...@apache.org>
Committed: Mon Feb 29 16:10:11 2016 -0700

----------------------------------------------------------------------
 .../processor/TolerantUpdateProcessor.java      | 59 +++++++++++++++++---
 1 file changed, 52 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cffca39c/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
index 68be135..a325420 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
@@ -18,8 +18,10 @@ package org.apache.solr.update.processor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 
 
 import org.apache.lucene.util.BytesRef;
@@ -94,6 +96,27 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
    */
   private final List<KnownErr> knownErrors = new ArrayList<KnownErr>();
 
+  // Kludge: Because deleteByQuery updates are forwarded to every leader, we can get identical
+  // errors reported by every leader for the same underlying problem.
+  //
+  // It would be nice if we could cleanly handle the unlikely (but possible) situation of an
+  // update stream that includes multiple identical DBQs, with identical failures, and
+  // report each one exactly once, for example...
+  //   add: id#1
+  //   dbq: foo:bar
+  //   add: id#2
+  //   add: id#3
+  //   dbq: foo:bar
+  //
+  // ...but I can't figure out a way to accurately identify & return duplicate
+  // KnownErrs from duplicate identical underlying requests w/o erroneously returning identical
+  // KnownErrs for the *same* underlying request but from diff shards.
+  //
+  // So as a kludge, we keep track of them for deduping against identical remote failures.
+  //
+  // :nocommit: probably need to use this for "commit" as well?
+  private Set<KnownErr> knownDBQErrors = new HashSet<>();
+        
   private final FirstErrTracker firstErrTracker = new FirstErrTracker();
   private final DistribPhase distribPhase;
 
@@ -162,9 +185,19 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
 
       // nocommit: do we need isLeader type logic like processAdd ? does processAdd even need it?
       
-      CmdType type = cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ;
-      String id = cmd.isDeleteById() ? cmd.id : cmd.query;
-      knownErrors.add(new KnownErr(type, id, t.getMessage()));
+      KnownErr err = new KnownErr(cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ,
+                                  cmd.isDeleteById() ? cmd.id : cmd.query,
+                                  t.getMessage());
+      knownErrors.add(err);
+
+      // NOTE: we're not using this to dedup before adding to knownErrors.
+      // If we're lucky enough to get an immediate local failure (ie: we're a leader, or some other
+      // processor failed) then recording the multiple failures is a good thing -- it helps us
+      // fail fast accurately if we exceed maxErrors
+      if (CmdType.DELQ.equals(err.type)) {
+        knownDBQErrors.add(err);
+      }
+      
       if (knownErrors.size() > maxErrors) {
         firstErrTracker.throwFirst();
       }
@@ -187,6 +220,7 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
     } catch (DistributedUpdateProcessor.DistributedUpdatesAsyncException duae) {
       firstErrTracker.caught(duae);
 
+      
      // adjust our stats based on the distributed errors
       for (Error error : duae.errors) {
         // we can't trust the req info from the Error, because multiple original requests might have been
@@ -205,14 +239,25 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
           log.warn("remote error has no metadata to aggregate: " + remoteErr.getMessage(), remoteErr);
           continue;
         }
-        
+
         for (int i = 0; i < remoteErrMetadata.size(); i++) {
           KnownErr err = KnownErr.parseMetadataIfKnownErr(remoteErrMetadata.getName(i),
                                                           remoteErrMetadata.getVal(i));
-          if (null != err) {
-            knownErrors.add(err);
+          if (null == err) {
+            // some metadata unrelated to this update processor
+            continue;
+          }
+
+          if (CmdType.DELQ.equals(err.type)) {
+            if (knownDBQErrors.contains(err)) {
+              // we've already seen this identical error, probably a dup from another shard
+              continue;
+            } else {
+              knownDBQErrors.add(err);
+            }
           }
-          // else: some metadata unrelated to this update processor
+          
+          knownErrors.add(err);
         }
       }
     }
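
----------------------------------------------------------------------

For reference, a minimal, self-contained sketch of the dedup idea in this
patch. The KnownErr and CmdType names mirror the classes in the diff, but
these are simplified stand-ins, not the actual Solr code; in particular the
sketch assumes KnownErr implements value-based equals/hashCode over
(type, id, message), which is what lets a HashSet spot duplicates.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class DbqDedupSketch {

  enum CmdType { ADD, DELID, DELQ }

  // Simplified stand-in for TolerantUpdateProcessor.KnownErr.
  static final class KnownErr {
    final CmdType type;
    final String id;      // doc id for DELID, query string for DELQ
    final String message;

    KnownErr(CmdType type, String id, String message) {
      this.type = type;
      this.id = id;
      this.message = message;
    }

    @Override public boolean equals(Object o) {
      if (!(o instanceof KnownErr)) return false;
      KnownErr that = (KnownErr) o;
      return type == that.type
          && Objects.equals(id, that.id)
          && Objects.equals(message, that.message);
    }

    @Override public int hashCode() {
      return Objects.hash(type, id, message);
    }
  }

  public static void main(String[] args) {
    List<KnownErr> knownErrors = new ArrayList<>();
    Set<KnownErr> knownDBQErrors = new HashSet<>();

    // Simulate the aggregation step: three leaders all failed on the same
    // broadcast DBQ and reported back identical errors.
    for (int leader = 0; leader < 3; leader++) {
      KnownErr err = new KnownErr(CmdType.DELQ, "foo:bar", "invalid query");
      // HashSet.add() returns false for a duplicate, so this both records
      // the error and skips ones we've already seen from another shard.
      if (CmdType.DELQ.equals(err.type) && !knownDBQErrors.add(err)) {
        continue;
      }
      knownErrors.add(err);
    }

    System.out.println(knownErrors.size()); // prints 1, not 3
  }
}

Note the sketch takes two small liberties for brevity: it folds the patch's
explicit contains()/add() pair into a single add() call (equivalent, since
add() returns false when the element is already present), and it dedups every
DELQ error, whereas the patch only consults the set for errors parsed out of
remote metadata and deliberately records local failures unconditionally.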