You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2016/03/01 18:07:17 UTC
[34/50] [abbrv] lucene-solr git commit: SOLR-445: have to dedup
remote DBQ errors because it's a broadcast to all leaders.
SOLR-445: have to dedup remote DBQ errors because it's a broadcast to all leaders.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/cffca39c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/cffca39c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/cffca39c
Branch: refs/heads/jira/SOLR-445
Commit: cffca39cb8c84c24065b724d46806bf800834431
Parents: 7fd9d2e
Author: Chris Hostetter <ho...@apache.org>
Authored: Mon Feb 29 16:10:11 2016 -0700
Committer: Chris Hostetter <ho...@apache.org>
Committed: Mon Feb 29 16:10:11 2016 -0700
----------------------------------------------------------------------
.../processor/TolerantUpdateProcessor.java | 59 +++++++++++++++++---
1 file changed, 52 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cffca39c/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
index 68be135..a325420 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
@@ -18,8 +18,10 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Set;
import org.apache.lucene.util.BytesRef;
@@ -94,6 +96,27 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
*/
private final List<KnownErr> knownErrors = new ArrayList<KnownErr>();
+ // Kludge: Because deleteByQuery updates are forwarded to every leader, we can get identical
+ // errors reported by every leader for the same underlying problem.
+ //
+ // It would be nice if we could cleanly handle the unlikely (but possible) situation of an
+ // update stream that includes multiple identical DBQs, with identical failures, and
+ // to report each one once, for example...
+ // add: id#1
+ // dbq: foo:bar
+ // add: id#2
+ // add: id#3
+ // dbq: foo:bar
+ //
+ // ...but I can't figure out a way to accurately identify & return duplicate
+ // KnownErrs from duplicate identical underlying requests w/o erroneously returning identical
+ // KnownErrs for the *same* underlying request but from diff shards.
+ //
+ // So as a kludge, we keep track of these errors for deduping against identical remote failures
+ //
+ // :nocommit: probably need to use this for "commit" as well?
+ private Set<KnownErr> knownDBQErrors = new HashSet<>();
+
private final FirstErrTracker firstErrTracker = new FirstErrTracker();
private final DistribPhase distribPhase;
@@ -162,9 +185,19 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
// nocommit: do we need isLeader type logic like processAdd ? does processAdd even need it?
- CmdType type = cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ;
- String id = cmd.isDeleteById() ? cmd.id : cmd.query;
- knownErrors.add(new KnownErr(type, id, t.getMessage()));
+ KnownErr err = new KnownErr(cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ,
+ cmd.isDeleteById() ? cmd.id : cmd.query,
+ t.getMessage());
+ knownErrors.add(err);
+
+ // NOTE: we're not using this to dedup before adding to knownErrors.
+ // if we're lucky enough to get an immediate local failure (ie: we're a leader, or some other processor
+ // failed) then recording the multiple failures is a good thing -- helps us with an accurate fail
+ // fast if we exceed maxErrors
+ if (CmdType.DELQ.equals(err.type)) {
+ knownDBQErrors.add(err);
+ }
+
if (knownErrors.size() > maxErrors) {
firstErrTracker.throwFirst();
}
@@ -187,6 +220,7 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
} catch (DistributedUpdateProcessor.DistributedUpdatesAsyncException duae) {
firstErrTracker.caught(duae);
+
// adjust our stats based on the distributed errors
for (Error error : duae.errors) {
// we can't trust the req info from the Error, because multiple original requests might have been
@@ -205,14 +239,25 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
log.warn("remote error has no metadata to aggregate: " + remoteErr.getMessage(), remoteErr);
continue;
}
-
+
for (int i = 0; i < remoteErrMetadata.size(); i++) {
KnownErr err = KnownErr.parseMetadataIfKnownErr(remoteErrMetadata.getName(i),
remoteErrMetadata.getVal(i));
- if (null != err) {
- knownErrors.add(err);
+ if (null == err) {
+ // some metadata unrelated to this update processor
+ continue;
+ }
+
+ if (CmdType.DELQ.equals(err.type)) {
+ if (knownDBQErrors.contains(err)) {
+ // we've already seen this identical error, probably a dup from another shard
+ continue;
+ } else {
+ knownDBQErrors.add(err);
+ }
}
- // else: some metadata unrelated to this update processor
+
+ knownErrors.add(err);
}
}
}