You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2016/03/25 19:03:52 UTC

[1/3] lucene-solr:master: SOLR-445: new ToleranteUpdateProcessorFactory to support skipping update commands that cause failures when sending multiple updates in a single request.

Repository: lucene-solr
Updated Branches:
  refs/heads/master 2babaf8c3 -> f051f56be


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java
new file mode 100644
index 0000000..9bbead8
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.xml.xpath.XPathExpressionException;
+
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.ToleratedUpdateError.CmdType;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.servlet.DirectSolrConnection;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.util.BaseTestHarness;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class TolerantUpdateProcessorTest extends UpdateProcessorTestBase {
+  
+  /** Shared fixture: valid and invalid documents, interleaved; built on first use in {@link #setUp}. */
+  private static List<SolrInputDocument> docs;
+  /** Shared fixture: the ids of the invalid entries of <code>docs</code>. */
+  private static String[] badIds;
+  
+  /** Initializes the test core from the update-processor-chains config and the schema12 schema. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    final String configFile = "solrconfig-update-processor-chains.xml";
+    final String schemaFile = "schema12.xml";
+    initCore(configFile, schemaFile);
+  }
+  
+  /** Releases the static fixtures once the tests of this class are done. */
+  @AfterClass
+  public static void tearDownClass() {
+    badIds = null;
+    docs = null;
+  }
+  
+  /**
+   * Registers the exception messages these tests provoke on purpose and, the first time it
+   * runs, builds the shared fixture: 10 valid documents (even ids, numeric weight) interleaved
+   * with 10 invalid ones (odd ids, weight "b").
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // messages of the failures the invalid documents are expected to trigger
+    ignoreException("Error adding field");
+    ignoreException("Document is missing mandatory uniqueKey field");
+    if (docs != null) {
+      return; // fixture already built by an earlier test
+    }
+    final int pairs = 10;
+    docs = new ArrayList<>(2 * pairs);
+    badIds = new String[pairs];
+    for (int pair = 0; pair < pairs; pair++) {
+      final String goodId = String.valueOf(2 * pair);
+      final String badId = String.valueOf(2 * pair + 1);
+      // the valid document of the pair ...
+      docs.add(doc(field("id", 1f, goodId), field("weight", 1f, pair)));
+      // ... followed by the invalid one
+      docs.add(doc(field("id", 1f, badId), field("weight", 1f, "b")));
+      badIds[pair] = badId;
+    }
+  }
+  
+  /**
+   * Resets the exception ignores, deletes every document (asserting the index ends up empty)
+   * and then delegates to the parent tear down.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    resetExceptionIgnores();
+    final String everything = "*:*";
+    assertU(delQ(everything));
+    assertU(commit());
+    assertQ(req("q", everything), "//result[@numFound='0']");
+    super.tearDown();
+  }
+
+  /**
+   * Future proofs TolerantUpdateProcessor against new default method impls being added to its
+   * base class(es): every public method that is not declared by {@link Object} must be declared
+   * by TolerantUpdateProcessor itself, so that every method involved in a processor chain life
+   * cycle is overridden with exception catching/tracking.
+   */
+  @Test
+  public void testReflection() {
+    for (Method method : TolerantUpdateProcessor.class.getMethods()) {
+      final Class<?> declaredBy = method.getDeclaringClass();
+      if (Object.class.equals(declaredBy)) {
+        continue; // methods inherited from java.lang.Object are exempt from the check
+      }
+      assertEquals("base class(es) has changed, TolerantUpdateProcessor needs to be updated to ensure it " +
+                   "overrides all solr update lifecycle methods with exception tracking: " + method,
+                   TolerantUpdateProcessor.class, declaredBy);
+    }
+  }
+ 
+  
+  /** Valid documents added through either tolerant chain end up searchable after a commit. */
+  @Test
+  public void testValidAdds() throws IOException {
+    final SolrInputDocument first = doc(field("id", 1f, "1"), field("text", 1f, "the quick brown fox"));
+    final SolrInputDocument second = doc(field("id", 1f, "2"), field("text", 1f, "the quick brown fox"));
+
+    add("tolerant-chain-max-errors-10", null, first);
+    add("tolerant-chain-max-errors-not-set", null, second);
+    assertU(commit());
+
+    assertQ(req("q", "*:*"), "//result[@numFound='2']");
+    for (String id : new String[] {"1", "2"}) {
+      assertQ(req("q", "id:" + id), "//result[@numFound='1']");
+    }
+  }
+  
+  /**
+   * Batches containing an invalid document (no uniqueKey, or a bad field value) must fail on
+   * the "not-tolerant" chain, whereas the tolerant chain reports the invalid document as an
+   * error and still indexes the valid document of the same batch.
+   */
+  @Test
+  public void testInvalidAdds() throws IOException {
+    final String tolerantChain = "tolerant-chain-max-errors-10";
+    final String missingKeyMsg = "Document is missing mandatory uniqueKey field";
+
+    // --- a lone document without an id ---
+    final SolrInputDocument noIdDoc = doc(field("text", 1f, "the quick brown fox"));
+    assertNotTolerantAddFails(Arrays.asList(noIdDoc), missingKeyMsg);
+    assertAddsSucceedWithErrors(tolerantChain, Arrays.asList(noIdDoc), null, "(unknown)");
+
+    // --- the same document followed by a valid one ---
+    final SolrInputDocument goodDoc = doc(field("id", 1f, "1"), field("text", 1f, "the quick brown fox"));
+    final List<SolrInputDocument> missingKeyBatch = Arrays.asList(noIdDoc, goodDoc);
+    assertNotTolerantAddFails(missingKeyBatch, missingKeyMsg);
+    assertU(commit());
+    assertQ(req("q", "id:1"), "//result[@numFound='0']");
+
+    assertAddsSucceedWithErrors(tolerantChain, missingKeyBatch, null, "(unknown)");
+    assertU(commit());
+    // verify that the good document made it in
+    assertQ(req("q", "id:1"), "//result[@numFound='1']");
+
+    // --- a document that has an id but an invalid weight, followed by a valid one ---
+    final SolrInputDocument badWeightDoc = doc(field("id", 1f, "2"), field("weight", 1f, "aaa"));
+    final SolrInputDocument goodWeightDoc = doc(field("id", 1f, "3"), field("weight", 1f, "3"));
+    final List<SolrInputDocument> badWeightBatch = Arrays.asList(badWeightDoc, goodWeightDoc);
+    assertNotTolerantAddFails(badWeightBatch, "Error adding field");
+    assertU(commit());
+    assertQ(req("q", "id:3"), "//result[@numFound='0']");
+
+    assertAddsSucceedWithErrors(tolerantChain, badWeightBatch, null, "2");
+    assertU(commit());
+    // the valid document was indexed ...
+    assertQ(req("q", "id:3"), "//result[@numFound='1']");
+    // ... and the invalid document was NOT
+    assertQ(req("q", "id:2"), "//result[@numFound='0']");
+  }
+
+  /**
+   * Asserts that adding <code>batch</code> through the "not-tolerant" chain throws an exception
+   * whose message contains <code>expectedMsg</code>.
+   */
+  private void assertNotTolerantAddFails(List<SolrInputDocument> batch, String expectedMsg) {
+    try {
+      add("not-tolerant", null, batch);
+      fail("Expecting exception");
+    } catch (Exception e) {
+      // expected
+      assertTrue(e.getMessage().contains(expectedMsg));
+    }
+  }
+  
+  /** Without any maxErrors setting, all 10 invalid documents of the fixture are tolerated. */
+  @Test
+  public void testMaxErrorsDefault() throws IOException {
+    final String chain = "tolerant-chain-max-errors-not-set";
+    try {
+      // by default every error is accepted, so the batch succeeds and reports 10 errors
+      assertAddsSucceedWithErrors(chain, docs, null, badIds);
+    } catch (Exception unexpected) {
+      fail("Shouldn't get an exception for this batch: " + unexpected.getMessage());
+    }
+    assertU(commit());
+    assertQ(req("q", "*:*"), "//result[@numFound='10']");
+  }
+  
+  /**
+   * A request-level maxErrors of 10 is just enough to tolerate the 10 invalid documents of
+   * the fixture, so the batch succeeds and the 10 valid documents are indexed.
+   */
+  @Test
+  public void testMaxErrorsSucceed() throws IOException {
+    ModifiableSolrParams requestParams = new ModifiableSolrParams();
+    requestParams.add("maxErrors", "10");
+    // still OK
+    assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds);
+    assertU(commit());
+    assertQ(req("q","*:*")
+        ,"//result[@numFound='10']");
+  }
+  
+  /**
+   * With a request-level maxErrors of 5 the fixture (10 invalid documents) must make the
+   * request fail; 6 documents are expected to be in the index afterwards.
+   */
+  @Test
+  public void testMaxErrorsThrowsException() throws IOException {
+    final ModifiableSolrParams requestParams = new ModifiableSolrParams();
+    requestParams.add("maxErrors", "5");
+    final String expectedFragment =
+        "ERROR: [doc=1] Error adding field 'weight'='b' msg=For input string: \"b\"";
+    try {
+      // should fail
+      assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds);
+      fail("Expecting exception");
+    } catch (SolrException e) {
+      final String actual = e.getMessage();
+      assertTrue(actual, actual.contains(expectedFragment));
+    }
+    // the first good documents made it to the index
+    assertU(commit());
+    assertQ(req("q", "*:*"), "//result[@numFound='6']");
+  }
+
+  /**
+   * A request-level maxErrors of -1 (unlimited) must tolerate all 10 invalid documents of the
+   * fixture, leaving the 10 valid documents in the index.
+   */
+  @Test
+  public void testMaxErrorsInfinite() throws IOException {
+    ModifiableSolrParams requestParams = new ModifiableSolrParams();
+    requestParams.add("maxErrors", "-1");
+    try {
+      // the params must actually be sent with the request, otherwise the -1 setting is never exercised
+      assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds);
+    } catch(Exception e) {
+      fail("Shouldn't get an exception for this batch: " + e.getMessage());
+    }
+    assertU(commit());
+    assertQ(req("q","*:*")
+            ,"//result[@numFound='10']");
+  }
+  
+  /**
+   * A request-level maxErrors of 0 makes the TolerantUpdateProcessor intolerant: a batch of
+   * one valid and one invalid document fails, and only the valid document is indexed.
+   */
+  @Test
+  public void testMaxErrors0() throws IOException {
+    final ModifiableSolrParams requestParams = new ModifiableSolrParams();
+    requestParams.add("maxErrors", "0");
+    // first pair of the fixture: one valid document followed by one invalid document
+    final List<SolrInputDocument> firstPair = docs.subList(0, 2);
+    try {
+      // should fail
+      assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", firstPair, requestParams, "1");
+      fail("Expecting exception");
+    } catch (SolrException e) {
+      final String expectedFragment =
+          "ERROR: [doc=1] Error adding field 'weight'='b' msg=For input string: \"b\"";
+      assertTrue(e.getMessage().contains(expectedFragment));
+    }
+    // the first good document made it to the index
+    assertU(commit());
+    assertQ(req("q", "*:*"), "//result[@numFound='1']");
+  }
+  
+  /**
+   * A delete-by-query on an undefined field is tolerated: the response status stays 0 and the
+   * failure is reported as a single DELQ entry in the errors list.
+   */
+  @Test
+  public void testInvalidDelete() throws XPathExpressionException, SAXException {
+    ignoreException("undefined field invalidfield");
+    final String chain = "tolerant-chain-max-errors-10";
+    final String errorEntry = "//arr[@name='errors']/lst";
+
+    // a valid add first: the errors array is present and empty
+    final String addResponse = update(chain, adoc("id", "1", "text", "the quick brown fox"));
+    assertNull(BaseTestHarness.validateXPath(addResponse,
+                                             "//int[@name='status']=0",
+                                             "//arr[@name='errors']",
+                                             "count(" + errorEntry + ")=0"));
+
+    // now the invalid delete
+    final String deleteResponse = update(chain, delQ("invalidfield:1"));
+    assertNull(BaseTestHarness.validateXPath(deleteResponse,
+                                             "//int[@name='status']=0",
+                                             "count(" + errorEntry + ")=1",
+                                             errorEntry + "/str[@name='type']/text()='DELQ'",
+                                             errorEntry + "/str[@name='id']/text()='invalidfield:1'",
+                                             errorEntry + "/str[@name='message']/text()='undefined field invalidfield'"));
+  }
+  
+  /**
+   * A valid delete-by-query sent through the tolerant chain reports no errors and removes the
+   * previously added document.
+   */
+  @Test
+  public void testValidDelete() throws XPathExpressionException, SAXException {
+    ignoreException("undefined field invalidfield");
+    final String chain = "tolerant-chain-max-errors-10";
+    // checks shared by both responses: status 0 and an empty errors array
+    final String[] noErrors = {
+      "//int[@name='status']=0",
+      "//arr[@name='errors']",
+      "count(//arr[@name='errors']/lst)=0"
+    };
+
+    final String addResponse = update(chain, adoc("id", "1", "text", "the quick brown fox"));
+    assertNull(BaseTestHarness.validateXPath(addResponse, noErrors));
+    assertU(commit());
+    assertQ(req("q", "*:*"), "//result[@numFound='1']");
+
+    final String deleteResponse = update(chain, delQ("id:1"));
+    assertNull(BaseTestHarness.validateXPath(deleteResponse, noErrors));
+    assertU(commit());
+    assertQ(req("q", "*:*"), "//result[@numFound='0']");
+  }
+  
+  /**
+   * Checks the response header written by the tolerant chains: an empty errors list for a
+   * valid add, a single "(unknown)" entry for an add without uniqueKey, one entry per invalid
+   * document (odd ids 1..19) for the whole fixture, and maxErrors reported as -1 when the
+   * effective maxErrors is unlimited.
+   */
+  @Test
+  public void testResponse() throws SAXException, XPathExpressionException, IOException {
+    final String chain = "tolerant-chain-max-errors-10";
+    final String errorEntry = "//arr[@name='errors']/lst";
+
+    // a valid add
+    String response = update(chain, adoc("id", "1", "text", "the quick brown fox"));
+    assertNull(BaseTestHarness.validateXPath(response,
+                                             "//int[@name='status']=0",
+                                             "//arr[@name='errors']",
+                                             "count(" + errorEntry + ")=0"));
+
+    // an add without uniqueKey
+    response = update(chain, adoc("text", "the quick brown fox"));
+    assertNull(BaseTestHarness.validateXPath(response,
+        "//int[@name='status']=0",
+        "//int[@name='maxErrors']/text()='10'",
+        "count(" + errorEntry + ")=1",
+        errorEntry + "/str[@name='id']/text()='(unknown)'",
+        errorEntry + "/str[@name='message']/text()='Document is missing mandatory uniqueKey field: id'"));
+
+    // NOTE(review): the response of this repeated request is never inspected
+    update(chain, adoc("text", "the quick brown fox"));
+
+    // the whole fixture in a single <add>
+    final StringWriter xml = new StringWriter();
+    xml.append("<add>");
+    for (SolrInputDocument doc : docs) {
+      ClientUtils.writeXML(doc, xml);
+    }
+    xml.append("</add>");
+    final String addAll = xml.toString();
+
+    final List<String> checks = new ArrayList<>();
+    checks.add("//int[@name='status']=0");
+    checks.add("//int[@name='maxErrors']/text()='10'");
+    checks.add("count(" + errorEntry + ")=10");
+    for (int id = 0; id < 20; id++) {
+      // odd ids must be reported as errors, even ids must not
+      final String reported = errorEntry + "/str[@name='id']/text()='" + id + "'";
+      checks.add(id % 2 == 0 ? "not(" + reported + ")" : reported);
+    }
+    response = update(chain, addAll);
+    assertNull(BaseTestHarness.validateXPath(response, checks.toArray(new String[0])));
+
+    // spot check response when effective maxErrors is unlimited
+    response = update("tolerant-chain-max-errors-not-set", addAll);
+    assertNull(BaseTestHarness.validateXPath(response, "//int[@name='maxErrors']/text()='-1'"));
+  }
+
+
+  
+  /**
+   * Sends <code>xml</code> to the "/update" handler of the test core using the given update chain.
+   *
+   * @param chain value sent as the <code>update.chain</code> request parameter
+   * @param xml the update message
+   * @return the response returned by the connection
+   * @throws SolrException as thrown by the request; any other exception is wrapped in a
+   *         BAD_REQUEST SolrException
+   */
+  public String update(String chain, String xml) {
+    final DirectSolrConnection connection = new DirectSolrConnection(h.getCore());
+    final SolrRequestHandler updateHandler = h.getCore().getRequestHandler("/update");
+    final ModifiableSolrParams chainParams = new ModifiableSolrParams();
+    chainParams.add("update.chain", chain);
+    try {
+      return connection.request(updateHandler, chainParams, xml);
+    } catch (SolrException solrFailure) {
+      throw solrFailure; // already the right type, pass it through untouched
+    } catch (Exception other) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, other);
+    }
+  }
+  
+  /**
+   * Adds <code>docs</code> through <code>chain</code> and asserts that the response header
+   * lists exactly <code>idsShouldFail.length</code> errors, each of type ADD and each with an
+   * id contained in <code>idsShouldFail</code>.
+   *
+   * @param chain name of the update processor chain to use
+   * @param docs the documents to add
+   * @param requestParams request parameters, may be null
+   * @param idsShouldFail ids expected to be reported as errors
+   */
+  private void assertAddsSucceedWithErrors(String chain,
+                                           final Collection<SolrInputDocument> docs,
+                                           SolrParams requestParams,
+                                           String... idsShouldFail) throws IOException {
+
+    final SolrQueryResponse response = add(chain, requestParams, docs);
+
+    @SuppressWarnings("unchecked")
+    final List<SimpleOrderedMap<String>> reported =
+      (List<SimpleOrderedMap<String>>) response.getResponseHeader().get("errors");
+    assertNotNull(reported);
+    assertEquals("number of errors", idsShouldFail.length, reported.size());
+
+    final Set<String> expectedIds = new HashSet<>(Arrays.asList(idsShouldFail));
+    for (SimpleOrderedMap<String> entry : reported) {
+      assertEquals("this method only expects 'add' errors", "ADD", entry.get("type"));
+
+      final String failedId = entry.get("id");
+      assertNotNull("null err id", failedId);
+      assertTrue("unexpected id", expectedIds.contains(failedId));
+    }
+  }
+  
+  /** Convenience overload: adds a single document by wrapping it in a one element list. */
+  protected SolrQueryResponse add(final String chain, SolrParams requestParams, final SolrInputDocument doc) throws IOException {
+    final List<SolrInputDocument> single = Arrays.asList(doc);
+    return add(chain, requestParams, single);
+  }
+  
+  /**
+   * Runs one add command per document through the named update processor chain of the test
+   * core, then finishes the processor.
+   *
+   * @param chain name of the chain; the test fails if the core has no chain with that name
+   * @param requestParams request parameters; empty params are used when null
+   * @param docs the documents to add, in iteration order
+   * @return the response object handed to the processor chain
+   */
+  protected SolrQueryResponse add(final String chain, SolrParams requestParams, final Collection<SolrInputDocument> docs) throws IOException {
+
+    final SolrCore core = h.getCore();
+    final UpdateRequestProcessorChain processorChain = core.getUpdateProcessingChain(chain);
+    assertNotNull("No Chain named: " + chain, processorChain);
+
+    final SolrQueryResponse rsp = new SolrQueryResponse();
+    rsp.add("responseHeader", new SimpleOrderedMap<Object>());
+
+    final SolrParams effectiveParams = (requestParams != null) ? requestParams : new ModifiableSolrParams();
+    final SolrQueryRequest req = new LocalSolrQueryRequest(core, effectiveParams);
+    try {
+      final UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
+      for (SolrInputDocument doc : docs) {
+        final AddUpdateCommand cmd = new AddUpdateCommand(req);
+        cmd.solrDoc = doc;
+        processor.processAdd(cmd);
+      }
+      processor.finish();
+    } finally {
+      // always release the request, even when a command failed
+      req.close();
+    }
+    return rsp;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 59b37c5..edfe1c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -54,6 +54,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.ToleratedUpdateError;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -72,6 +73,7 @@ import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
@@ -726,6 +728,11 @@ public class CloudSolrClient extends SolrClient {
     int status = 0;
     Integer rf = null;
     Integer minRf = null;
+    
+    // TolerantUpdateProcessor
+    List<SimpleOrderedMap<String>> toleratedErrors = null; 
+    int maxToleratedErrors = Integer.MAX_VALUE;
+      
     for(int i=0; i<response.size(); i++) {
       NamedList shardResponse = (NamedList)response.getVal(i);
       NamedList header = (NamedList)shardResponse.get("responseHeader");      
@@ -741,6 +748,24 @@ public class CloudSolrClient extends SolrClient {
           rf = routeRf;
       }
       minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
+
+      List<SimpleOrderedMap<String>> shardTolerantErrors = 
+        (List<SimpleOrderedMap<String>>) header.get("errors");
+      if (null != shardTolerantErrors) {
+        Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+        assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
+        // if we get into some weird state where the nodes disagree about the effective maxErrors,
+        // assume the min value seen to decide if we should fail.
+        maxToleratedErrors = Math.min(maxToleratedErrors,
+                                      ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+        
+        if (null == toleratedErrors) {
+          toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+        }
+        for (SimpleOrderedMap<String> err : shardTolerantErrors) {
+          toleratedErrors.add(err);
+        }
+      }
     }
 
     NamedList cheader = new NamedList();
@@ -750,7 +775,31 @@ public class CloudSolrClient extends SolrClient {
       cheader.add(UpdateRequest.REPFACT, rf);
     if (minRf != null)
       cheader.add(UpdateRequest.MIN_REPFACT, minRf);
-    
+    if (null != toleratedErrors) {
+      cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+      cheader.add("errors", toleratedErrors);
+      if (maxToleratedErrors < toleratedErrors.size()) {
+        // cumulative errors are too high, we need to throw a client exception w/correct metadata
+
+        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
+        // then at least one shard should have thrown a real error before this, so we don't worry
+        // about having a more "singular" exception msg for that situation
+        StringBuilder msgBuf =  new StringBuilder()
+          .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
+          
+        NamedList metadata = new NamedList<String>();
+        for (SimpleOrderedMap<String> err : toleratedErrors) {
+          ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+          metadata.add(te.getMetadataKey(), te.getMetadataValue());
+          
+          msgBuf.append("\n").append(te.getMessage());
+        }
+        
+        SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
+        toThrow.setMetadata(metadata);
+        throw toThrow;
+      }
+    }
     condensed.add("responseHeader", cheader);
     return condensed;
   }
@@ -786,6 +835,22 @@ public class CloudSolrClient extends SolrClient {
       super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
       this.throwables = throwables;
       this.routes = routes;
+
+      // create a merged copy of the metadata from all wrapped exceptions
+      NamedList<String> metadata = new NamedList<String>();
+      for (int i = 0; i < throwables.size(); i++) {
+        Throwable t = throwables.getVal(i);
+        if (t instanceof SolrException) {
+          SolrException e = (SolrException) t;
+          NamedList<String> eMeta = e.getMetadata();
+          if (null != eMeta) {
+            metadata.addAll(eMeta);
+          }
+        }
+      }
+      if (0 < metadata.size()) {
+        this.setMetadata(metadata);
+      }
     }
 
     public NamedList<Throwable> getThrowables() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
new file mode 100644
index 0000000..fd8b8c7
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+
+/**
+ * Models the basic information related to a single "tolerated" error that occurred during updates.  
+ * This class is only useful when the <code>TolerantUpdateProcessorFactory</code> is used in an update 
+ * processor chain.
+ */
+public final class ToleratedUpdateError {
+    
+  /** Prefix shared by every metadata key produced by this class: the class name followed by "--". */
+  private static final String META_PRE = ToleratedUpdateError.class.getName() + "--";
+  /** Length of {@link #META_PRE}, i.e. the offset at which the command type starts in a key. */
+  private static final int META_PRE_LEN = META_PRE.length();
+
+  /**
+   * Converts a user supplied 'maxErrors' value into the value to enforce.
+   * Given <code>-1 &lt;= maxErrors &lt;= {@link Integer#MAX_VALUE}</code> the input is returned
+   * unchanged, except for <code>-1</code> which yields {@link Integer#MAX_VALUE}.
+   * Input of <code>maxErrors &lt; -1</code> will trip an assertion and otherwise have undefined behavior.
+   * @see #getUserFriendlyMaxErrors
+   */
+  public static int getEffectiveMaxErrors(int maxErrors) {
+    assert -1 <= maxErrors;
+    if (maxErrors == -1) {
+      return Integer.MAX_VALUE;
+    }
+    return maxErrors;
+  }
+  
+  /**
+   * Converts an effective 'maxErrors' value back into the form shown to users.
+   * Given <code>-1 &lt;= maxErrors &lt;= {@link Integer#MAX_VALUE}</code> the input is returned
+   * unchanged, except for {@link Integer#MAX_VALUE} which yields <code>-1</code> for user
+   * convenience.
+   * Input of <code>maxErrors &lt; -1</code> will trip an assertion and otherwise have undefined behavior.
+   * @see #getEffectiveMaxErrors
+   */
+  public static int getUserFriendlyMaxErrors(int maxErrors) {
+    assert -1 <= maxErrors;
+    if (maxErrors == Integer.MAX_VALUE) {
+      return -1;
+    }
+    return maxErrors;
+  }
+  
+  /**
+   * Converts each error to its {@link #getSimpleMap} form, preserving order, producing a list
+   * of maps of simple objects suitable for putting in a SolrQueryResponse header.
+   * @see #getSimpleMap
+   * @see #parseMap
+   */
+  public static List<SimpleOrderedMap<String>> formatForResponseHeader(List<ToleratedUpdateError> errs) {
+    final int count = errs.size();
+    final List<SimpleOrderedMap<String>> formatted = new ArrayList<>(count);
+    for (ToleratedUpdateError err : errs) {
+      final SimpleOrderedMap<String> asMap = err.getSimpleMap();
+      formatted.add(asMap);
+    }
+    return formatted;
+  }
+  
+  /**
+   * Builds a ToleratedUpdateError from the 'type', 'id' and 'message' entries of the map.
+   *
+   * @throws SolrException (SERVER_ERROR) if any of the three entries is missing, or if 'type'
+   *         is not the name of a {@link CmdType}
+   * @see #getSimpleMap
+   */
+  public static ToleratedUpdateError parseMap(SimpleOrderedMap<String> data) {
+    final String typeName = data.get("type");
+    final String id = data.get("id");
+    final String message = data.get("message");
+    final boolean complete = (null != typeName) && (null != id) && (null != message);
+    if (!complete) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Map does not represent a ToleratedUpdateError, must contain 'type', 'id', and 'message'");
+    }
+    final CmdType type;
+    try {
+      type = CmdType.valueOf(typeName);
+    } catch (IllegalArgumentException iae) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid type for ToleratedUpdateError: " + typeName, iae);
+    }
+    return new ToleratedUpdateError(type, id, message);
+  }
+  
+  /**
+   * Parses a metadata key/value pair of the form produced by {@link #getMetadataKey} and
+   * {@link #getMetadataValue}.
+   *
+   * @return the parsed error, or null if the key does not start with this class's prefix or
+   *         has no ':' after the prefix
+   * @throws IllegalArgumentException if the text between the prefix and the first ':' is not
+   *         the name of a {@link CmdType}
+   * @see #getMetadataKey
+   * @see #getMetadataValue
+   */
+  public static ToleratedUpdateError parseMetadataIfToleratedUpdateError(String metadataKey,
+                                                                         String metadataVal) {
+    if (! metadataKey.startsWith(META_PRE)) {
+      return null; // not a key we care about
+    }
+    final int separator = metadataKey.indexOf(':', META_PRE_LEN);
+    if (separator < 0) {
+      return null; // has our prefix, but not our format -- must not be a key we (actually) care about
+    }
+    final String typeName = metadataKey.substring(META_PRE_LEN, separator);
+    final CmdType type = CmdType.valueOf(typeName);
+    final String id = metadataKey.substring(separator + 1);
+    return new ToleratedUpdateError(type, id, metadataVal);
+  }
+
+  /** The kind of update command that failed. */
+  private final CmdType type;
+  /** Identifier associated with the failed command. */
+  private final String id;
+  /** The error message. */
+  private final String message;
+
+  /**
+   * Creates an error record; all three arguments are asserted to be non-null.
+   */
+  public ToleratedUpdateError(CmdType type, String id, String message) {
+    assert null != type;
+    assert null != id;
+    assert null != message;
+
+    this.type = type;
+    this.id = id;
+    this.message = message;
+  }
+
+  /** @return the type of command this error was recorded for */
+  public CmdType getType() {
+    return this.type;
+  }
+
+  /** @return the id this error was recorded for */
+  public String getId() {
+    return this.id;
+  }
+
+  /** @return the message of this error */
+  public String getMessage() {
+    return this.message;
+  }
+  
+  /**
+   * Builds the key for this error, suitable for use in {@link SolrException#setMetadata}:
+   * the class prefix, followed by the type, a ':' and the id.
+   *
+   * @see #parseMetadataIfToleratedUpdateError
+   */
+  public String getMetadataKey() {
+    final StringBuilder key = new StringBuilder(META_PRE);
+    key.append(type).append(":").append(id);
+    return key.toString();
+  }
+
+  /**
+   * Returns the message of this error, suitable for use as a value in
+   * {@link SolrException#setMetadata}.
+   *
+   * @see #parseMetadataIfToleratedUpdateError
+   */
+  public String getMetadataValue() {
+    return this.message.toString();
+  }
+  
+  /**
+   * Returns this error as a map with the entries 'type', 'id' and 'message' (in that order),
+   * suitable for putting in a SolrQueryResponse header.
+   * @see #formatForResponseHeader
+   * @see #parseMap
+   */
+  public SimpleOrderedMap<String> getSimpleMap() {
+    final SimpleOrderedMap<String> asMap = new SimpleOrderedMap<>();
+    final String typeName = type.toString();
+    asMap.add("type", typeName);
+    asMap.add("id", id);
+    asMap.add("message", message);
+    return asMap;
+  }
+  
+  /** Returns the metadata key and metadata value of this error joined by "=&gt;". */
+  @Override
+  public String toString() {
+    return getMetadataKey() + "=>" + getMetadataValue();
+  }
+  
+  /** Combines the hash codes of the class, type, id and message; consistent with {@link #equals}. */
+  @Override
+  public int hashCode() {
+    int h = this.getClass().hashCode();
+    h = h * 31 + type.hashCode();
+    h = h * 31 + id.hashCode();
+    h = h * 31 + message.hashCode();
+    return h;
+  }
+  
+  /** Two errors are equal when their type, id and message are all equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ToleratedUpdateError)) {
+      return false; // also covers null
+    }
+    final ToleratedUpdateError that = (ToleratedUpdateError) o;
+    return that.type.equals(this.type)
+      && that.id.equals(this.id)
+      && that.message.equals(this.message);
+  }
+  
+  /**
+   * The kinds of update command an error can be recorded for; the constant name is the
+   * 'type' segment of the SolrException metadata (String) keys.
+   */
+  public enum CmdType {
+    ADD,
+    DELID,
+    DELQ;
+
+    // if we add support for things like commit, parsing/toString/hashCode logic
+    // needs to be smarter to account for 'id' being null ... "usesId" should be a prop of enum instances
+  }
+}
+
+  

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java b/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java
new file mode 100644
index 0000000..aba07ae
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
+import java.util.EnumSet;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.ToleratedUpdateError.CmdType;
+import org.apache.solr.common.util.SimpleOrderedMap;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+/** Basic testing of the serialization/encapsulation code in ToleratedUpdateError */
+public class TestToleratedUpdateError extends LuceneTestCase {
+  
+  private final static CmdType[] ALL_TYPES = EnumSet.allOf(CmdType.class).toArray(new CmdType[0]);
+  
+  public void testBasics() {
+    
+    assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
+                (new ToleratedUpdateError(CmdType.ADD, "doc2", "some error")));
+    assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
+                (new ToleratedUpdateError(CmdType.ADD, "doc1", "some errorxx")));
+    assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
+                (new ToleratedUpdateError(CmdType.DELID, "doc1", "some error")));
+  }
+
+  public void testParseMetadataErrorHandling() {
+
+    assertNull(ToleratedUpdateError.parseMetadataIfToleratedUpdateError("some other key", "some value"));
+
+    // see if someone tries to trick us into having an NPE...
+    ToleratedUpdateError valid = new ToleratedUpdateError(CmdType.ADD, "doc2", "some error");
+    String badKey = valid.getMetadataKey().replace(":", "X");
+    assertNull(ToleratedUpdateError.parseMetadataIfToleratedUpdateError(badKey, valid.getMetadataValue()));
+  }
+  
+  public void testParseMapErrorChecking() {
+    SimpleOrderedMap<String> bogus = new SimpleOrderedMap<String>();
+    try {
+      ToleratedUpdateError.parseMap(bogus);
+      fail("map should not be parsable");
+    } catch (SolrException e) {
+      assertTrue(e.toString(), e.getMessage().contains("Map does not represent a ToleratedUpdateError") );
+    }
+
+    bogus.add("id", "some id");
+    bogus.add("message", "some message");
+    try {
+      ToleratedUpdateError.parseMap(bogus);
+      fail("map should still not be parsable");
+    } catch (SolrException e) {
+      assertTrue(e.toString(), e.getMessage().contains("Map does not represent a ToleratedUpdateError") );
+    }
+    
+    bogus.add("type", "not a real type");
+    try {
+      ToleratedUpdateError.parseMap(bogus);
+      fail("invalid type should not be parsable");
+    } catch (SolrException e) {
+      assertTrue(e.toString(), e.getMessage().contains("Invalid type")); 
+    }
+  }
+  
+  public void testParseMap() {
+    // trivial: a hand-built map must parse and survive both the map and metadata round trips
+    SimpleOrderedMap<String> valid = new SimpleOrderedMap<String>();
+    valid.add("type", CmdType.ADD.toString());
+    valid.add("id", "some id");
+    valid.add("message", "some message");
+    
+    ToleratedUpdateError in = ToleratedUpdateError.parseMap(valid);
+    compare(in, MAP_COPPIER);
+    compare(in, METADATA_COPPIER);
+
+    // randomized: same checks using a random command type and random unicode id/message
+    int numIters = atLeast(5000);
+    for (int i = 0; i < numIters; i++) {
+      valid = new SimpleOrderedMap<String>();
+      valid.add("type", ALL_TYPES[TestUtil.nextInt(random(), 0, ALL_TYPES.length-1)].toString());
+      valid.add("id", TestUtil.randomUnicodeString(random()));
+      valid.add("message", TestUtil.randomUnicodeString(random()));
+      
+      in = ToleratedUpdateError.parseMap(valid);
+      compare(in, MAP_COPPIER);
+      compare(in, METADATA_COPPIER);
+    }
+  }
+  
+  public void checkRoundTripComparisons(Coppier coppier) {
+
+    // some simple basics
+    for (ToleratedUpdateError in : new ToleratedUpdateError[] {
+        new ToleratedUpdateError(CmdType.ADD, "doc1", "some error"),
+        new ToleratedUpdateError(CmdType.DELID, "doc1", "some diff error"),
+        new ToleratedUpdateError(CmdType.DELQ, "-field:yakko other_field:wakko", "some other error"),
+      }) {
+      
+      compare(in, coppier);
+    }
+
+    // randomized testing of non trivial keys/values
+    int numIters = atLeast(5000);
+    for (int i = 0; i < numIters; i++) {
+      ToleratedUpdateError in = new ToleratedUpdateError
+        (ALL_TYPES[TestUtil.nextInt(random(), 0, ALL_TYPES.length-1)],
+         TestUtil.randomUnicodeString(random()),
+         TestUtil.randomUnicodeString(random()));
+      compare(in, coppier);
+    }
+  }
+
+  public void testMetadataRoundTripComparisons() { // must be no-arg, like testMapRoundTripComparisons
+    checkRoundTripComparisons(METADATA_COPPIER); // copies via getMetadataKey()/getMetadataValue()
+  }
+  
+  public void testMapRoundTripComparisons() {
+    checkRoundTripComparisons(MAP_COPPIER);
+  }
+
+  /** trivial sanity check */
+  public void testMaxErrorsValueConversion() {
+    
+    assertEquals(-1, ToleratedUpdateError.getUserFriendlyMaxErrors(-1));
+    assertEquals(-1, ToleratedUpdateError.getUserFriendlyMaxErrors(Integer.MAX_VALUE));
+    
+    assertEquals(Integer.MAX_VALUE, ToleratedUpdateError.getEffectiveMaxErrors(Integer.MAX_VALUE));
+    assertEquals(Integer.MAX_VALUE, ToleratedUpdateError.getEffectiveMaxErrors(-1));
+
+    for (int val : new int[] {0, 1, 10, 42, 600000 }) {
+      assertEquals(val, ToleratedUpdateError.getEffectiveMaxErrors(val));
+      assertEquals(val, ToleratedUpdateError.getUserFriendlyMaxErrors(val));
+    }
+    
+  }
+
+  public void compare(ToleratedUpdateError in, Coppier coppier) {
+      ToleratedUpdateError out = coppier.copy(in);
+      assertNotNull(out);
+      compare(in, out);
+  }
+  
+  public void compare(ToleratedUpdateError in, ToleratedUpdateError out) {
+    assertEquals(out.getType(), in.getType());
+    assertEquals(out.getId(), in.getId());
+    assertEquals(out.getMessage(), in.getMessage());
+    
+    assertEquals(out.hashCode(), in.hashCode());
+    assertEquals(out.toString(), in.toString());
+    
+    assertEquals(in.getMetadataKey(), out.getMetadataKey());
+    assertEquals(in.getMetadataValue(), out.getMetadataValue());
+    
+    assertEquals(out, in);
+    assertEquals(in, out);
+  }
+  
+  private static abstract class Coppier {
+    public abstract ToleratedUpdateError copy(ToleratedUpdateError in);
+  }
+
+  private static final Coppier MAP_COPPIER = new Coppier() {
+    public ToleratedUpdateError copy(ToleratedUpdateError in) {
+      return ToleratedUpdateError.parseMap(in.getSimpleMap());
+    }
+  };
+  
+  private static final Coppier METADATA_COPPIER = new Coppier() {
+    public ToleratedUpdateError copy(ToleratedUpdateError in) {
+      return ToleratedUpdateError.parseMetadataIfToleratedUpdateError
+        (in.getMetadataKey(), in.getMetadataValue());
+    }
+  };
+  
+}
+
+
+
+


[3/3] lucene-solr:master: SOLR-445: new ToleranteUpdateProcessorFactory to support skipping update commands that cause failures when sending multiple updates in a single request.

Posted by ho...@apache.org.
SOLR-445: new TolerantUpdateProcessorFactory to support skipping update commands that cause failures when sending multiple updates in a single request.

SOLR-8890: New static method in DistributedUpdateProcessorFactory to allow UpdateProcessorFactories to indicate request params that should be forwarded when DUP distributes updates.

This commit is a squashed merge from the jira/SOLR-445 branch (as of b08c284b26b1779d03693a45e219db89839461d0)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f051f56b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f051f56b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f051f56b

Branch: refs/heads/master
Commit: f051f56be96b12f1f3e35978ca4c840ae06a801f
Parents: 2babaf8
Author: Chris Hostetter <ho...@apache.org>
Authored: Fri Mar 25 11:00:29 2016 -0700
Committer: Chris Hostetter <ho...@apache.org>
Committed: Fri Mar 25 11:02:55 2016 -0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |    7 +
 .../apache/solr/response/SolrQueryResponse.java |    4 +
 .../apache/solr/update/SolrCmdDistributor.java  |   31 +-
 .../processor/DistributedUpdateProcessor.java   |  135 ++-
 .../DistributedUpdateProcessorFactory.java      |   20 +
 .../processor/TolerantUpdateProcessor.java      |  415 +++++++
 .../TolerantUpdateProcessorFactory.java         |  142 +++
 ...lrconfig-distrib-update-processor-chains.xml |   85 ++
 .../conf/solrconfig-tolerant-update-minimal.xml |   40 +
 .../conf/solrconfig-update-processor-chains.xml |   17 +
 .../cloud/TestTolerantUpdateProcessorCloud.java | 1065 ++++++++++++++++++
 .../TestTolerantUpdateProcessorRandomCloud.java |  389 +++++++
 .../org/apache/solr/core/TestBadConfig.java     |    5 +
 .../processor/TolerantUpdateProcessorTest.java  |  447 ++++++++
 .../solr/client/solrj/impl/CloudSolrClient.java |   67 +-
 .../solr/common/ToleratedUpdateError.java       |  197 ++++
 .../solr/common/TestToleratedUpdateError.java   |  193 ++++
 17 files changed, 3214 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 52e7c31..11e5b54 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -52,6 +52,13 @@ New Features
 * SOLR-8859: AbstractSpatialFieldType will now convert Shapes to/from Strings
   using the SpatialContext.  (ryan)
 
+* SOLR-445: new TolerantUpdateProcessorFactory to support skipping update commands that cause
+  failures when sending multiple updates in a single request.
+  (Erick Erickson, Tomás Fernández Löbbe, Anshum Gupta, hossman)
+
+* SOLR-8890: New static method in DistributedUpdateProcessorFactory to allow UpdateProcessorFactories
+  to indicate request params that should be forwarded when DUP distributes updates. (hossman)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java b/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java
index f1ccd08..378dee8 100644
--- a/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java
+++ b/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java
@@ -161,6 +161,10 @@ public class SolrQueryResponse {
 
   /**
    * Causes an error to be returned instead of the results.
+   * 
+   * In general, new calls to this method should not be added. In most cases
+   * you should simply throw an exception and let it bubble out to 
+   * RequestHandlerBase, which will set the exception thrown.
    */
   public void setException(Exception e) {
     err=e;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index d9b6478..a99952d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
@@ -138,7 +139,7 @@ public class SolrCmdDistributor {
             
             SolrException.log(SolrCmdDistributor.log, "forwarding update to "
                 + oldNodeUrl + " failed - retrying ... retries: "
-                + err.req.retries + " " + err.req.cmdString + " params:"
+                + err.req.retries + " " + err.req.cmd.toString() + " params:"
                 + err.req.uReq.getParams() + " rsp:" + rspCode, err.e);
             try {
               Thread.sleep(retryPause);
@@ -187,7 +188,7 @@ public class SolrCmdDistributor {
         uReq.deleteByQuery(cmd.query);
       }
       
-      submit(new Req(cmd.toString(), node, uReq, sync), false);
+      submit(new Req(cmd, node, uReq, sync), false);
     }
   }
   
@@ -200,14 +201,13 @@ public class SolrCmdDistributor {
   }
   
   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {  
-    String cmdStr = cmd.toString();
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
       if (cmd.isLastDocInBatch)
         uReq.lastDocInBatch();
       uReq.setParams(params);
       uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
+      submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
     }
     
   }
@@ -226,7 +226,7 @@ public class SolrCmdDistributor {
     log.debug("Distrib commit to: {} params: {}", nodes, params);
     
     for (Node node : nodes) {
-      submit(new Req(cmd.toString(), node, uReq, false), true);
+      submit(new Req(cmd, node, uReq, false), true);
     }
     
   }
@@ -272,7 +272,7 @@ public class SolrCmdDistributor {
     if (log.isDebugEnabled()) {
       log.debug("sending update to "
           + req.node.getUrl() + " retry:"
-          + req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
+          + req.retries + " " + req.cmd + " params:" + req.uReq.getParams());
     }
     
     if (isCommit) {
@@ -314,26 +314,26 @@ public class SolrCmdDistributor {
     public UpdateRequest uReq;
     public int retries;
     public boolean synchronous;
-    public String cmdString;
+    public UpdateCommand cmd;
     public RequestReplicationTracker rfTracker;
     public int pollQueueTime;
 
-    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
-      this(cmdString, node, uReq, synchronous, null, 0);
+    public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) {
+      this(cmd, node, uReq, synchronous, null, 0);
     }
     
-    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
+    public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
       this.node = node;
       this.uReq = uReq;
       this.synchronous = synchronous;
-      this.cmdString = cmdString;
+      this.cmd = cmd;
       this.rfTracker = rfTracker;
       this.pollQueueTime = pollQueueTime;
     }
     
     public String toString() {
       StringBuilder sb = new StringBuilder();
-      sb.append("SolrCmdDistributor$Req: cmd=").append(String.valueOf(cmdString));
+      sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString());
       sb.append("; node=").append(String.valueOf(node));
       return sb.toString();
     }
@@ -382,6 +382,13 @@ public class SolrCmdDistributor {
   public static class Error {
     public Exception e;
     public int statusCode = -1;
+
+    /**
+     * NOTE: This is the request that happened to be executing when this error was <b>triggered</b>, 
+     * but because of how {@link StreamingSolrClients} uses {@link ConcurrentUpdateSolrClient} it might not 
+     * actually be the request that <b>caused</b> the error -- multiple requests are merged &amp; processed as 
+     * a sequential batch.
+     */
     public Req req;
     
     public String toString() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 365de6c..5f4e4f1 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -92,7 +92,8 @@ import org.slf4j.LoggerFactory;
 // NOT mt-safe... create a new processor for each add thread
 // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
 public class DistributedUpdateProcessor extends UpdateRequestProcessor {
-  
+
+  final static String PARAM_WHITELIST_CTX_KEY = DistributedUpdateProcessor.class + "PARAM_WHITELIST_CTX_KEY";
   public static final String DISTRIB_FROM_SHARD = "distrib.from.shard";
   public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection";
   public static final String DISTRIB_FROM_PARENT = "distrib.from.parent";
@@ -292,6 +293,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     this.req = req;
     
+    // this should always be used - see filterParams
+    DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
+      (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS);
+    
     CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
     
     this.zkEnabled  = coreDesc.getCoreContainer().isZooKeeperAware();
@@ -790,38 +795,30 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     cmdDistrib.finish();    
     List<Error> errors = cmdDistrib.getErrors();
     // TODO - we may need to tell about more than one error...
-    
-    // if it's a forward, any fail is a problem - 
-    // otherwise we assume things are fine if we got it locally
-    // until we start allowing min replication param
-    if (errors.size() > 0) {
-      // if one node is a RetryNode, this was a forward request
-      if (errors.get(0).req.node instanceof RetryNode) {
-        rsp.setException(errors.get(0).e);
-      } else {
-        if (log.isWarnEnabled()) {
-          for (Error error : errors) {
-            log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
-          }
-        }
-      }
-      // else
-      // for now we don't error - we assume if it was added locally, we
-      // succeeded 
-    }
-   
-    
-    // if it is not a forward request, for each fail, try to tell them to
-    // recover - the doc was already added locally, so it should have been
-    // legit
 
+    List<Error> errorsForClient = new ArrayList<>(errors.size());
+    
     for (final SolrCmdDistributor.Error error : errors) {
       
       if (error.req.node instanceof RetryNode) {
-        // we don't try to force a leader to recover
-        // when we cannot forward to it
+        // if it's a forward, any fail is a problem - 
+        // otherwise we assume things are fine if we got it locally
+        // until we start allowing min replication param
+        errorsForClient.add(error);
         continue;
       }
+
+      // else...
+      
+      // for now we don't error - we assume if it was added locally, we
+      // succeeded 
+      if (log.isWarnEnabled()) {
+        log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
+      }
+      
+      // Since it is not a forward request, for each fail, try to tell them to
+      // recover - the doc was already added locally, so it should have been
+      // legit
        
       DistribPhase phase =
           DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));       
@@ -841,8 +838,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         // let's just fail this request and let the client retry? or just call processAdd again?
         log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
             " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
-        rsp.setException(error.e);
-        break;
+        errorsForClient.add(error);
+        continue;
       }
 
       String collection = null;
@@ -927,7 +924,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
       rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
       replicationTracker = null;
-    }    
+    }
+
+    
+    if (0 < errorsForClient.size()) {
+      throw new DistributedUpdatesAsyncException(errorsForClient);
+    }
   }
 
  
@@ -1210,10 +1212,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
+  /** @see DistributedUpdateProcessorFactory#addParamToDistributedRequestWhitelist */
   protected ModifiableSolrParams filterParams(SolrParams params) {
     ModifiableSolrParams fparams = new ModifiableSolrParams();
-    passParam(params, fparams, UpdateParams.UPDATE_CHAIN);
-    passParam(params, fparams, TEST_DISTRIB_SKIP_SERVERS);
+    
+    Set<String> whitelist = (Set<String>) this.req.getContext().get(PARAM_WHITELIST_CTX_KEY);
+    assert null != whitelist : "whitelist can't be null, constructor adds to it";
+
+    for (String p : whitelist) {
+      passParam(params, fparams, p);
+    }
     return fparams;
   }
 
@@ -1698,4 +1706,67 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // definitely not the leader.  Otherwise assume we are.
     return DistribPhase.FROMLEADER != phase;
   }
+
+  public static final class DistributedUpdatesAsyncException extends SolrException {
+    public final List<Error> errors;
+    public DistributedUpdatesAsyncException(List<Error> errors) {
+      super(buildCode(errors), buildMsg(errors), null);
+      this.errors = errors;
+
+      // create a merged copy of the metadata from all wrapped exceptions
+      NamedList<String> metadata = new NamedList<String>();
+      for (Error error : errors) {
+        if (error.e instanceof SolrException) {
+          SolrException e = (SolrException) error.e;
+          NamedList<String> eMeta = e.getMetadata();
+          if (null != eMeta) {
+            metadata.addAll(eMeta);
+          }
+        }
+      }
+      if (0 < metadata.size()) {
+        this.setMetadata(metadata);
+      }
+    }
+
+    /** Helper method for constructor */
+    private static final int buildCode(List<Error> errors) {
+      assert null != errors;
+      assert 0 < errors.size();
+
+      int minCode = Integer.MAX_VALUE;
+      int maxCode = Integer.MIN_VALUE;
+      for (Error error : errors) {
+        log.trace("REMOTE ERROR: {}", error);
+        minCode = Math.min(error.statusCode, minCode);
+        maxCode = Math.max(error.statusCode, maxCode);
+      }
+      if (minCode == maxCode) {
+        // all codes are consistent, use that...
+        return minCode;
+      } else if (400 <= minCode && maxCode < 500) {
+        // all codes are 4xx, use 400
+        return ErrorCode.BAD_REQUEST.code;
+      } 
+      // ...otherwise use sensible default
+      return ErrorCode.SERVER_ERROR.code;
+    }
+    
+    /** Helper method for constructor */
+    private static final String buildMsg(List<Error> errors) {
+      assert null != errors;
+      assert 0 < errors.size();
+      
+      if (1 == errors.size()) {
+        return "Async exception during distributed update: " + errors.get(0).e.getMessage();
+      } else {
+        StringBuilder buf = new StringBuilder(errors.size() + " Async exceptions during distributed update: ");
+        for (Error error : errors) {
+          buf.append("\n");
+          buf.append(error.e.getMessage());
+        }
+        return buf.toString();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 4b64dec..6446b1a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -16,6 +16,9 @@
  */
 package org.apache.solr.update.processor;
 
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
@@ -28,6 +31,23 @@ import org.apache.solr.response.SolrQueryResponse;
 public class DistributedUpdateProcessorFactory 
   extends UpdateRequestProcessorFactory 
   implements DistributingUpdateProcessorFactory {
+
+  /**
+   * By default, the {@link DistributedUpdateProcessor} is extremely conservative in the list of request 
+   * params that will be copied/included when updates are forwarded to other nodes.  This method may be 
+   * used by any {@link UpdateRequestProcessorFactory#getInstance} call to annotate a 
+   * SolrQueryRequest with the names of parameters that should also be forwarded.
+   */
+  public static void addParamToDistributedRequestWhitelist(final SolrQueryRequest req, final String... paramNames) {
+    Set<String> whitelist = (Set<String>) req.getContext().get(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY);
+    if (null == whitelist) {
+      whitelist = new TreeSet<String>();
+      req.getContext().put(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, whitelist);
+    }
+    for (String p : paramNames) {
+      whitelist.add(p);
+    }
+  }
   
   @Override
   public void init(NamedList args) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
new file mode 100644
index 0000000..f9437f5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRefBuilder;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.ToleratedUpdateError.CmdType;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.MergeIndexesCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor.Error;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** 
+ * <p> 
+ * Suppresses errors for individual add/delete commands within a single request.
+ * Instead of failing on the first error, at most <code>maxErrors</code> errors (or unlimited 
+ * if <code>-1==maxErrors</code>) are logged and recorded, and the batch continues. 
+ * The client will receive a <code>status==200</code> response, which includes a list of errors 
+ * that were tolerated.
+ * </p>
+ * <p>
+ * If more than <code>maxErrors</code> occur, the first exception recorded will be re-thrown, 
+ * Solr will respond with <code>status==5xx</code> or <code>status==4xx</code> 
+ * (depending on the underlying exceptions) and it won't finish processing any more updates in the request. 
+ * (ie: subsequent update commands in the request will not be processed even if they are valid).
+ * </p>
+ * 
+ * <p>
+ * NOTE: In cloud based collections, this processor expects to <b>NOT</b> be used on {@link DistribPhase#FROMLEADER} 
+ * requests (because any successes that occur locally on the leader are considered successes even if there is some 
+ * subsequent error on a replica).  {@link TolerantUpdateProcessorFactory} will short circuit it away in those 
+ * requests.
+ * </p>
+ * 
+ * @see TolerantUpdateProcessorFactory
+ */
+public class TolerantUpdateProcessor extends UpdateRequestProcessor {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  /**
+   * String to be used as document key for errors when a real uniqueKey can't be determined
+   */
+  private static final String UNKNOWN_ID = "(unknown)"; 
+
+  /**
+   * Response Header
+   */
+  private final NamedList<Object> header;
+  
+  /**
+   * Number of errors this UpdateRequestProcessor will tolerate. If more than this occur, 
+   * the original exception will be thrown, interrupting the processing of the document
+   * batch
+   */
+  private final int maxErrors;
+
+  /** The uniqueKey field */
+  private SchemaField uniqueKeyField;
+
+  private final SolrQueryRequest req;
+  private ZkController zkController;
+
+  /**
+   * Known errors that occurred in this batch, in order encountered (may not be the same as the 
+   * order the commands were originally executed in due to the async distributed updates).
+   */
+  private final List<ToleratedUpdateError> knownErrors = new ArrayList<ToleratedUpdateError>();
+
+  // Kludge: Because deleteByQuery updates are forwarded to every leader, we can get identical
+  // errors reported by every leader for the same underlying problem.
+  //
+  // It would be nice if we could cleanly handle the unlikely (but possible) situation of an
+  // update stream that includes multiple identical DBQs, with identical failures, and 
+  // to report each one once, for example...
+  //   add: id#1
+  //   dbq: foo:bar
+  //   add: id#2
+  //   add: id#3
+  //   dbq: foo:bar
+  //
+  // ...but i can't figure out a way to accurately identify & return duplicate 
+  // ToleratedUpdateErrors from duplicate identical underlying requests w/o erroneously returning identical 
+  // ToleratedUpdateErrors for the *same* underlying request but from diff shards.
+  //
+  // So as a kludge, we keep track of them for deduping against identical remote failures
+  //
+  private Set<ToleratedUpdateError> knownDBQErrors = new HashSet<>();
+        
+  private final FirstErrTracker firstErrTracker = new FirstErrTracker();
+  private final DistribPhase distribPhase;
+
+  public TolerantUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next, int maxErrors, DistribPhase distribPhase) {
+    super(next);
+    assert maxErrors >= -1;
+      
+    header = rsp.getResponseHeader();
+    this.maxErrors = ToleratedUpdateError.getEffectiveMaxErrors(maxErrors);
+    this.req = req;
+    this.distribPhase = distribPhase;
+    assert ! DistribPhase.FROMLEADER.equals(distribPhase);
+    
+    this.zkController = this.req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
+    this.uniqueKeyField = this.req.getCore().getLatestSchema().getUniqueKeyField();
+    assert null != uniqueKeyField : "Factory didn't enforce uniqueKey field?";
+  }
+  
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    BytesRef id = null;
+    
+    try {
+      // force AddUpdateCommand to validate+cache the id before proceeding
+      id = cmd.getIndexedId();
+      
+      super.processAdd(cmd);
+
+    } catch (Throwable t) { 
+      firstErrTracker.caught(t);
+      knownErrors.add(new ToleratedUpdateError
+                      (CmdType.ADD,
+                       getPrintableId(id),
+                       t.getMessage()));
+      
+      if (knownErrors.size() > maxErrors) {
+        firstErrTracker.throwFirst();
+      }
+    }
+  }
+
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    
+    try {
+      
+      super.processDelete(cmd);
+      
+    } catch (Throwable t) {
+      firstErrTracker.caught(t);
+      
+      ToleratedUpdateError err = new ToleratedUpdateError(cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ,
+                                                          cmd.isDeleteById() ? cmd.id : cmd.query,
+                                                          t.getMessage());
+      knownErrors.add(err);
+
+      // NOTE: we're not using this to dedup before adding to knownErrors.
+      // if we're lucky enough to get an immediate local failure (ie: we're a leader, or some other processor
+      // failed) then recording the multiple failures is a good thing -- helps us with an accurate fail
+      // fast if we exceed maxErrors
+      if (CmdType.DELQ.equals(err.getType())) {
+        knownDBQErrors.add(err);
+      }
+      
+      if (knownErrors.size() > maxErrors) {
+        firstErrTracker.throwFirst();
+      }
+    }
+  }
+
+  @Override
+  public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+    try {
+      super.processMergeIndexes(cmd);
+    } catch (Throwable t) {
+      // we're not tolerant of errors from this type of command, but we
+      // do need to track it so we can annotate it with any other errors we were already tolerant of
+      firstErrTracker.caught(t);
+      throw t;
+    }
+  }
+
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    try {
+      super.processCommit(cmd);
+    } catch (Throwable t) {
+      // we're not tolerant of errors from this type of command, but we
+      // do need to track it so we can annotate it with any other errors we were already tolerant of
+      firstErrTracker.caught(t);
+      throw t;
+    }
+  }
+
+  @Override
+  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+    try {
+      super.processRollback(cmd);
+    } catch (Throwable t) {
+      // we're not tolerant of errors from this type of command, but we
+      // do need to track it so we can annotate it with any other errors we were already tolerant of
+      firstErrTracker.caught(t);
+      throw t;
+    }
+  }
+
+  @Override
+  public void finish() throws IOException {
+
+    // even if processAdd threw an error, this.finish() is still called and we might have additional
+    // errors from other remote leaders that we need to check for from the finish method of downstream processors
+    // (like DUP)
+    
+    try {
+      super.finish();
+    } catch (DistributedUpdateProcessor.DistributedUpdatesAsyncException duae) {
+      firstErrTracker.caught(duae);
+
+      
+      // adjust our stats based on each of the distributed errors
+      for (Error error : duae.errors) {
+        // we can't trust the req info from the Error, because multiple original requests might have been
+        // lumped together
+        //
+        // instead we trust the metadata that the TolerantUpdateProcessor running on the remote node added
+        // to the exception when it failed.
+        if ( ! (error.e instanceof SolrException) ) {
+          log.error("async update exception is not SolrException, no metadata to process", error.e);
+          continue;
+        }
+        SolrException remoteErr = (SolrException) error.e;
+        NamedList<String> remoteErrMetadata = remoteErr.getMetadata();
+
+        if (null == remoteErrMetadata) {
+          log.warn("remote error has no metadata to aggregate: " + remoteErr.getMessage(), remoteErr);
+          continue;
+        }
+
+        for (int i = 0; i < remoteErrMetadata.size(); i++) {
+          ToleratedUpdateError err =
+            ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
+                                                                     remoteErrMetadata.getVal(i));
+          if (null == err) {
+            // some metadata unrelated to this update processor
+            continue;
+          }
+
+          if (CmdType.DELQ.equals(err.getType())) {
+            if (knownDBQErrors.contains(err)) {
+              // we've already seen this identical error, probably a dup from another shard
+              continue;
+            } else {
+              knownDBQErrors.add(err);
+            }
+          }
+          
+          knownErrors.add(err);
+        }
+      }
+    }
+
+    header.add("errors", ToleratedUpdateError.formatForResponseHeader(knownErrors));
+    // include in response so client knows what effective value was (may have been server side config)
+    header.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxErrors));
+
+    // annotate any error that might be thrown (or was already thrown)
+    firstErrTracker.annotate(knownErrors);
+
+    // decide if we have hit a situation where we know an error needs to be thrown.
+    
+    if ((DistribPhase.TOLEADER.equals(distribPhase) ? 0 : maxErrors) < knownErrors.size()) {
+      // NOTE: even if maxErrors wasn't exceeded, we need to throw an error when we have any errors if we're
+      // a leader that was forwarded to by another node so that the forwarding node knows we encountered some
+      // problems and can aggregate the results
+
+      firstErrTracker.throwFirst();
+    }
+  }
+
+  /**
+   * Returns the output of {@link org.apache.solr.schema.FieldType#
+   * indexedToReadable(BytesRef, CharsRefBuilder)} of the field
+   * type of the uniqueKey on the {@link BytesRef} passed as parameter.
+   * <code>ref</code> should be the indexed representation of the id -- if null
+   * (possibly because it's missing in the update) this method will return {@link #UNKNOWN_ID}
+   */
+  private String getPrintableId(BytesRef ref) {
+    if (ref == null) {
+      return UNKNOWN_ID;
+    }
+    return uniqueKeyField.getType().indexedToReadable(ref, new CharsRefBuilder()).toString();
+  }
+
+  /**
+   * Simple helper class for "tracking" any exceptions encountered.
+   * 
+   * Only remembers the "first" exception encountered, and wraps it in a SolrException if needed, so that 
+   * it can later be annotated with the metadata our users expect and re-thrown.
+   *
+   * NOTE: NOT THREAD SAFE
+   */
+  private static final class FirstErrTracker {
+
+    
+    SolrException first = null;
+    boolean thrown = false;
+    
+    public FirstErrTracker() {
+      /* NOOP */
+    }
+    
+    /** 
+     * Call this method immediately anytime an exception is caught from a down stream method -- 
+     * even if you are going to ignore it (for now).  If you plan to rethrow the Exception, use 
+     * {@link #throwFirst} instead.
+     */
+    public void caught(Throwable t) {
+      assert null != t;
+      if (null == first) {
+        if (t instanceof SolrException) {
+          first = (SolrException)t;
+        } else {
+          first = new SolrException(ErrorCode.SERVER_ERROR, "Tolerantly Caught Exception: " + t.getMessage(), t);
+        }
+      }
+    }
+    
+    /** 
+     * Call this method in place of any situation where you would normally (re)throw an exception 
+     * (one already passed to the {@link #caught} method) because maxErrors
+     * was exceeded.
+     *
+     * This method will keep a record that this update processor has already thrown the exception, and do 
+     * nothing on future calls, so subsequent update processor methods can update the metadata but won't 
+     * inadvertently re-throw this (or any other) cascading exception by mistake.
+     */
+    public void throwFirst() throws SolrException {
+      assert null != first : "caught was never called?";
+      if (! thrown) {
+        thrown = true;
+        throw first;
+      }
+    }
+    
+    /** 
+     * Annotates the first exception (which may already have been thrown, or be thrown in the future) with 
+     * the metadata from this update processor.  For use in {@link TolerantUpdateProcessor#finish}
+     */
+    public void annotate(List<ToleratedUpdateError> errors) {
+
+      if (null == first) {
+        return; // no exception to annotate
+      }
+      
+      assert null != errors : "how do we have an exception to annotate w/o any errors?";
+      
+      NamedList<String> firstErrMetadata = first.getMetadata();
+      if (null == firstErrMetadata) { // obnoxious
+        firstErrMetadata = new NamedList<String>();
+        first.setMetadata(firstErrMetadata);
+      } else {
+        // any existing metadata representing ToleratedUpdateErrors in this single exception needs to be removed
+        // so we can add *all* of the known ToleratedUpdateErrors (from this and other exceptions)
+        for (int i = 0; i < firstErrMetadata.size(); i++) {
+          if (null != ToleratedUpdateError.parseMetadataIfToleratedUpdateError
+              (firstErrMetadata.getName(i), firstErrMetadata.getVal(i))) {
+               
+            firstErrMetadata.remove(i);
+            // NOTE: post decrementing index so we don't miss anything as we remove items
+            i--;
+          }
+        }
+      }
+
+      for (ToleratedUpdateError te : errors) {
+        firstErrMetadata.add(te.getMetadataKey(), te.getMetadataValue());
+      }
+    }
+    
+    
+    /** The first exception that was thrown (or may be thrown) whose metadata can be annotated. */
+    public SolrException getFirst() {
+      return first;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java
new file mode 100644
index 0000000..8cd3500
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.apache.solr.util.plugin.SolrCoreAware;
+
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * <p> 
+ * Suppresses errors for individual add/delete commands within a single request.
+ * Instead of failing on the first error, at most <code>maxErrors</code> errors (or unlimited 
+ * if <code>-1==maxErrors</code>) are logged and recorded, and the batch continues. 
+ * The client will receive a <code>status==200</code> response, which includes a list of errors 
+ * that were tolerated.
+ * </p>
+ * <p>
+ * If more than <code>maxErrors</code> occur, the first exception recorded will be re-thrown, 
+ * Solr will respond with <code>status==5xx</code> or <code>status==4xx</code> 
+ * (depending on the underlying exceptions) and it won't finish processing any more updates in the request. 
+ * (ie: subsequent update commands in the request will not be processed even if they are valid).
+ * </p>
+ * 
+ * <p>
+ * <code>maxErrors</code> is an int value that can be specified in the configuration and/or overridden 
+ * per request. If unset, it will default to {@link Integer#MAX_VALUE}.  Specifying an explicit value 
+ * of <code>-1</code> is supported as shorthand for {@link Integer#MAX_VALUE}, all other negative 
+ * integer values are not supported.
+ * </p>
+ * <p>
+ * An example configuration would be:
+ * </p>
+ * <pre class="prettyprint">
+ * &lt;updateRequestProcessorChain name="tolerant-chain"&gt;
+ *   &lt;processor class="solr.TolerantUpdateProcessorFactory"&gt;
+ *     &lt;int name="maxErrors"&gt;10&lt;/int&gt;
+ *   &lt;/processor&gt;
+ *   &lt;processor class="solr.RunUpdateProcessorFactory" /&gt;
+ * &lt;/updateRequestProcessorChain&gt;
+ * 
+ * </pre>
+ * 
+ * <p>
+ * The <code>maxErrors</code> parameter in the above chain could be overwritten per request, for example:
+ * </p>
+ * <pre class="prettyprint">
+ * curl http://localhost:8983/update?update.chain=tolerant-chain&amp;maxErrors=100 -H "Content-Type: text/xml" -d @myfile.xml
+ * </pre>
+ * 
+ * <p>
+ * <b>NOTE:</b> The behavior of this UpdateProcessorFactory in conjunction with indexing operations 
+ * while a Shard Split is actively in progress is not well defined (or sufficiently tested).  Users 
+ * of this update processor are encouraged to either disable it, or pause updates, while any shard 
+ * splitting is in progress (see <a href="https://issues.apache.org/jira/browse/SOLR-8881">SOLR-8881</a> 
+ * for more details.)
+ * </p>
+ */
+public class TolerantUpdateProcessorFactory extends UpdateRequestProcessorFactory
+  implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
+  
+  /**
+   * Parameter that defines how many errors the UpdateRequestProcessor will tolerate
+   */
+  private final static String MAX_ERRORS_PARAM = "maxErrors";
+  
+  /**
+   * Default maxErrors value that will be used if the value is not set in configuration
+   * or in the request
+   */
+  private int defaultMaxErrors = Integer.MAX_VALUE;
+
+  private boolean informed = false;
+  
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void init( NamedList args ) {
+
+    Object maxErrorsObj = args.get(MAX_ERRORS_PARAM); 
+    if (maxErrorsObj != null) {
+      try {
+        defaultMaxErrors = Integer.valueOf(maxErrorsObj.toString());
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Unnable to parse maxErrors parameter: " + maxErrorsObj, e);
+      }
+      if (defaultMaxErrors < -1) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Config option '"+MAX_ERRORS_PARAM + "' must either be non-negative, or -1 to indicate 'unlimiited': " + maxErrorsObj.toString());
+      }
+    }
+  }
+  
+  @Override
+  public void inform(SolrCore core) {
+    informed = true;
+    if (null == core.getLatestSchema().getUniqueKeyField()) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, this.getClass().getName() +
+                              " requires a schema that includes a uniqueKey field.");
+    }
+  }
+
+  @Override
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+
+    assert informed : "inform(SolrCore) never called?";
+    
+    // short circuit if we're a replica processing commands from our leader
+    DistribPhase distribPhase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+    if (DistribPhase.FROMLEADER.equals(distribPhase)) {
+      return next;
+    }
+
+    DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist(req, MAX_ERRORS_PARAM);
+    int maxErrors = req.getParams().getInt(MAX_ERRORS_PARAM, defaultMaxErrors);
+    if (maxErrors < -1) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "'"+MAX_ERRORS_PARAM + "' must either be non-negative, or -1 to indicate 'unlimiited': " + maxErrors);
+    }
+
+    // NOTE: even if 0==maxErrors, we still inject processor into chain so response has expected header info
+    return new TolerantUpdateProcessor(req, rsp, next, maxErrors, distribPhase);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml
new file mode 100644
index 0000000..97ed18b
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <jmx />
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
+  
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
+    <!-- used to keep RAM reqs down for HdfsDirectoryFactory -->
+    <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
+    <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
+    <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
+    <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
+    <str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
+  </directoryFactory>
+  
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+  <!-- an update processor chain that explicitly excludes distrib to test
+       clean errors when people attempt atomic updates w/o it
+  -->
+  <updateRequestProcessorChain name="nodistrib" >
+   <processor class="solr.NoOpDistributingUpdateProcessorFactory" />
+   <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler">
+  </requestHandler>
+
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+    <lst name="defaults">
+      <str name="omitHeader">true</str>
+    </lst>
+  </requestHandler>
+  
+  <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" /> 
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler"  />
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog>
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+  </updateHandler>
+  
+  <updateRequestProcessorChain name="tolerant-chain-max-errors-10">
+    <processor class="solr.TolerantUpdateProcessorFactory">
+      <!-- explicitly testing that parsing still works if a valid int is specified as a string -->
+      <str name="maxErrors">10</str>
+    </processor>
+    <processor class="solr.DistributedUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+  <updateRequestProcessorChain name="tolerant-chain-max-errors-not-set">
+    <processor class="solr.TolerantUpdateProcessorFactory"/>
+    <processor class="solr.DistributedUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+  
+  <updateRequestProcessorChain name="not-tolerant">
+  <processor class="solr.DistributedUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+</config>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml
new file mode 100644
index 0000000..d3b90db
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+  
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+  </requestHandler>
+  
+  <updateRequestProcessorChain name="tolerant-chain">
+    <processor class="solr.TolerantUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+</config>
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml
index d0b5472..bb1cbcf 100644
--- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml
@@ -26,6 +26,7 @@
   <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
   <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
   <requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
+  <requestHandler name="/update" class="solr.UpdateRequestHandler"  />
   <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
   <schemaFactory class="ClassicIndexSchemaFactory"/>
 
@@ -628,4 +629,20 @@
     <processor class="solr.RunUpdateProcessorFactory" />
   </updateRequestProcessorChain>
 
+  <updateRequestProcessorChain name="tolerant-chain-max-errors-10">
+    <processor class="solr.TolerantUpdateProcessorFactory">
+      <int name="maxErrors">10</int>
+    </processor>
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+  <updateRequestProcessorChain name="tolerant-chain-max-errors-not-set">
+    <processor class="solr.TolerantUpdateProcessorFactory"/>
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+  
+  <updateRequestProcessorChain name="not-tolerant">
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
 </config>


[2/3] lucene-solr:master: SOLR-445: new ToleranteUpdateProcessorFactory to support skipping update commands that cause failures when sending multiple updates in a single request.

Posted by ho...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java
new file mode 100644
index 0000000..054c074
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.ToleratedUpdateError.CmdType;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.util.RevertDefaultThreadHandlerRule;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test of TolerantUpdateProcessor using a MiniSolrCloud.  Updates (that include failures which 
+ * should be tolerated) are explicitly tested against various initial nodes to confirm correct 
+ * behavior regardless of routing.
+ *
+ * <p>
+ * <b>NOTE:</b> This test sets up a static instance of MiniSolrCloud with a single collection 
+ * and several clients pointed at specific nodes. These are all re-used across multiple test methods, 
+ * which assume that the state of the cluster is healthy.
+ * </p>
+ *
+ */
+public class TestTolerantUpdateProcessorCloud extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int NUM_SHARDS = 2; // class setup only wires up clients for "shard1" and "shard2"
+  private static final int REPLICATION_FACTOR = 2; // class setup asserts 2 replicas per shard: the leader + one non-leader
+  private static final int NUM_SERVERS = 5; // class setup asserts exactly one node is left hosting no replica
+  
+  private static final String COLLECTION_NAME = "test_col";
+  
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+
+  /** A client for talking directly to the leader of shard1 */
+  private static HttpSolrClient S_ONE_LEADER_CLIENT;
+  
+  /** A client for talking directly to the leader of shard2 */
+  private static HttpSolrClient S_TWO_LEADER_CLIENT;
+
+  /** A client for talking directly to a passive (non-leader) replica of shard1 */
+  private static HttpSolrClient S_ONE_NON_LEADER_CLIENT;
+  
+  /** A client for talking directly to a passive (non-leader) replica of shard2 */
+  private static HttpSolrClient S_TWO_NON_LEADER_CLIENT;
+
+  /** A client for talking directly to a node that has no piece of the collection */
+  private static HttpSolrClient NO_COLLECTION_CLIENT;
+  
+  /** id field doc routing prefix for shard1 (sanity checked against the live cluster during class setup) */
+  private static final String S_ONE_PRE = "abc!";
+  
+  /** id field doc routing prefix for shard2 (sanity checked against the live cluster during class setup) */
+  private static final String S_TWO_PRE = "XYZ!";
+  
+  @BeforeClass
+  private static void createMiniSolrCloudCluster() throws Exception {
+    // One-time setup: start the cluster, create the collection, then build one client per node "role"
+    final String configName = "solrCloudCollectionConfig";
+    final File configDir = new File(TEST_HOME() + File.separator + "collection1" + File.separator + "conf");
+
+    configureCluster(NUM_SERVERS)
+      .addConfig(configName, configDir.toPath())
+      .configure();
+    assertSpinLoopAllJettyAreRunning(cluster);
+    
+    Map<String, String> collectionProperties = new HashMap<>();
+    collectionProperties.put("config", "solrconfig-distrib-update-processor-chains.xml");
+    collectionProperties.put("schema", "schema15.xml"); // string id for doc routing prefix
+
+    assertNotNull(cluster.createCollection(COLLECTION_NAME, NUM_SHARDS, REPLICATION_FACTOR,
+                                           configName, null, null, collectionProperties));
+    
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+    
+    ZkStateReader zkStateReader = CLOUD_CLIENT.getZkStateReader();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION_NAME, zkStateReader, true, true, 330);
+
+
+    // really hackish way to get a URL for specific nodes based on shard/replica hosting
+    // inspired by TestMiniSolrCloudCluster
+    HashMap<String, String> urlMap = new HashMap<>();
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      URL jettyURL = jetty.getBaseUrl();
+      String nodeKey = jettyURL.getHost() + ":" + jettyURL.getPort() + jettyURL.getPath().replace("/","_");
+      urlMap.put(nodeKey, jettyURL.toString()); // key is built to line up with Replica.getNodeName() lookups below
+    }
+    zkStateReader.updateClusterState();
+    ClusterState clusterState = zkStateReader.getClusterState();
+    for (Slice slice : clusterState.getSlices(COLLECTION_NAME)) {
+      String shardName = slice.getName();
+      Replica leader = slice.getLeader();
+      assertNotNull("slice has null leader: " + slice.toString(), leader);
+      assertNotNull("slice leader has null node name: " + slice.toString(), leader.getNodeName());
+      String leaderUrl = urlMap.remove(leader.getNodeName()); // remove() so only unused nodes remain at the end
+      assertNotNull("could not find URL for " + shardName + " leader: " + leader.getNodeName(),
+                    leaderUrl);
+      assertEquals("expected two total replicas for: " + slice.getName(),
+                   2, slice.getReplicas().size());
+      // exactly 2 replicas => exactly one non-leader; the loop below only keeps a single passiveUrl
+      String passiveUrl = null;
+      
+      for (Replica replica : slice.getReplicas()) {
+        if ( ! replica.equals(leader)) {
+          passiveUrl = urlMap.remove(replica.getNodeName());
+          assertNotNull("could not find URL for " + shardName + " replica: " + replica.getNodeName(),
+                        passiveUrl);
+        }
+      }
+      assertNotNull("could not find URL for " + shardName + " replica", passiveUrl);
+
+      if (shardName.equals("shard1")) {
+        S_ONE_LEADER_CLIENT = new HttpSolrClient(leaderUrl + "/" + COLLECTION_NAME + "/");
+        S_ONE_NON_LEADER_CLIENT = new HttpSolrClient(passiveUrl + "/" + COLLECTION_NAME + "/");
+      } else if (shardName.equals("shard2")) {
+        S_TWO_LEADER_CLIENT = new HttpSolrClient(leaderUrl + "/" + COLLECTION_NAME + "/");
+        S_TWO_NON_LEADER_CLIENT = new HttpSolrClient(passiveUrl + "/" + COLLECTION_NAME + "/");
+      } else {
+        fail("unexpected shard: " + shardName);
+      }
+    }
+    assertEquals("Should be exactly one server left (not hosting either shard)", 1, urlMap.size());
+    NO_COLLECTION_CLIENT = new HttpSolrClient(urlMap.values().iterator().next() +
+                                              "/" + COLLECTION_NAME + "/");
+    
+    assertNotNull(S_ONE_LEADER_CLIENT);
+    assertNotNull(S_TWO_LEADER_CLIENT);
+    assertNotNull(S_ONE_NON_LEADER_CLIENT);
+    assertNotNull(S_TWO_NON_LEADER_CLIENT);
+    assertNotNull(NO_COLLECTION_CLIENT);
+
+    // sanity check that our S_ONE_PRE & S_TWO_PRE really do map to shard1 & shard2 with default routing
+    assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_ONE_PRE + random().nextInt()),
+                                         f("expected_shard_s", "shard1"))).getStatus());
+    assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_TWO_PRE + random().nextInt()),
+                                         f("expected_shard_s", "shard2"))).getStatus());
+    assertEquals(0, CLOUD_CLIENT.commit().getStatus());
+    SolrDocumentList docs = CLOUD_CLIENT.query(params("q", "*:*",
+                                                      "fl","id,expected_shard_s,[shard]")).getResults();
+    assertEquals(2, docs.getNumFound());
+    assertEquals(2, docs.size());
+    for (SolrDocument doc : docs) {
+      String expected = COLLECTION_NAME + "_" + doc.getFirstValue("expected_shard_s") + "_replica";
+      String docShard = doc.getFirstValue("[shard]").toString();
+      assertTrue("shard routing prefixes don't seem to be aligned anymore, " +
+                 "did someone change the default routing rules? " +
+                 "and/or the default core name rules? " +
+                 "and/or the numShards used by this test? ... " +
+                 "couldn't find " + expected + " as substring of [shard] == '" + docShard +
+                 "' ... for docId == " + doc.getFirstValue("id"),
+                 docShard.contains(expected));
+    }
+  }
+  
+  @Before
+  private void clearCollection() throws Exception { // runs before every test method
+    assertEquals(0, CLOUD_CLIENT.deleteByQuery("*:*").getStatus()); // remove every doc from the shared collection ...
+    assertEquals(0, CLOUD_CLIENT.commit().getStatus()); // ... and commit, so the deletes are visible to the next test
+  }
+
+  public void testSanity() throws Exception {
+    // Baseline: w/o the tolerant chain, every kind of bad update must fail the whole request, for every client
+    // verify some basic sanity checking of indexing & querying across the collection
+    // w/o using our custom update processor chain
+    
+    assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_ONE_PRE + "1"),
+                                         f("foo_i", 42))).getStatus());
+    assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_TWO_PRE + "2"),
+                                         f("foo_i", 66))).getStatus());
+    assertEquals(0, CLOUD_CLIENT.commit().getStatus());
+
+    for (SolrClient c : Arrays.asList(S_ONE_LEADER_CLIENT, S_TWO_LEADER_CLIENT,
+                                      S_ONE_NON_LEADER_CLIENT, S_TWO_NON_LEADER_CLIENT,
+                                      NO_COLLECTION_CLIENT, CLOUD_CLIENT)) {
+      assertQueryDocIds(c, true, S_ONE_PRE + "1",  S_TWO_PRE + "2");
+      assertQueryDocIds(c, false, "id_not_exists");
+
+      // verify adding 2 broken docs causes a client exception
+      try {
+        UpdateResponse rsp = update(params(),
+                                    doc(f("id", S_ONE_PRE + "X"), f("foo_i", "bogus_val_X")),
+                                    doc(f("id", S_TWO_PRE + "Y"), f("foo_i", "bogus_val_Y"))
+                                    ).process(c);
+        fail("did not get a top level exception when adding 2 broken docs w/o a tolerant chain: " + rsp.toString());
+      } catch (SolrException e) {
+        assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(),
+                     400, e.code());
+      }
+        
+      // verify malformed deleteByQuerys fail
+      try {
+        UpdateResponse rsp = update(params()).deleteByQuery("foo_i:not_a_num").process(c);
+        fail("sanity check for malformed DBQ didn't fail: " + rsp.toString());
+      } catch (SolrException e) {
+        assertEquals("not the expected DBQ failure: " + e.getMessage(), 400, e.code());
+      }
+      
+      // verify optimistic concurrency deletions fail as we expect when docs are / aren't present
+      for (UpdateRequest r : new UpdateRequest[] {
+          update(params("commit", "true")).deleteById(S_ONE_PRE + "1", -1L), // version -1, but the doc exists
+          update(params("commit", "true")).deleteById(S_TWO_PRE + "2", -1L), // version -1, but the doc exists
+          update(params("commit", "true")).deleteById("id_not_exists",  1L)    }) { // version 1, but no such doc
+        try {
+          UpdateResponse rsp = r.process(c);
+          fail("sanity check for oportunistic concurrency delete didn't fail: "
+               + r.toString() + " => " + rsp.toString());
+        } catch (SolrException e) {
+          assertEquals("not the expected oportunistic concurrency failure code: "
+                       + r.toString() + " => " + e.getMessage(), 409, e.code());
+        }
+      }
+    }
+  }
+
+  // Run the shared testVariousDeletes scenarios once per client, so every possible initial node is exercised
+  public void testVariousDeletesViaCloudClient() throws Exception {
+    testVariousDeletes(CLOUD_CLIENT);
+  }
+  public void testVariousDeletesViaShard1LeaderClient() throws Exception {
+    testVariousDeletes(S_ONE_LEADER_CLIENT);
+  }
+  public void testVariousDeletesViaShard2LeaderClient() throws Exception {
+    testVariousDeletes(S_TWO_LEADER_CLIENT);
+  }
+  public void testVariousDeletesViaShard1NonLeaderClient() throws Exception {
+    testVariousDeletes(S_ONE_NON_LEADER_CLIENT);
+  }
+  public void testVariousDeletesViaShard2NonLeaderClient() throws Exception {
+    testVariousDeletes(S_TWO_NON_LEADER_CLIENT);
+  }
+  public void testVariousDeletesViaNoCollectionClient() throws Exception {
+    testVariousDeletes(NO_COLLECTION_CLIENT);
+  }
+  
+  protected static void testVariousDeletes(SolrClient client) throws Exception {
+    assertNotNull("client not initialized", client);
+    // Every request below uses the tolerant chain: failures must come back as tolerated errors with status 0
+    // 2 docs, one on each shard
+    final String docId1 = S_ONE_PRE + "42";
+    final String docId2 = S_TWO_PRE + "666";
+    
+    UpdateResponse rsp = null;
+    
+    // add 1 doc to each shard
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", docId1), f("foo_i", "2001")),
+                 doc(f("id", docId2), f("foo_i", "1976"))).process(client);
+    assertEquals(0, rsp.getStatus());
+
+    // attempt to delete individual doc id(s) that should fail because of optimistic concurrency constraints
+    for (String id : new String[] { docId1, docId2 }) {
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "commit", "true")).deleteById(id, -1L).process(client);
+      assertEquals(0, rsp.getStatus());
+      assertUpdateTolerantErrors("failed oportunistic concurrent delId="+id, rsp,
+                                 delIErr(id));
+    }
+    
+    // multiple failed deletes from the same shard (via optimistic concurrency w/ bogus ids)
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")
+                 ).deleteById(S_ONE_PRE + "X", +1L).deleteById(S_ONE_PRE + "Y", +1L).process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed oportunistic concurrent delete by id for 2 bogus docs", rsp,
+                               delIErr(S_ONE_PRE + "X"), delIErr(S_ONE_PRE + "Y"));
+    assertQueryDocIds(client, true, docId1, docId2);
+    
+    // multiple failed deletes from different shards due to optimistic concurrency constraints
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")).deleteById(docId2, -1L).deleteById(docId1, -1L).process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed oportunistic concurrent delete by id for 2 docs", rsp,
+                               delIErr(docId1), delIErr(docId2));
+    assertQueryDocIds(client, true, docId1, docId2);
+
+    // deleteByQuery using malformed query (fail)
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")).deleteByQuery("bogus_field:foo").process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed malformed delete by query", rsp,
+                               delQErr("bogus_field:foo"));
+    assertQueryDocIds(client, true, docId1, docId2);
+
+    // mix 2 deleteByQuery, one malformed (fail), one that doesn't match anything (ok)
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")
+                 ).deleteByQuery("bogus_field:foo").deleteByQuery("foo_i:23").process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed malformed delete by query (mixed w/ non-matching DBQ)", rsp,
+                               delQErr("bogus_field:foo"));
+    assertQueryDocIds(client, true, docId1, docId2);
+    
+    // mix 2 deleteById using _version_=-1, one for real doc1 (fail), one for bogus id (ok)
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")
+                 ).deleteById(docId1, -1L).deleteById("bogus", -1L).process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed oportunistic concurrent delete by id: exists", rsp,
+                               delIErr(docId1));
+    assertQueryDocIds(client, true, docId1, docId2);
+    
+    // mix 2 deleteById using _version_=1, one for real doc1 (ok, deleted), one for bogus id (fail)
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")
+                 ).deleteById(docId1, +1L).deleteById("bogusId", +1L).process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed oportunistic concurrent delete by id: bogus", rsp,
+                               delIErr("bogusId"));
+    assertQueryDocIds(client, false, docId1);
+    assertQueryDocIds(client, true, docId2);
+    
+    // mix 2 deleteByQuery, one malformed (fail), one that actually removes some docs (ok)
+    assertQueryDocIds(client, true, docId2);
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true")
+                 ).deleteByQuery("bogus_field:foo").deleteByQuery("foo_i:1976").process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("failed malformed delete by query (mixed w/ matching DBQ)", rsp,
+                               delQErr("bogus_field:foo"));
+    assertQueryDocIds(client, false, docId2);
+
+  }
+
+  
+  // Run the shared testVariousAdds scenarios once per client, so every possible initial node is exercised
+  public void testVariousAddsViaCloudClient() throws Exception {
+    testVariousAdds(CLOUD_CLIENT);
+  }
+  public void testVariousAddsViaShard1LeaderClient() throws Exception {
+    testVariousAdds(S_ONE_LEADER_CLIENT);
+  }
+  public void testVariousAddsViaShard2LeaderClient() throws Exception {
+    testVariousAdds(S_TWO_LEADER_CLIENT);
+  }
+  public void testVariousAddsViaShard1NonLeaderClient() throws Exception {
+    testVariousAdds(S_ONE_NON_LEADER_CLIENT);
+  }
+  public void testVariousAddsViaShard2NonLeaderClient() throws Exception {
+    testVariousAdds(S_TWO_NON_LEADER_CLIENT);
+  }
+  public void testVariousAddsViaNoCollectionClient() throws Exception {
+    testVariousAdds(NO_COLLECTION_CLIENT);
+  }
+
+  protected static void testVariousAdds(SolrClient client) throws Exception {
+    assertNotNull("client not initialized", client);
+    
+    UpdateResponse rsp = null;
+
+    // 2 docs that are both on shard1, the first one should fail
+    for (int maxErrors : new int[] { -1, 2, 47, 10 }) {
+      // regardless of which of these maxErrors values we use, behavior should be the same...
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "maxErrors", ""+maxErrors,
+                          "commit", "true"),
+                   doc(f("id", S_ONE_PRE + "42"), f("foo_i", "bogus_value")),
+                   doc(f("id", S_ONE_PRE + "666"), f("foo_i", "1976"))).process(client);
+      
+      assertEquals(0, rsp.getStatus());
+      assertUpdateTolerantAddErrors("single shard, 1st doc should fail", rsp, S_ONE_PRE + "42");
+      assertEquals(0, client.commit().getStatus());
+      assertQueryDocIds(client, false, S_ONE_PRE + "42");
+      assertQueryDocIds(client, true, S_ONE_PRE + "666");
+
+      // ...only diff should be that we get an accurate report of the effective maxErrors
+      assertEquals(maxErrors, rsp.getResponseHeader().get("maxErrors"));
+    }
+    
+    // 2 docs that are both on shard1, the second one should fail
+    
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-not-set",
+                        "commit", "true"),
+                 doc(f("id", S_ONE_PRE + "55"), f("foo_i", "1976")),
+                 doc(f("id", S_ONE_PRE + "77"), f("foo_i", "bogus_val"))).process(client);
+    
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantAddErrors("single shard, 2nd doc should fail", rsp, S_ONE_PRE + "77");
+    assertQueryDocIds(client, false, S_ONE_PRE + "77");
+    assertQueryDocIds(client, true, S_ONE_PRE + "666", S_ONE_PRE + "55");
+    // since maxErrors is unset, we should get an "unlimited" value back
+    assertEquals(-1, rsp.getResponseHeader().get("maxErrors"));
+
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+
+    // 2 docs on 2 diff shards, first of which should fail
+    
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", S_ONE_PRE + "42"), f("foo_i", "bogus_value")),
+                 doc(f("id", S_TWO_PRE + "666"), f("foo_i", "1976"))).process(client);
+    
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantAddErrors("two shards, 1st doc should fail", rsp, S_ONE_PRE + "42");
+    assertEquals(0, client.commit().getStatus());
+    assertQueryDocIds(client, false, S_ONE_PRE + "42");
+    assertQueryDocIds(client, true, S_TWO_PRE + "666");
+    
+    // 2 docs on 2 diff shards, second of which should fail
+
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", S_ONE_PRE + "55"), f("foo_i", "1976")),
+                 doc(f("id", S_TWO_PRE + "77"), f("foo_i", "bogus_val"))).process(client);
+    
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantAddErrors("two shards, 2nd doc should fail", rsp, S_TWO_PRE + "77");
+    assertQueryDocIds(client, false, S_TWO_PRE + "77");
+    assertQueryDocIds(client, true, S_TWO_PRE + "666", S_ONE_PRE + "55");
+
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+
+    // many docs from diff shards, 1 from each shard should fail
+    
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", S_ONE_PRE + "11")),
+                 doc(f("id", S_TWO_PRE + "21")),
+                 doc(f("id", S_ONE_PRE + "12")),
+                 doc(f("id", S_TWO_PRE + "22"), f("foo_i", "bogus_val")),
+                 doc(f("id", S_ONE_PRE + "13")),
+                 doc(f("id", S_TWO_PRE + "23")),
+                 doc(f("id", S_ONE_PRE + "14")),
+                 doc(f("id", S_TWO_PRE + "24")),
+                 doc(f("id", S_ONE_PRE + "15"), f("foo_i", "bogus_val")),
+                 doc(f("id", S_TWO_PRE + "25")),
+                 doc(f("id", S_ONE_PRE + "16")),
+                 doc(f("id", S_TWO_PRE + "26"))).process(client);
+    
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantAddErrors("many docs, 1 from each shard should fail", rsp,
+                                  S_ONE_PRE + "15",
+                                  S_TWO_PRE + "22");
+    assertQueryDocIds(client, false, S_TWO_PRE + "22", S_ONE_PRE + "15");
+    assertQueryDocIds(client, true,
+                      S_ONE_PRE + "11", S_TWO_PRE + "21", S_ONE_PRE + "12",
+                      S_ONE_PRE + "13", S_TWO_PRE + "23", S_ONE_PRE + "14", S_TWO_PRE + "24",
+                      S_TWO_PRE + "25", S_ONE_PRE + "16", S_TWO_PRE + "26");
+
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+
+    // many docs from diff shards, 1 from each shard should fail and 1 w/o uniqueKey
+    
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", S_ONE_PRE + "11")),
+                 doc(f("id", S_TWO_PRE + "21")),
+                 doc(f("id", S_ONE_PRE + "12")),
+                 doc(f("id", S_TWO_PRE + "22"), f("foo_i", "bogus_val")),
+                 doc(f("id", S_ONE_PRE + "13")),
+                 doc(f("id", S_TWO_PRE + "23")),
+                 doc(f("foo_i", "42")),          // no "id"
+                 doc(f("id", S_ONE_PRE + "14")),
+                 doc(f("id", S_TWO_PRE + "24")),
+                 doc(f("id", S_ONE_PRE + "15"), f("foo_i", "bogus_val")),
+                 doc(f("id", S_TWO_PRE + "25")),
+                 doc(f("id", S_ONE_PRE + "16")),
+                 doc(f("id", S_TWO_PRE + "26"))).process(client);
+    
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantAddErrors("many docs, 1 from each shard (+ no id) should fail", rsp,
+                                  S_ONE_PRE + "15",
+                                  "(unknown)",
+                                  S_TWO_PRE + "22");
+    assertQueryDocIds(client, false, S_TWO_PRE + "22", S_ONE_PRE + "15");
+    assertQueryDocIds(client, true,
+                      S_ONE_PRE + "11", S_TWO_PRE + "21", S_ONE_PRE + "12",
+                      S_ONE_PRE + "13", S_TWO_PRE + "23", S_ONE_PRE + "14", S_TWO_PRE + "24",
+                      S_TWO_PRE + "25", S_ONE_PRE + "16", S_TWO_PRE + "26");
+
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+    
+    // many docs from diff shards, more then 10 (total) should fail
+
+    try {
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "commit", "true"),
+                   doc(f("id", S_ONE_PRE + "11")),
+                   doc(f("id", S_TWO_PRE + "21"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_ONE_PRE + "12")),
+                   doc(f("id", S_TWO_PRE + "22"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_ONE_PRE + "13")),
+                   doc(f("id", S_TWO_PRE + "23"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_ONE_PRE + "14"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_TWO_PRE + "24")),
+                   doc(f("id", S_ONE_PRE + "15"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_TWO_PRE + "25")),
+                   doc(f("id", S_ONE_PRE + "16"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_TWO_PRE + "26"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_ONE_PRE + "17")),
+                   doc(f("id", S_TWO_PRE + "27")),
+                   doc(f("id", S_ONE_PRE + "18"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_TWO_PRE + "28"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_ONE_PRE + "19"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_TWO_PRE + "29"), f("foo_i", "bogus_val")),
+                   doc(f("id", S_ONE_PRE + "10")), // may be skipped, more then 10 fails
+                   doc(f("id", S_TWO_PRE + "20"))  // may be skipped, more then 10 fails
+                   ).process(client);
+      
+      fail("did not get a top level exception when more then 10 docs failed: " + rsp.toString());
+    } catch (SolrException e) {
+      // we can't make any reliable assertions about the error message, because
+      // it varies based on how the request was routed -- see SOLR-8830
+      assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(),
+                   // NOTE: we always expect a 400 because we know that's what we would get from these types of errors
+                   // on a single node setup -- a 5xx type error isn't something we should have triggered
+                   400, e.code());
+
+      // verify that the Exceptions metadata can tell us what failed.
+      NamedList<String> remoteErrMetadata = e.getMetadata();
+      assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata);
+      Set<ToleratedUpdateError> actualKnownErrs
+        = new LinkedHashSet<ToleratedUpdateError>(remoteErrMetadata.size());
+      int actualKnownErrsCount = 0;
+      for (int i = 0; i < remoteErrMetadata.size(); i++) {
+        ToleratedUpdateError err =
+          ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
+                                                                   remoteErrMetadata.getVal(i));
+        if (null == err) {
+          // some metadata unrelated to this update processor
+          continue;
+        }
+        actualKnownErrsCount++;
+        actualKnownErrs.add(err);
+      }
+      assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(),
+                   11, actualKnownErrsCount);
+      assertEquals("at least one dup error in metadata: " + remoteErrMetadata.toString(),
+                   actualKnownErrsCount, actualKnownErrs.size());
+      for (ToleratedUpdateError err : actualKnownErrs) {
+        assertEquals("only expected type of error is ADD: " + err,
+                     CmdType.ADD, err.getType());
+        assertTrue("failed err msg didn't match expected value: " + err,
+                   err.getMessage().contains("bogus_val"));
+      }
+    }
+    assertEquals(0, client.commit().getStatus()); // need to force since update didn't finish
+    assertQueryDocIds(client, false
+                      // explicitly failed
+                      , S_TWO_PRE + "21", S_TWO_PRE + "22", S_TWO_PRE + "23", S_ONE_PRE + "14"
+                      , S_ONE_PRE + "15", S_ONE_PRE + "16", S_TWO_PRE + "26", S_ONE_PRE + "18"
+                      , S_TWO_PRE + "28", S_ONE_PRE + "19", S_TWO_PRE + "29"
+                      //
+                      // // we can't assert for sure these docs were skipped
+                      // // depending on shard we hit, they may have been added async before errors were exceeded
+                      // , S_ONE_PRE + "10", S_TWO_PRE + "20" // skipped
+                      );
+    assertQueryDocIds(client, true,
+                      S_ONE_PRE + "11", S_ONE_PRE + "12", S_ONE_PRE + "13", S_TWO_PRE + "24",
+                      S_TWO_PRE + "25", S_ONE_PRE + "17", S_TWO_PRE + "27");
+    
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+    
+    // many docs from diff shards, more then 10 from a single shard (two) should fail
+
+    try {
+      ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(30);
+      docs.add(doc(f("id", S_ONE_PRE + "z")));
+      docs.add(doc(f("id", S_TWO_PRE + "z")));
+      docs.add(doc(f("id", S_ONE_PRE + "y")));
+      docs.add(doc(f("id", S_TWO_PRE + "y")));
+      for (int i = 0; i < 11; i++) {
+        docs.add(doc(f("id", S_ONE_PRE + i)));
+        docs.add(doc(f("id", S_TWO_PRE + i), f("foo_i", "bogus_val")));
+      }
+      docs.add(doc(f("id", S_ONE_PRE + "x"))); // may be skipped, more then 10 fails
+      docs.add(doc(f("id", S_TWO_PRE + "x"))); // may be skipped, more then 10 fails
+          
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "commit", "true"),
+                   docs.toArray(new SolrInputDocument[docs.size()])).process(client);
+      
+      fail("did not get a top level exception when more then 10 docs failed: " + rsp.toString());
+    } catch (SolrException e) {
+      // we can't make any reliable assertions about the error message, because
+      // it varies based on how the request was routed -- see SOLR-8830
+      assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(),
+                   // NOTE: we always expect a 400 because we know that's what we would get from these types of errors
+                   // on a single node setup -- a 5xx type error isn't something we should have triggered
+                   400, e.code());
+
+      // verify that the Exceptions metadata can tell us what failed.
+      NamedList<String> remoteErrMetadata = e.getMetadata();
+      assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata);
+      Set<ToleratedUpdateError> actualKnownErrs
+        = new LinkedHashSet<ToleratedUpdateError>(remoteErrMetadata.size());
+      int actualKnownErrsCount = 0;
+      for (int i = 0; i < remoteErrMetadata.size(); i++) {
+        ToleratedUpdateError err =
+          ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
+                                                                   remoteErrMetadata.getVal(i));
+        if (null == err) {
+          // some metadata unrelated to this update processor
+          continue;
+        }
+        actualKnownErrsCount++;
+        actualKnownErrs.add(err);
+      }
+      assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(),
+                   11, actualKnownErrsCount);
+      assertEquals("at least one dup error in metadata: " + remoteErrMetadata.toString(),
+                   actualKnownErrsCount, actualKnownErrs.size());
+      for (ToleratedUpdateError err : actualKnownErrs) {
+        assertEquals("only expected type of error is ADD: " + err,
+                     CmdType.ADD, err.getType());
+        assertTrue("failed id had unexpected prefix: " + err,
+                   err.getId().startsWith(S_TWO_PRE));
+        assertTrue("failed err msg didn't match expected value: " + err,
+                   err.getMessage().contains("bogus_val"));
+      }
+           
+    }
+    assertEquals(0, client.commit().getStatus()); // need to force since update didn't finish
+    assertQueryDocIds(client, true
+                      , S_ONE_PRE + "z", S_ONE_PRE + "y", S_TWO_PRE + "z", S_TWO_PRE + "y" // first
+                      //
+                      , S_ONE_PRE + "0", S_ONE_PRE + "1", S_ONE_PRE + "2", S_ONE_PRE + "3", S_ONE_PRE + "4"
+                      , S_ONE_PRE + "5", S_ONE_PRE + "6", S_ONE_PRE + "7", S_ONE_PRE + "8", S_ONE_PRE + "9"
+                      );
+    assertQueryDocIds(client, false
+                      // explicitly failed
+                      , S_TWO_PRE + "0", S_TWO_PRE + "1", S_TWO_PRE + "2", S_TWO_PRE + "3", S_TWO_PRE + "4"
+                      , S_TWO_PRE + "5", S_TWO_PRE + "6", S_TWO_PRE + "7", S_TWO_PRE + "8", S_TWO_PRE + "9"
+                      //
+                      // // we can't assert for sure these docs were skipped
+                      // // depending on shard we hit, they may have been added async before errors were exceeded
+                      // , S_ONE_PRE + "x", S_TWO_PRE + "x", // skipped
+                      );
+
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+    
+    // many docs from diff shards, more then 10 don't have any uniqueKey specified
+
+    try {
+      ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(30);
+      docs.add(doc(f("id", S_ONE_PRE + "z")));
+      docs.add(doc(f("id", S_TWO_PRE + "z")));
+      docs.add(doc(f("id", S_ONE_PRE + "y")));
+      docs.add(doc(f("id", S_TWO_PRE + "y")));
+      for (int i = 0; i < 11; i++) {
+        // no "id" field
+        docs.add(doc(f("foo_i", "" + i)));
+      }
+      docs.add(doc(f("id", S_ONE_PRE + "x"))); // may be skipped, more then 10 fails
+      docs.add(doc(f("id", S_TWO_PRE + "x"))); // may be skipped, more then 10 fails
+          
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "commit", "true"),
+                   docs.toArray(new SolrInputDocument[docs.size()])).process(client);
+      
+      fail("did not get a top level exception when more then 10 docs mising uniqueKey: " + rsp.toString());
+    } catch (SolrException e) {
+      // we can't make any reliable assertions about the error message, because
+      // it varies based on how the request was routed -- see SOLR-8830
+      assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(),
+                   // NOTE: we always expect a 400 because we know that's what we would get from these types of errors
+                   // on a single node setup -- a 5xx type error isn't something we should have triggered
+                   400, e.code());
+
+      // verify that the Exceptions metadata can tell us what failed.
+      NamedList<String> remoteErrMetadata = e.getMetadata();
+      assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata);
+      int actualKnownErrsCount = 0;
+      for (int i = 0; i < remoteErrMetadata.size(); i++) {
+        ToleratedUpdateError err =
+          ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
+                                                                   remoteErrMetadata.getVal(i));
+        if (null == err) {
+          // some metadata unrelated to this update processor
+          continue;
+        }
+        actualKnownErrsCount++;
+        assertEquals("only expected type of error is ADD: " + err,
+                     CmdType.ADD, err.getType());
+        assertTrue("failed id didn't match 'unknown': " + err,
+                   err.getId().contains("unknown"));
+      }
+      assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(),
+                   11, actualKnownErrsCount);
+    }
+    assertEquals(0, client.commit().getStatus()); // need to force since update didn't finish
+    assertQueryDocIds(client, true
+                      , S_ONE_PRE + "z", S_ONE_PRE + "y", S_TWO_PRE + "z", S_TWO_PRE + "y" // first
+                      // // we can't assert for sure these docs were skipped or added
+                      // // depending on shard we hit, they may have been added async before errors were exceeded
+                      // , S_ONE_PRE + "x", S_TWO_PRE + "x" // skipped
+                      );
+
+    // clean slate
+    assertEquals(0, client.deleteByQuery("*:*").getStatus());
+    
+    // many docs from diff shards, more then 10 from a single shard (two) should fail but
+    // request should still succeed because of maxErrors=-1 param
+
+    ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(30);
+    ArrayList<ExpectedErr> expectedErrs = new ArrayList<ExpectedErr>(30);
+    docs.add(doc(f("id", S_ONE_PRE + "z")));
+    docs.add(doc(f("id", S_TWO_PRE + "z")));
+    docs.add(doc(f("id", S_ONE_PRE + "y")));
+    docs.add(doc(f("id", S_TWO_PRE + "y")));
+    for (int i = 0; i < 11; i++) {
+      docs.add(doc(f("id", S_ONE_PRE + i)));
+      docs.add(doc(f("id", S_TWO_PRE + i), f("foo_i", "bogus_val")));
+      expectedErrs.add(addErr(S_TWO_PRE + i));
+    }
+    docs.add(doc(f("id", S_ONE_PRE + "x"))); 
+    docs.add(doc(f("id", S_TWO_PRE + "x"))); 
+    
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "maxErrors", "-1",
+                        "commit", "true"),
+                 docs.toArray(new SolrInputDocument[docs.size()])).process(client);
+    assertUpdateTolerantErrors("many docs from shard2 fail, but req should succeed", rsp,
+                               expectedErrs.toArray(new ExpectedErr[expectedErrs.size()]));
+    assertQueryDocIds(client, true
+                      , S_ONE_PRE + "z", S_ONE_PRE + "y", S_TWO_PRE + "z", S_TWO_PRE + "y" // first
+                      , S_ONE_PRE + "x", S_TWO_PRE + "x" // later
+                      );
+
+  }
+
+  // Each wrapper below runs testAddsMixedWithDeletes(SolrClient) with a different client, so the
+  // same scenario is repeated once per client constant referenced here.
+  /** Runs {@link #testAddsMixedWithDeletes} using CLOUD_CLIENT */
+  public void testAddsMixedWithDeletesViaCloudClient() throws Exception {
+    testAddsMixedWithDeletes(CLOUD_CLIENT);
+  }
+  /** Runs {@link #testAddsMixedWithDeletes} using S_ONE_LEADER_CLIENT */
+  public void testAddsMixedWithDeletesViaShard1LeaderClient() throws Exception {
+    testAddsMixedWithDeletes(S_ONE_LEADER_CLIENT);
+  }
+  /** Runs {@link #testAddsMixedWithDeletes} using S_TWO_LEADER_CLIENT */
+  public void testAddsMixedWithDeletesViaShard2LeaderClient() throws Exception {
+    testAddsMixedWithDeletes(S_TWO_LEADER_CLIENT);
+  }
+  /** Runs {@link #testAddsMixedWithDeletes} using S_ONE_NON_LEADER_CLIENT */
+  public void testAddsMixedWithDeletesViaShard1NonLeaderClient() throws Exception {
+    testAddsMixedWithDeletes(S_ONE_NON_LEADER_CLIENT);
+  }
+  /** Runs {@link #testAddsMixedWithDeletes} using S_TWO_NON_LEADER_CLIENT */
+  public void testAddsMixedWithDeletesViaShard2NonLeaderClient() throws Exception {
+    testAddsMixedWithDeletes(S_TWO_NON_LEADER_CLIENT);
+  }
+  /** Runs {@link #testAddsMixedWithDeletes} using NO_COLLECTION_CLIENT */
+  public void testAddsMixedWithDeletesViaNoCollectionClient() throws Exception {
+    testAddsMixedWithDeletes(NO_COLLECTION_CLIENT);
+  }
+  
+  /**
+   * Sends requests mixing adds, delete-by-id and delete-by-query commands (some of which are
+   * expected to fail) through the <code>tolerant-chain-max-errors-10</code> update chain using the
+   * specified client, and asserts which errors are reported and which docs end up in the index.
+   *
+   * <p>
+   * Steps, in order: index two valid docs; send one failing add plus one failing delete-by-id;
+   * send 4 failing commands with <code>maxErrors</code> values that must tolerate them; send the same
+   * 4 failing commands with <code>maxErrors=3</code> and expect a top level {@link SolrException};
+   * finally mix failures with one valid delete-by-query and check that it was applied.
+   * </p>
+   *
+   * @param client the client every update request is sent through; must not be null
+   */
+  protected static void testAddsMixedWithDeletes(SolrClient client) throws Exception {
+    assertNotNull("client not initialized", client);
+
+    // 3 doc ids, exactly one on shard1
+    final String docId1  = S_ONE_PRE + "42";
+    final String docId21 = S_TWO_PRE + "42";
+    final String docId22 = S_TWO_PRE + "666";
+    
+    UpdateResponse rsp = null;
+    
+    // add 2 docs, one to each shard
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", docId1), f("foo_i", "2001")),
+                 doc(f("id", docId21), f("foo_i", "1976"))).process(client);
+    assertEquals(0, rsp.getStatus());
+
+    // add failure on shard2, delete failure on shard1
+    // (docId1 was just indexed, and the delete with version -1 is expected to be rejected
+    // with a "version conflict")
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", docId22), f("foo_i", "not_a_num")))
+      .deleteById(docId1, -1L)
+      .process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("shard2 add fail, shard1 delI fail", rsp,
+                               delIErr(docId1, "version conflict"),
+                               addErr(docId22,"not_a_num"));
+    
+    // attempt a request containing 4 errors of various types (add, delI, delQ)
+    for (String maxErrors : new String[] {"4", "-1", "100"}) {
+      // for all of these maxErrors values, the overall request should still succeed
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "maxErrors", maxErrors,
+                          "commit", "true"),
+                   doc(f("id", docId22), f("foo_i", "bogus_val")))
+        .deleteById(docId1, -1L)
+        .deleteByQuery("malformed:[")
+        .deleteById(docId21, -1L)
+        .process(client);
+      
+      assertEquals(0, rsp.getStatus());
+      assertUpdateTolerantErrors("failed variety of updates", rsp,
+                                 delIErr(docId1, "version conflict"),
+                                 delQErr("malformed:[", "SyntaxError"),
+                                 delIErr(docId21,"version conflict"),
+                                 addErr(docId22,"bogus_val"));
+    }
+    
+    // attempt a request containing 4 errors of various types (add, delI, delQ) .. 1 too many
+    // for the maxErrors=3 specified in the request
+    try {
+      rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                          "maxErrors", "3",
+                          "commit", "true"),
+                   doc(f("id", docId22), f("foo_i", "bogus_val")))
+        .deleteById(docId1, -1L)
+        .deleteByQuery("malformed:[")
+        .deleteById(docId21, -1L)
+        .process(client);
+      fail("did not get a top level exception when more than 3 updates failed: " + rsp.toString());
+    } catch (SolrException e) {
+      // we can't make any reliable assertions about the error message, because
+      // it varies based on how the request was routed -- see SOLR-8830
+      
+      // likewise, we can't make a firm(er) assertion about the response code...
+      assertTrue("not the type of error we were expecting ("+e.code()+"): " + e.toString(),
+                 // should be one of these 2, depending on the order in which the async errors
+                 // were hit -- a 5xx type error isn't something we should have triggered
+                 400 == e.code() || 409 == e.code());
+
+      // verify that the Exceptions metadata can tell us what failed.
+      NamedList<String> remoteErrMetadata = e.getMetadata();
+      assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata);
+      // the Set is used to detect duplicates: it is compared against the raw count below
+      Set<ToleratedUpdateError> actualKnownErrs
+        = new LinkedHashSet<ToleratedUpdateError>(remoteErrMetadata.size());
+      int actualKnownErrsCount = 0;
+      for (int i = 0; i < remoteErrMetadata.size(); i++) {
+        ToleratedUpdateError err =
+          ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
+                                                                   remoteErrMetadata.getVal(i));
+        if (null == err) {
+          // some metadata unrelated to this update processor
+          continue;
+        }
+        actualKnownErrsCount++;
+        actualKnownErrs.add(err);
+      }
+      assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(),
+                   4, actualKnownErrsCount);
+      assertEquals("at least one dup error in metadata: " + remoteErrMetadata.toString(),
+                   actualKnownErrsCount, actualKnownErrs.size());
+    }
+
+    // sanity check our 2 existing docs are still here
+    assertQueryDocIds(client, true, docId1, docId21);
+    assertQueryDocIds(client, false, docId22);
+
+    // tolerate some failures along with a DELQ that should succeed
+    // (the valid DELQ range covers docId21's foo_i of 1976, but not docId1's foo_i of 2001)
+    rsp = update(params("update.chain", "tolerant-chain-max-errors-10",
+                        "commit", "true"),
+                 doc(f("id", docId22), f("foo_i", "not_a_num")))
+      .deleteById(docId1, -1L)
+      .deleteByQuery("zot_i:[42 to gibberish...")
+      .deleteByQuery("foo_i:[50 TO 2000}")
+      .process(client);
+    assertEquals(0, rsp.getStatus());
+    assertUpdateTolerantErrors("mix fails with one valid DELQ", rsp,
+                               delIErr(docId1, "version conflict"),
+                               delQErr("zot_i:[42 to gibberish..."),
+                               addErr(docId22,"not_a_num"));
+    // one of our previous docs should have been deleted now
+    assertQueryDocIds(client, true, docId1);
+    assertQueryDocIds(client, false, docId21, docId22);
+                      
+  }
+
+  /**
+   * HACK: Polls every Jetty instance in the specified MiniSolrCloudCluster until all of them report
+   * that they are running, sleeping briefly between polls, and fails if they are not all running
+   * after a fixed number of polls.
+   * 
+   * (work around for SOLR-8862.  Maybe something like this should be promoted into MiniSolrCloudCluster's 
+   * start() method? or SolrCloudTestCase's configureCluster?)
+   *
+   * @param cluster the cluster whose Jetty instances are checked
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  public static void assertSpinLoopAllJettyAreRunning(MiniSolrCloudCluster cluster) throws InterruptedException {
+    // NOTE: ideally we could use an ExecutorService that tried to open Sockets (with a long timeout)
+    // to each of the jetty instances in parallel w/o any sleeping -- but since they pick their ports
+    // dynamically and don't report them until/unless the server is up, that won't necessarily do us
+    // any good.
+    final int maxPolls = 6;
+    final int expectedRunning = cluster.getJettySolrRunners().size();
+    int observedRunning = 0;
+    for (int poll = 1; poll <= maxPolls; poll++) {
+      observedRunning = 0;
+      for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+        observedRunning += runner.isRunning() ? 1 : 0;
+      }
+      if (expectedRunning == observedRunning) {
+        return;
+      }
+      if (poll < maxPolls) {
+        // the more nodes we're waiting on, the longer we should try to sleep (within reason);
+        // no point in sleeping after the final poll, we're giving up anyway
+        Thread.sleep(Math.min((expectedRunning - observedRunning) * 100, 1000));
+      }
+    }
+    assertEquals("giving up waiting for all jetty instances to be running",
+                 expectedRunning, observedRunning);
+  }
+  
+  /** Asserts that the UpdateResponse contains the specified expectedErrs and no others */
+  public static void assertUpdateTolerantErrors(String assertionMsgPrefix,
+                                                UpdateResponse response,
+                                                ExpectedErr... expectedErrs) {
+    @SuppressWarnings("unchecked")
+    List<SimpleOrderedMap<String>> errors = (List<SimpleOrderedMap<String>>)
+      response.getResponseHeader().get("errors");
+    
+    assertNotNull(assertionMsgPrefix + ": Null errors: " + response.toString(), errors);
+    assertEquals(assertionMsgPrefix + ": Num error ids: " + errors.toString(),
+                 expectedErrs.length, errors.size());
+
+    for (SimpleOrderedMap<String> err : errors) {
+      String assertErrPre = assertionMsgPrefix + ": " + err.toString();
+
+      String id = err.get("id");
+      assertNotNull(assertErrPre + " ... null id", id);
+      String type = err.get("type");
+      assertNotNull(assertErrPre + " ... null type", type);
+      String message = err.get("message");
+      assertNotNull(assertErrPre + " ... null message", message);
+
+      // inefficient scan, but good nough for the size of sets we're dealing with
+      boolean found = false;
+      for (ExpectedErr expected : expectedErrs) {
+        if (expected.type.equals(type) && expected.id.equals(id)
+            && (null == expected.msgSubStr || message.contains(expected.msgSubStr))) {
+          found = true;
+          break;
+        }
+      }
+      assertTrue(assertErrPre + " ... unexpected err in: " + response.toString(), found);
+
+    }
+  }
+  
+  /**
+   * Convenience method for when the only type of errors you expect are 'add' errors: each id is
+   * wrapped via {@link #addErr(String)} and passed on to {@link #assertUpdateTolerantErrors}
+   */
+  public static void assertUpdateTolerantAddErrors(String assertionMsgPrefix,
+                                                   UpdateResponse response,
+                                                   String... errorIdsExpected) {
+    final ExpectedErr[] addErrs = new ExpectedErr[errorIdsExpected.length];
+    int next = 0;
+    for (String failedId : errorIdsExpected) {
+      addErrs[next++] = addErr(failedId);
+    }
+    assertUpdateTolerantErrors(assertionMsgPrefix, response, addErrs);
+  }
+
+  /** 
+   * Asserts that the specified document ids do/do-not exist in the index, using both the specified client, 
+   * and the CLOUD_CLIENT.
+   *
+   * <p>
+   * Each id is looked up with its own <code>{!term f=id}</code> query and must match exactly one doc 
+   * (or zero docs when <code>shouldExist</code> is false).
+   * </p>
+   *
+   * @param client the client to query with; if it is not the CLOUD_CLIENT the same checks are then 
+   *        repeated using the CLOUD_CLIENT
+   * @param shouldExist true if every id must be found, false if no id may be found
+   * @param ids the document ids to check
+   */
+  public static void assertQueryDocIds(SolrClient client, boolean shouldExist, String... ids) throws Exception {
+    for (String id : ids) {
+      assertEquals(client.toString() + " should " + (shouldExist ? "" : "not ") + "find id: " + id,
+                   (shouldExist ? 1 : 0),
+                   // query via the client we were given (not CLOUD_CLIENT), otherwise the 
+                   // specified client would never be exercised and the recursion below would 
+                   // just repeat an identical check
+                   client.query(params("q", "{!term f=id}" + id)).getResults().getNumFound());
+    }
+    if (! CLOUD_CLIENT.equals(client) ) {
+      assertQueryDocIds(CLOUD_CLIENT, shouldExist, ids);
+    }
+  }
+  
+  /**
+   * Builds a new UpdateRequest that uses a modifiable copy of the specified params and contains the 
+   * specified docs (if any), so callers can chain further commands before processing it.
+   */
+  public static UpdateRequest update(SolrParams params, SolrInputDocument... docs) {
+    final UpdateRequest req = new UpdateRequest();
+    final ModifiableSolrParams paramsCopy = new ModifiableSolrParams(params);
+    req.setParams(paramsCopy);
+    final List<SolrInputDocument> docList = Arrays.asList(docs);
+    req.add(docList);
+    return req;
+  }
+  
+  /** Builds a new SolrInputDocument containing each of the specified fields, keyed by field name */
+  public static SolrInputDocument doc(SolrInputField... fields) {
+    final SolrInputDocument result = new SolrInputDocument();
+    for (int i = 0; i < fields.length; i++) {
+      final SolrInputField field = fields[i];
+      result.put(field.getName(), field);
+    }
+    return result;
+  }
+  
+  /**
+   * Builds a new SolrInputField with the specified name, whose value is set from the specified 
+   * values array (the second setValue argument is always 1.0F -- presumably a neutral boost).
+   */
+  public static SolrInputField f(String fieldName, Object... values) {
+    final SolrInputField field = new SolrInputField(fieldName);
+    field.setValue(values, 1.0F);
+    return field;
+  }
+
+  /**
+   * Simple immutable helper struct describing one error a test expects to be reported: 
+   * the command type, the id it relates to, and an optional substring of the error message.
+   */
+  public static final class ExpectedErr {
+    final String type;
+    final String id;
+    final String msgSubStr; // ignored if null
+
+    public ExpectedErr(String type, String id, String msgSubStr) {
+      this.type = type;
+      this.id = id;
+      this.msgSubStr = msgSubStr;
+    }
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("type=<").append(type).append(">,");
+      sb.append("id=<").append(id).append(">,");
+      sb.append("msgSubStr=<").append(msgSubStr).append(">");
+      return sb.toString();
+    }
+  }
+  public static ExpectedErr addErr(String id, String msgSubStr) {
+    return new ExpectedErr("ADD", id, msgSubStr);
+  }
+  public static ExpectedErr delIErr(String id, String msgSubStr) {
+    return new ExpectedErr("DELID", id, msgSubStr);
+  }
+  public static ExpectedErr delQErr(String id, String msgSubStr) {
+    return new ExpectedErr("DELQ", id, msgSubStr);
+  }  
+  public static ExpectedErr addErr(String id) {
+    return addErr(id, null);
+  }
+  public static ExpectedErr delIErr(String id) {
+    return delIErr(id, null);
+  }
+  public static ExpectedErr delQErr(String id) {
+    return delQErr(id, null);
+  }  
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java
new file mode 100644
index 0000000..b3f0423
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.assertUpdateTolerantErrors;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.addErr;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delIErr;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delQErr;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.f;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.update;
+import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.ExpectedErr;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_PARAM;
+import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_NEXT;
+import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_START;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.ToleratedUpdateError.CmdType;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.util.RevertDefaultThreadHandlerRule;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test of TolerantUpdateProcessor using a randomized MiniSolrCloud.
+ * Reuses some utility methods in {@link TestTolerantUpdateProcessorCloud}
+ *
+ * <p>
+ * <b>NOTE:</b> This test sets up a static instance of MiniSolrCloud with a single collection 
+ * and several clients pointed at specific nodes. These are all re-used across multiple test methods, 
+ * and assumes that the state of the cluster is healthy between tests.
+ * </p>
+ *
+ */
+public class TestTolerantUpdateProcessorRandomCloud extends SolrCloudTestCase {
+
+  /** Logger for this class */
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Name of the single collection created for (and shared by) the tests in this class */
+  private static final String COLLECTION_NAME = "test_col";
+  
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+  /** one HttpSolrClient for each server, each using a URL that includes the collection name */
+  private static List<SolrClient> NODE_CLIENTS;
+
+  /**
+   * Starts a cluster with a randomized number of shards and replicas (plus one extra server), 
+   * creates the test collection, and initializes the static CLOUD_CLIENT and NODE_CLIENTS 
+   * before waiting for recoveries to finish.
+   */
+  @BeforeClass
+  private static void createMiniSolrCloudCluster() throws Exception {
+    
+    final String confName = "solrCloudCollectionConfig";
+    final String confPath = TEST_HOME() + File.separator + "collection1" + File.separator + "conf";
+    final File confDir = new File(confPath);
+
+    // NOTE: shards must be picked before the replication factor, so the order in which 
+    // values are consumed from random() stays the same
+    final int shardCount = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3);
+    final int replicationFactor = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3);
+    // one more server than there are replicas, so at least one server won't host any replica
+    final int serverCount = 1 + (shardCount * replicationFactor);
+
+    log.info("Configuring cluster: servers={}, shards={}, repfactor={}", serverCount, shardCount, replicationFactor);
+    configureCluster(serverCount)
+      .addConfig(confName, confDir.toPath())
+      .configure();
+
+    TestTolerantUpdateProcessorCloud.assertSpinLoopAllJettyAreRunning(cluster);
+    
+    final Map<String, String> collectionProps = new HashMap<>();
+    collectionProps.put("config", "solrconfig-distrib-update-processor-chains.xml");
+    collectionProps.put("schema", "schema15.xml"); // string id 
+
+    assertNotNull(cluster.createCollection(COLLECTION_NAME, shardCount, replicationFactor,
+                                           confName, null, null, collectionProps));
+    
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+
+    // one client per jetty, each pointed directly at the collection on that node
+    NODE_CLIENTS = new ArrayList<SolrClient>(serverCount);
+    for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+      final String collectionUrl = runner.getBaseUrl().toString() + "/" + COLLECTION_NAME + "/";
+      NODE_CLIENTS.add(new HttpSolrClient(collectionUrl));
+    }
+    assertEquals(serverCount, NODE_CLIENTS.size());
+    
+    final ZkStateReader stateReader = CLOUD_CLIENT.getZkStateReader();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION_NAME, stateReader, true, true, 330);
+    
+  }
+  
+  @Before
+  private void deleteAllDocs() throws Exception {
+    assertEquals(0, update(params("commit","true")).deleteByQuery("*:*").process(CLOUD_CLIENT).getStatus());
+    assertEquals("index should be empty", 0L, countDocs(CLOUD_CLIENT));
+  }
+  
+  public void testRandomUpdates() throws Exception {
+    final int maxDocId = atLeast(10000);
+    final BitSet expectedDocIds = new BitSet(maxDocId+1);
+    
+    final int numIters = atLeast(50);
+    for (int i = 0; i < numIters; i++) {
+
+      log.info("BEGIN ITER #{}", i);
+      
+      final UpdateRequest req = update(params("maxErrors","-1",
+                                              "update.chain", "tolerant-chain-max-errors-10"));
+      final int numCmds = TestUtil.nextInt(random(), 1, 20);
+      final List<ExpectedErr> expectedErrors = new ArrayList<ExpectedErr>(numCmds);
+      int expectedErrorsCount = 0;
+      // it's ambigious/confusing which order mixed DELQ + ADD  (or ADD and DELI for the same ID)
+      // in the same request wll be processed by various clients, so we keep things simple
+      // and ensure that no single doc Id is affected by more then one command in the same request
+      final BitSet docsAffectedThisRequest = new BitSet(maxDocId+1);
+      for (int cmdIter = 0; cmdIter < numCmds; cmdIter++) {
+        if ((maxDocId / 2) < docsAffectedThisRequest.cardinality()) {
+          // we're already mucking with more then half the docs in the index
+          break;
+        }
+
+        final boolean causeError = random().nextBoolean();
+        if (causeError) {
+          expectedErrorsCount++;
+        }
+        
+        if (random().nextBoolean()) {
+          // add a doc
+          String id = null;
+          SolrInputDocument doc = null;
+          if (causeError && (0 == TestUtil.nextInt(random(), 0, 21))) {
+            doc = doc(f("foo_s","no unique key"));
+            expectedErrors.add(addErr("(unknown)"));
+          } else {
+            final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
+            docsAffectedThisRequest.set(id_i);
+            id = "id_"+id_i;
+            if (causeError) {
+              expectedErrors.add(addErr(id));
+            } else {
+              expectedDocIds.set(id_i);
+            }
+            final String val = causeError ? "bogus_val" : (""+TestUtil.nextInt(random(), 42, 666));
+            doc = doc(f("id",id),
+                      f("id_i", id_i),
+                      f("foo_i", val));
+          }
+          req.add(doc);
+          log.info("ADD: {} = {}", id, doc);
+        } else {
+          // delete something
+          if (random().nextBoolean()) {
+            // delete by id
+            final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
+            final String id = "id_"+id_i;
+            final boolean docExists = expectedDocIds.get(id_i);
+            docsAffectedThisRequest.set(id_i);
+            long versionConstraint = docExists ? 1 : -1;
+            if (causeError) {
+              versionConstraint = -1 * versionConstraint;
+              expectedErrors.add(delIErr(id));
+            } else {
+              // if doc exists it will legitimately be deleted
+              expectedDocIds.clear(id_i);
+            }
+            req.deleteById(id, versionConstraint);
+            log.info("DEL: {} = {}", id, causeError ? "ERR" : "OK" );
+          } else {
+            // delete by query
+            final String q;
+            if (causeError) {
+              // even though our DBQ is gibberish that's going to fail, record a docId as affected
+              // so that we don't generate the same random DBQ and get redundent errors
+              // (problematic because of how DUP forwarded DBQs have to have their errors deduped by TUP)
+              final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
+              docsAffectedThisRequest.set(id_i);
+              q = "foo_i:["+id_i+" TO ....giberish";
+              expectedErrors.add(delQErr(q));
+            } else {
+              // ensure our DBQ is only over a range of docs not already affected
+              // by any other cmds in this request
+              final int rangeAxis = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
+              final int loBound = docsAffectedThisRequest.previousSetBit(rangeAxis);
+              final int hiBound = docsAffectedThisRequest.nextSetBit(rangeAxis);
+              final int lo = TestUtil.nextInt(random(), loBound+1, rangeAxis);
+              final int hi = TestUtil.nextInt(random(), rangeAxis,
+                                              // bound might be negative if no set bits above axis
+                                              (hiBound < 0) ? maxDocId : hiBound-1);
+
+              if (lo != hi) {
+                assert lo < hi : "lo="+lo+" hi="+hi;
+                // NOTE: clear & set are exclusive of hi, so we use "}" in range query accordingly
+                q = "id_i:[" + lo + " TO " + hi + "}";
+                expectedDocIds.clear(lo, hi);
+                docsAffectedThisRequest.set(lo, hi);
+              } else {
+                // edge case: special case DBQ of one doc
+                assert (lo == rangeAxis && hi == rangeAxis) : "lo="+lo+" axis="+rangeAxis+" hi="+hi;
+                q = "id_i:[" + lo + " TO " + lo + "]"; // have to be inclusive of both ends
+                expectedDocIds.clear(lo);
+                docsAffectedThisRequest.set(lo);
+              }
+            }
+            req.deleteByQuery(q);
+            log.info("DEL: {}", q);
+          }
+        }
+      }
+      assertEquals("expected error count sanity check: " + req.toString(),
+                   expectedErrorsCount, expectedErrors.size());
+        
+      final SolrClient client = random().nextBoolean() ? CLOUD_CLIENT
+        : NODE_CLIENTS.get(TestUtil.nextInt(random(), 0, NODE_CLIENTS.size()-1));
+      
+      final UpdateResponse rsp = req.process(client);
+      assertUpdateTolerantErrors(client.toString() + " => " + expectedErrors.toString(), rsp,
+                                 expectedErrors.toArray(new ExpectedErr[expectedErrors.size()]));
+               
+      log.info("END ITER #{}, expecting #docs: {}", i, expectedDocIds.cardinality());
+
+      assertEquals("post update commit failed?", 0, CLOUD_CLIENT.commit().getStatus());
+      
+      for (int j = 0; j < 5; j++) {
+        if (expectedDocIds.cardinality() == countDocs(CLOUD_CLIENT)) {
+          break;
+        }
+        log.info("sleeping to give searchers a chance to re-open #" + j);
+        Thread.sleep(200);
+      }
+
+      // check the index contents against our expectations
+      final BitSet actualDocIds = allDocs(CLOUD_CLIENT, maxDocId);
+      if ( expectedDocIds.cardinality() != actualDocIds.cardinality() ) {
+        log.error("cardinality missmatch: expected {} BUT actual {}",
+                  expectedDocIds.cardinality(),
+                  actualDocIds.cardinality());
+      }
+      final BitSet x = (BitSet) actualDocIds.clone();
+      x.xor(expectedDocIds);
+      for (int b = x.nextSetBit(0); 0 <= b; b = x.nextSetBit(b+1)) {
+        final boolean expectedBit = expectedDocIds.get(b);
+        final boolean actualBit = actualDocIds.get(b);
+        log.error("bit #"+b+" mismatch: expected {} BUT actual {}", expectedBit, actualBit);
+      }
+      assertEquals(x.cardinality() + " mismatched bits",
+                   expectedDocIds.cardinality(), actualDocIds.cardinality());
+    }
+  }
+
+  /** sanity check that randomUnsetBit works as expected 
+   * @see #randomUnsetBit
+   */
+  public void testSanityRandomUnsetBit() {
+    final int max = atLeast(100);
+    BitSet bits = new BitSet(max+1);
+    for (int i = 0; i <= max; i++) {
+      assertFalse("how is bitset already full? iter="+i+" card="+bits.cardinality()+"/max="+max,
+                  bits.cardinality() == max+1);
+      final int nextBit = randomUnsetBit(random(), bits, max);
+      assertTrue("nextBit shouldn't be negative yet: " + nextBit,
+                 0 <= nextBit);
+      assertTrue("nextBit can't exceed max: " + nextBit,
+                 nextBit <= max);
+      assertFalse("expect unset: " + nextBit, bits.get(nextBit));
+      bits.set(nextBit);
+    }
+    
+    assertEquals("why isn't bitset full?", max+1, bits.cardinality());
+
+    final int firstClearBit = bits.nextClearBit(0);
+    assertTrue("why is there a clear bit? = " + firstClearBit,
+               max < firstClearBit);
+    assertEquals("why is a bit set above max?",
+                 -1, bits.nextSetBit(max+1));
+    
+    assertEquals("wrong nextBit at end of all iters", -1,
+                 randomUnsetBit(random(), bits, max));
+    assertEquals("wrong nextBit at redundent end of all iters", -1,
+                 randomUnsetBit(random(), bits, max));
+  }
+  
+  /**
+   * Builds a document from the given fields by delegating to
+   * {@link TestTolerantUpdateProcessorCloud#doc}.
+   */
+  public static SolrInputDocument doc(SolrInputField... fields) {
+    // a static import can't be used here: SolrTestCaseJ4 declares a method with the same name
+    final SolrInputDocument built = TestTolerantUpdateProcessorCloud.doc(fields);
+    return built;
+  }
+
+  /**
+   * Picks a bit index in <code>[0, max]</code> that is currently clear in the given BitSet.
+   * <p>
+   * NOTE: the selection is not uniform. A random candidate is chosen first; if it is already
+   * set, the nearest clear bit (below or above, within range) is returned instead, so clear
+   * bits next to runs of set bits are favored. On a tie the higher bit wins.
+   * </p>
+   *
+   * @param r source of randomness
+   * @param bits the bits to inspect (never modified by this method)
+   * @param max the highest bit index (inclusive) that may be returned
+   * @return the index of a clear bit, or -1 if all bits from 0 to max are already set
+   */
+  public static final int randomUnsetBit(Random r, BitSet bits, final int max) {
+    // NOTE: a BitSet has no fixed upper bound, so "full" must be judged against max ourselves
+    final boolean full = (bits.cardinality() == max+1);
+    if (full) {
+      return -1;
+    }
+
+    final int candidate = TestUtil.nextInt(r, 0, max);
+    if (!bits.get(candidate)) {
+      return candidate;
+    }
+
+    // candidate is taken: look for the closest clear bit on either side
+    final int below = bits.previousClearBit(candidate);
+    final int above = bits.nextClearBit(candidate);
+    final boolean noneBelow = (below < 0);
+    final boolean noneAbove = (max < above);
+
+    if (noneBelow && noneAbove) {
+      fail("how the hell did we not short circut out? card="+bits.cardinality()+"/size="+bits.size());
+    } else if (noneBelow || noneAbove) {
+      // only one side has a usable clear bit
+      return noneBelow ? above : below;
+    }
+    final int distBelow = candidate - below;
+    final int distAbove = above - candidate;
+    return (distBelow < distAbove) ? below : above;
+  }
+
+  /**
+   * Executes a <code>*:*</code> query requesting zero rows against the given client.
+   *
+   * @param c the client to query
+   * @return the numFound reported in the query results
+   */
+  public static final long countDocs(SolrClient c) throws Exception {
+    final SolrParams matchAllNoRows = params("q","*:*","rows","0");
+    final QueryResponse rsp = c.query(matchAllNoRows);
+    return rsp.getResults().getNumFound();
+  }
+
+  /**
+   * Uses a Cursor to iterate over every doc in the index, recording each 'id_i' value
+   * in a BitSet.
+   *
+   * @param c the client to query
+   * @param maxDocIdExpected the largest 'id_i' value allowed; an assertion fails if any
+   *        document has a larger one
+   * @return a BitSet with one bit set for each 'id_i' value found
+   */
+  private static final BitSet allDocs(final SolrClient c, final int maxDocIdExpected) throws Exception {
+    final BitSet docs = new BitSet(maxDocIdExpected+1);
+    String cursorMark = CURSOR_MARK_START;
+    // seeded with a positive value so the first page is always requested;
+    // paging stops as soon as a page comes back with no documents
+    int docsOnThisPage = Integer.MAX_VALUE;
+    while (0 < docsOnThisPage) {
+      final SolrParams p = params("q","*:*",
+                                  "rows","100",
+                                  // note: not numeric, but we don't actually care about the order
+                                  "sort", "id asc",
+                                  CURSOR_MARK_PARAM, cursorMark);
+      final QueryResponse rsp = c.query(p);
+      docsOnThisPage = 0;
+      for (SolrDocument doc : rsp.getResults()) {
+        docsOnThisPage++;
+        final int id_i = ((Integer)doc.get("id_i")).intValue();
+        assertTrue("found id_i bigger then expected "+maxDocIdExpected+": " + id_i,
+                   id_i <= maxDocIdExpected);
+        docs.set(id_i);
+      }
+      // advance the cursor for the next request (only needs to be read once per page)
+      cursorMark = rsp.getNextCursorMark();
+    }
+    return docs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f051f56b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestBadConfig.java b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
index 637c975..31361be 100644
--- a/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
+++ b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
@@ -96,4 +96,9 @@ public class TestBadConfig extends AbstractBadConfigTestBase {
     assertConfigs("bad-solrconfig-unexpected-schema-attribute.xml", "schema-minimal.xml",
                   "Unexpected arg(s): {bogusParam=bogusValue}");
   }
+
+  /**
+   * Runs assertConfigs against the minimal tolerant-update solrconfig and the minimal schema.
+   * NOTE(review): presumably this asserts that initialization fails with the given message
+   * because the schema declares no uniqueKey -- confirm against assertConfigs.
+   */
+  public void testTolerantUpdateProcessorNoUniqueKey() throws Exception {
+    final String solrconfigFile = "solrconfig-tolerant-update-minimal.xml";
+    final String schemaFile = "schema-minimal.xml";
+    final String expectedErr = "requires a schema that includes a uniqueKey field";
+    assertConfigs(solrconfigFile, schemaFile, expectedErr);
+  }
 }