Posted to commits@lucene.apache.org by ho...@apache.org on 2020/02/05 17:49:35 UTC

[lucene-solr] branch master updated: SOLR-14241: New delete() Stream Decorator

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new c5d0391  SOLR-14241: New delete() Stream Decorator
c5d0391 is described below

commit c5d0391df9c821dc842287d8c769c6f73275a423
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Wed Feb 5 10:49:24 2020 -0700

    SOLR-14241: New delete() Stream Decorator
---
 solr/CHANGES.txt                                   |   2 +
 .../src/stream-decorator-reference.adoc            |  44 +++-
 .../java/org/apache/solr/client/solrj/io/Lang.java |   3 +-
 .../solr/client/solrj/io/stream/DeleteStream.java  | 112 ++++++++++
 .../solr/client/solrj/io/stream/UpdateStream.java  |  50 ++++-
 .../solr/configsets/streaming/conf/solrconfig.xml  |   5 +
 .../org/apache/solr/client/solrj/io/TestLang.java  |   2 +-
 .../solrj/io/stream/CloudAuthStreamTest.java       | 233 ++++++++++++++++++++-
 .../solrj/io/stream/StreamDecoratorTest.java       | 155 +++++++++++++-
 9 files changed, 589 insertions(+), 17 deletions(-)
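
Reviewer note (illustrative only, not part of the commit): the core behavior this
change introduces is how a tuple becomes a "Delete By Id" command, and how
`pruneVersionField` decides whether the tuple's `_version_` is carried along as an
optimistic concurrency constraint. The following is a minimal sketch of that logic
in plain Java, assuming tuples are modeled as `Map<String,Object>`. The class
`DeleteByIdSketch`, its nested `DeleteCommand`, and the method names are
hypothetical stand-ins and do not exist in SolrJ. In the actual patch the pruning
happens earlier, when the tuple is converted to a `SolrInputDocument`, but the
resulting (id, version) pairs should be the same.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these names are part of SolrJ. */
final class DeleteByIdSketch {

  static final String ID_KEY = "id";
  static final String VERSION_KEY = "_version_";

  /** One "Delete By Id" command: an id plus an optional version (null = no constraint). */
  static final class DeleteCommand {
    final String id;
    final Long version;

    DeleteCommand(String id, Long version) {
      this.id = id;
      this.version = version;
    }

    @Override
    public String toString() {
      return "delete(id=" + id + ", version=" + version + ")";
    }
  }

  /** Accepts a Long, or any value whose string form parses as a long; null if absent. */
  static Long versionOf(Map<String, Object> tuple) {
    final Object v = tuple.get(VERSION_KEY);
    if (v == null) {
      return null;
    }
    if (v instanceof Long) {
      return (Long) v;
    }
    return Long.parseLong(v.toString()); // may throw NumberFormatException
  }

  /**
   * Converts tuples to delete commands. Fields other than id and _version_ are ignored.
   * As in the patch, a tuple without an "id" value is not handled gracefully here.
   */
  static List<DeleteCommand> toDeletes(List<Map<String, Object>> tuples,
                                       boolean pruneVersionField) {
    final List<DeleteCommand> out = new ArrayList<>();
    for (Map<String, Object> tuple : tuples) {
      final String id = tuple.get(ID_KEY).toString();
      // Pruning drops the version, so the delete applies regardless of concurrent updates.
      final Long version = pruneVersionField ? null : versionOf(tuple);
      out.add(new DeleteCommand(id, version));
    }
    return out;
  }

  public static void main(String[] args) {
    final Map<String, Object> tuple = new LinkedHashMap<>();
    tuple.put("id", "42");
    tuple.put("_version_", "1657"); // String on purpose, as tuple(..) may produce
    tuple.put("foo_i", 7);          // ignored

    // delete() default (pruneVersionField=false): version constraint is kept
    System.out.println(toDeletes(Collections.singletonList(tuple), false));
    // prints [delete(id=42, version=1657)]

    // pruneVersionField=true: version constraint is dropped
    System.out.println(toDeletes(Collections.singletonList(tuple), true));
    // prints [delete(id=42, version=null)]
  }
}
```

With the version kept, a document modified after the inner `search(...)` ran would
be expected to fail its delete with a version conflict rather than be removed.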

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a176c08..57d5382 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -172,6 +172,8 @@ New Features
 
  * SOLR-14242: HdfsDirectory now supports indexing geo-points, ranges or shapes. (Adrien Grand)
 
+ * SOLR-14241: New delete() Stream Decorator (hossman)
+ 
 Improvements
 ---------------------
 * SOLR-14120: Define JavaScript methods 'includes' and 'startsWith' to ensure AdminUI can be displayed when using
diff --git a/solr/solr-ref-guide/src/stream-decorator-reference.adoc b/solr/solr-ref-guide/src/stream-decorator-reference.adoc
index ab74884..d759c0b 100644
--- a/solr/solr-ref-guide/src/stream-decorator-reference.adoc
+++ b/solr/solr-ref-guide/src/stream-decorator-reference.adoc
@@ -595,6 +595,45 @@ while(true) {
 daemonStream.close();
 ----
 
+== delete
+
+The `delete` function wraps another function and uses the `id` and `\_version_` values found to send the tuples to a SolrCloud collection as <<uploading-data-with-index-handlers.adoc#delete-operations,Delete By Id>> commands.
+
+This is similar to the `<<#update,update()>>` function described below.
+
+=== delete Parameters
+
+* `destinationCollection`: (Mandatory) The collection where the tuples will be deleted.
+* `batchSize`: (Mandatory) The indexing batch size.
+* `pruneVersionField`: (Optional, defaults to `false`) Whether to prune `\_version_` values from tuples
+* `StreamExpression`: (Mandatory)
+
+=== delete Syntax
+
+[source,text]
+----
+ delete(collection1,
+        batchSize=500,
+        search(collection1,
+               q=old_data:true,
+               qt="/export",
+               fl="id",
+               sort="a_f asc, a_i asc"))
+
+----
+
+The example above consumes the tuples returned by the `search` function against `collection1` and converts the `id` value of each document found into a delete request against the same `collection1`.
+
+[NOTE]
+====
+Unlike the `update()` function, `delete()` defaults to `pruneVersionField=false` -- preserving any `\_version_` values found in the inner stream when converting the tuples to "Delete By ID" requests, to ensure that using this stream will not (by default) result in deleting any documents that were updated _after_ the `search(...)` was executed, but _before_ the `delete(...)` processed that tuple (leveraging <<updating-parts-of-documents.adoc#optimistic-concurrency,Optimistic concurrency>> [...]
+
+Users who wish to ignore concurrent updates, and delete all matched documents, should set `pruneVersionField=true` (or ensure that the inner stream tuples do not include any `\_version_` values).
+
+Users who anticipate concurrent updates, and wish to "skip" any failed deletes, should consider configuring the {solr-javadocs}/solr-core/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.html[`TolerantUpdateProcessorFactory`].
+====
+
+
 == eval
 
 The `eval` function allows for use cases where new streaming expressions are generated on the fly and then evaluated.
@@ -1273,12 +1312,13 @@ unique(
 
 == update
 
-The `update` function wraps another functions and sends the tuples to a SolrCloud collection for indexing.
+The `update` function wraps another function and sends the tuples to a SolrCloud collection for indexing as Documents.
 
 === update Parameters
 
 * `destinationCollection`: (Mandatory) The collection where the tuples will indexed.
 * `batchSize`: (Mandatory) The indexing batch size.
+* `pruneVersionField`: (Optional, defaults to `true`) Whether to prune `\_version_` values from tuples
 * `StreamExpression`: (Mandatory)
 
 === update Syntax
@@ -1296,3 +1336,5 @@ The `update` function wraps another functions and sends the tuples to a SolrClou
 ----
 
 The example above sends the tuples returned by the `search` function to the `destinationCollection` to be indexed.
+
+Wrapping `search(...)` as shown in this example is the common use of this decorator: to read documents from a collection as tuples, process or modify them in some way, and then add them back to a new collection.  For this reason, `pruneVersionField=true` is the default behavior -- stripping any `\_version_` values found in the inner stream when converting the tuples to Solr documents to prevent any unexpected errors from <<updating-parts-of-documents.adoc#optimistic-concurrency, [...]
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 46cdb05..05ba98f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -39,6 +39,7 @@ public class Lang {
         .withFunctionName("facet", FacetStream.class)
         .withFunctionName("facet2D", Facet2DStream.class)
         .withFunctionName("update", UpdateStream.class)
+        .withFunctionName("delete", DeleteStream.class)
         .withFunctionName("jdbc", JDBCStream.class)
         .withFunctionName("topic", TopicStream.class)
         .withFunctionName("commit", CommitStream.class)
@@ -359,4 +360,4 @@ public class Lang {
         .withFunctionName("if", IfThenElseEvaluator.class)
         .withFunctionName("convert", ConversionEvaluator.class);
   }
-}
\ No newline at end of file
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
new file mode 100644
index 0000000..d6ffd55
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.SolrInputDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+
+/**
+ * Uses tuples to identify the uniqueKey values of documents to be deleted
+ */
+public final class DeleteStream extends UpdateStream implements Expressible {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  private static final String ID_TUPLE_KEY = "id";
+
+  public DeleteStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    super(expression, factory);
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+    final Explanation explanation = super.toExplanation(factory);
+    explanation.setExpression("Delete docs from " + getCollectionName());
+    
+    return explanation;
+  }
+
+  /** 
+   * {@link DeleteStream} returns <code>false</code> so that Optimistic Concurrency Constraints are 
+   * respected by default when using this stream to wrap a {@link SearchStream} query.
+   */
+  @Override
+  protected boolean defaultPruneVersionField() {
+    return false;
+  }
+  
+  /**
+   * Overrides implementation to extract the <code>"id"</code> and <code>"_version_"</code> 
+   * (if included) from each document and use that information to construct a "Delete By Id" request.  
+   * Any other fields (ie: Tuple values) are ignored.
+   */
+  @Override
+  protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
+    if (documentBatch.size() == 0) {
+      return;
+    }
+
+    try {
+      // convert each doc into a deleteById request...
+      final UpdateRequest req = new UpdateRequest();
+      for (SolrInputDocument doc : documentBatch) {
+        final String id = doc.getFieldValue(ID_TUPLE_KEY).toString();
+        final Long version = getVersion(doc);
+        req.deleteById(id, version);
+      }
+      req.process(getCloudSolrClient(), getCollectionName());
+    } catch (SolrServerException | NumberFormatException | IOException e) {
+      log.warn("Unable to delete documents from collection due to unexpected error.", e);
+      String className = e.getClass().getName();
+      String message = e.getMessage();
+      throw new IOException(String.format(Locale.ROOT,"Unexpected error when deleting documents from collection %s- %s:%s", getCollectionName(), className, message));
+    }
+  }
+
+  /**
+   * Helper method that can handle String values when dealing with odd 
+   * {@link Tuple} -&gt; {@link SolrInputDocument} conversions 
+   * (ie: <code>tuple(..)</code> in tests)
+   */
+  private static Long getVersion(final SolrInputDocument doc) throws NumberFormatException {
+    if (! doc.containsKey(VERSION_FIELD)) {
+      return null;
+    }
+    final Object v = doc.getFieldValue(VERSION_FIELD);
+    if (null == v) {
+      return null;
+    }
+    if (v instanceof Long) {
+      return (Long)v;
+    }
+    return Long.parseLong(v.toString());
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
index 7ea8808..5313f14 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
@@ -40,10 +40,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
 
 /**
  * Sends tuples emitted by a wrapped {@link TupleStream} as updates to a SolrCloud collection.
@@ -56,6 +56,13 @@ public class UpdateStream extends TupleStream implements Expressible {
   private String collection;
   private String zkHost;
   private int updateBatchSize;
+  /**
+   * Indicates if the {@link CommonParams#VERSION_FIELD} should be removed from tuples when converting 
+   * to Solr Documents.  
+   * May be set per expression using the <code>"pruneVersionField"</code> named operand, 
+   * defaults to the value returned by {@link #defaultPruneVersionField()} 
+   */
+  private boolean pruneVersionField;
   private int batchNumber;
   private long totalDocsIndex;
   private PushBackStream tupleSource;
@@ -64,7 +71,6 @@ public class UpdateStream extends TupleStream implements Expressible {
   private List<SolrInputDocument> documentBatch = new ArrayList();
   private String coreName;
 
-
   public UpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
     String collectionName = factory.getValueOperand(expression, 0);
     verifyCollectionName(collectionName, expression);
@@ -73,6 +79,7 @@ public class UpdateStream extends TupleStream implements Expressible {
     verifyZkHost(zkHost, collectionName, expression);
     
     int updateBatchSize = extractBatchSize(expression, factory);
+    pruneVersionField = factory.getBooleanOperand(expression, "pruneVersionField", defaultPruneVersionField());
 
     //Extract underlying TupleStream.
     List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@@ -80,7 +87,6 @@ public class UpdateStream extends TupleStream implements Expressible {
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }
     StreamExpression sourceStreamExpression = streamExpressions.get(0);
-    
     init(collectionName, factory.constructStream(sourceStreamExpression), zkHost, updateBatchSize);
   }
   
@@ -88,9 +94,10 @@ public class UpdateStream extends TupleStream implements Expressible {
     if (updateBatchSize <= 0) {
       throw new IOException(String.format(Locale.ROOT,"batchSize '%d' must be greater than 0.", updateBatchSize));
     }
+    pruneVersionField = defaultPruneVersionField();
     init(collectionName, tupleSource, zkHost, updateBatchSize);
   }
-  
+
   private void init(String collectionName, TupleStream tupleSource, String zkHost, int updateBatchSize) {
     this.collection = collectionName;
     this.zkHost = zkHost;
@@ -98,6 +105,11 @@ public class UpdateStream extends TupleStream implements Expressible {
     this.tupleSource = new PushBackStream(tupleSource);
   }
   
+  /** The name of the collection being updated */
+  protected String getCollectionName() {
+    return collection;
+  }
+
   @Override
   public void open() throws IOException {
     setCloudSolrClient();
@@ -257,6 +269,21 @@ public class UpdateStream extends TupleStream implements Expressible {
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - batchSize '%s' is not a valid integer.",expression, batchSizeStr));
     }    
   }
+
+  /**
+   * Used during initialization to specify the default value for the <code>"pruneVersionField"</code> option.
+   * {@link UpdateStream} returns <code>true</code> for backcompat and to simplify slurping of data from one
+   * collection to another.
+   */
+  protected boolean defaultPruneVersionField() {
+    return true;
+  }
+  
+  /** Only viable after calling {@link #open} */
+  protected CloudSolrClient getCloudSolrClient() {
+    assert null != this.cloudSolrClient;
+    return this.cloudSolrClient;
+  }
   
   private void setCloudSolrClient() {
     if(this.cache != null) {
@@ -272,7 +299,8 @@ public class UpdateStream extends TupleStream implements Expressible {
   private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
     SolrInputDocument doc = new SolrInputDocument();
     for (Object field : tuple.fields.keySet()) {
-      if (! field.equals(VERSION_FIELD)) {
+
+      if (! (field.equals(CommonParams.VERSION_FIELD) && pruneVersionField)) {
         Object value = tuple.get(field);
         if (value instanceof List) {
           addMultivaluedField(doc, (String)field, (List<Object>)value);
@@ -292,7 +320,11 @@ public class UpdateStream extends TupleStream implements Expressible {
     }
   }
   
-  private void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
+  /**
+   * This method will be called on every batch of tuples consumed, after converting each tuple 
+   * in that batch to a Solr Input Document.
+   */
+  protected void uploadBatchToCollection(List<SolrInputDocument> documentBatch) throws IOException {
     if (documentBatch.size() == 0) {
       return;
     }
@@ -300,6 +332,12 @@ public class UpdateStream extends TupleStream implements Expressible {
     try {
       cloudSolrClient.add(collection, documentBatch);
     } catch (SolrServerException | IOException e) {
+      // TODO: it would be nice if there was an option to "skipFailedBatches"
+      // TODO: and just record the batch failure info in the summary tuple for that batch and continue
+      //
+      // TODO: The summary batches (and/or stream error) should also pay attention to the error metadata
+      // from the SolrServerException ... and ideally also any TolerantUpdateProcessor metadata
+
       log.warn("Unable to add documents to collection due to unexpected error.", e);
       String className = e.getClass().getName();
       String message = e.getMessage();
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml
index 0e13a5a..e7e5c1e 100644
--- a/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/streaming/conf/solrconfig.xml
@@ -35,6 +35,11 @@
     </updateLog>
   </updateHandler>
 
+  <updateRequestProcessorChain default="true">
+    <!-- be tolerant of errors for testing optimistic concurrency of delete() stream -->
+    <processor class="solr.TolerantUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
 
   <requestDispatcher>
     <requestParsers enableRemoteStreaming="false" multipartUploadLimitInKB="-1" />
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 025f6b2..459626e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -45,7 +45,7 @@ import org.junit.Test;
 public class TestLang extends SolrTestCase {
 
   private static final String[] allFunctions = {
-      "search", "facet", "facet2D", "update", "jdbc", "topic", "commit", "random", "knnSearch", "merge",
+      "search", "facet", "facet2D", "update", "delete", "jdbc", "topic", "commit", "random", "knnSearch", "merge",
       "unique", "top", "group", "reduce", "parallel", "rollup", "stats", "innerJoin",
       "leftOuterJoin", "hashJoin", "outerHashJoin", "intersect", "complement", "sort",
       "train", "features", "daemon", "shortestPath", "gatherNodes", "nodes",
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
index b360278..cb6c209 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -246,7 +246,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
                                                  params("qt", "/stream", "expr",
                                                         "update("+COLLECTION_X+",batchSize=1," +
-                                                        "tuple(id='42',a_i=1,b_i=5))"));
+                                                        "tuple(id=42,a_i=1,b_i=5))"));
     solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
     final List<Tuple> tuples = getTuples(solrStream);
     assertEquals(1, tuples.size());
@@ -259,7 +259,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
                                                  params("qt", "/stream", "expr",
                                                         "update("+COLLECTION_X+",batchSize=1," +
-                                                        "tuple(id='42',a_i=1,b_i=5))"));
+                                                        "tuple(id=42,a_i=1,b_i=5))"));
     // "WRITE" credentials should be required for 'update(...)'
     solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
     
@@ -278,7 +278,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
       final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
                                                    params("qt", "/stream", "expr",
                                                           "update("+COLLECTION_X+",batchSize=1," +
-                                                          "tuple(id='42',a_i=1,b_i=5))"));
+                                                          "tuple(id=42,a_i=1,b_i=5))"));
       
       solrStream.setCredentials(user, user);
     
@@ -296,7 +296,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
       final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
                                                    params("qt", "/stream", "expr",
                                                           "update("+COLLECTION_X+",batchSize=1," +
-                                                          "tuple(id='42',a_i=1,b_i=5))"));
+                                                          "tuple(id=42,a_i=1,b_i=5))"));
       solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
       final List<Tuple> tuples = getTuples(solrStream);
       assertEquals(1, tuples.size());
@@ -318,7 +318,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
         + "       search("+COLLECTION_Y+",                          "
         + "              q=\"foo_i:[* TO 10]\",                     " // 10 matches = 1 batch
         + "              rows=100,                                  "
-        + "              fl=\"id,foo_i\",                           "
+        + "              fl=\"id,foo_i,_version_\",                 " // pruneVersionField default true
         + "              sort=\"foo_i desc\"))                      "
         ;
       
@@ -370,7 +370,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
       final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
                                                    params("qt", "/stream", "expr",
                                                           "update("+COLLECTION_X+",batchSize=1," +
-                                                          "tuple(id='42',a_i=1,b_i=5))"));
+                                                          "tuple(id=42,a_i=1,b_i=5))"));
       solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
     
       // NOTE: Can't make any assertions about Exception: SOLR-14226
@@ -386,7 +386,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
       final String expr
         = "executor(threads=1,                                              "
         + "         tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5,    "
-        + "                               tuple(id='42',a_i=1,b_i=5))       "
+        + "                               tuple(id=42,a_i=1,b_i=5))       "
         + "                      \"))                                       "
         ;
     final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
@@ -554,7 +554,226 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
                  0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
     
   }
+
+  public void testSimpleDeleteStream() throws Exception {
+    assertEquals(0,
+                 (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+                  .add(sdoc("id", "42"))
+                  .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    
+    final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+                                                 params("qt", "/stream", "expr",
+                                                        "delete("+COLLECTION_X+",batchSize=1," +
+                                                        "tuple(id=42))"));
+    solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+    final List<Tuple> tuples = getTuples(solrStream);
+    assertEquals(1, tuples.size());
+    assertEquals(1L, tuples.get(0).get("totalIndexed"));
+    
+    assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+  }
+
+  /** A simple "Delete by Query" example */
+  public void testSimpleDeleteStreamByQuery() throws Exception {
+    { // Put some "real" docs directly to X...
+      final UpdateRequest update = setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER);
+      for (int i = 1; i <= 42; i++) {
+        update.add(sdoc("id",i+"x","foo_i",""+i));
+      }
+      assertEquals("initial docs in X",
+                   0, update.commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
+    }
+    
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    
+    { // WRITE_X user should be able to delete X via a query from X
+      final String expr
+        = "delete("+COLLECTION_X+", batchSize=5,                    " // note batch size
+        + "       search("+COLLECTION_X+",                          "
+        + "              q=\"foo_i:[* TO 10]\",                     " // 10 matches = 2 batches
+        + "              rows=100,                                  "
+        + "              fl=\"id,foo_i,_version_\",                 " // foo_i should be ignored...
+        + "              sort=\"foo_i desc\"))                      " // version constraint should be ok
+        ;
+
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+                                                   params("qt", "/stream",
+                                                          "expr", expr));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(2, tuples.size());
+      assertEquals(5L, tuples.get(0).get("totalIndexed"));
+      assertEquals(10L, tuples.get(1).get("totalIndexed"));
+    }
+    
+    assertEquals(42L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+  }
+
+  
+  public void testSimpleDeleteStreamInvalidCredentials() throws Exception {
+    assertEquals(0,
+                 (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+                  .add(sdoc("id", "42"))
+                  .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    
+    final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+                                                 params("qt", "/stream", "expr",
+                                                        "delete("+COLLECTION_X+",batchSize=1," +
+                                                        "tuple(id=42))"));
+    // "WRITE" credentials should be required for 'delete(...)'
+    solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
+    
+    // NOTE: Can't make any assertions about Exception: SOLR-14226
+    expectThrows(Exception.class, () -> {
+        final List<Tuple> ignored = getTuples(solrStream);
+      });
+    
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+  }
+
+
+  public void testSimpleDeleteStreamInsufficientCredentials() throws Exception {
+    assertEquals(0,
+                 (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+                  .add(sdoc("id", "42"))
+                  .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    
+    // both of these users have valid credentials and authz to read COLLECTION_X, but neither has
+    // authz to write to X...
+    for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+                                                   params("qt", "/stream", "expr",
+                                                          "delete("+COLLECTION_X+",batchSize=1," +
+                                                          "tuple(id=42))"));
+      
+      solrStream.setCredentials(user, user);
+    
+      // NOTE: Can't make any assertions about Exception: SOLR-14226
+      expectThrows(Exception.class, () -> {
+          final List<Tuple> ignored = getTuples(solrStream);
+        });
+    }
+    
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+  }
   
+  public void testIndirectDeleteStream() throws Exception {
+    { // Put some "real" docs directly to both X & Y...
+      final UpdateRequest xxx_Update = setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER);
+      final UpdateRequest yyy_Update = setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER);
+      for (int i = 1; i <= 42; i++) {
+        xxx_Update.add(sdoc("id",i+"z","foo_i",""+i));
+        yyy_Update.add(sdoc("id",i+"z","foo_i",""+i));
+      }
+      assertEquals("initial docs in X",
+                   0, xxx_Update.commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
+      assertEquals("initial docs in Y",
+                   0, yyy_Update.commit(cluster.getSolrClient(), COLLECTION_Y).getStatus());
+    }
+    
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+    
+    { // WRITE_X user should be able to delete X via a (dummy) stream from Y...
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
+                                                   params("qt", "/stream", "expr",
+                                                          "delete("+COLLECTION_X+",batchSize=1," +
+                                                          "tuple(id=42z))"));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(1, tuples.size());
+      assertEquals(1L, tuples.get(0).get("totalIndexed"));
+    }
+
+    assertEquals(42L - 1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+    
+    { // WRITE_X user should be able to delete ids from X via a (search) stream from Y (routed via Y)
+      final String expr
+        = "delete("+COLLECTION_X+", batchSize=50,                   " // note batch size
+        + "       pruneVersionField=true,                           " // NOTE: ignoring Y version to del X
+        + "       search("+COLLECTION_Y+",                          "
+        + "              q=\"foo_i:[* TO 10]\",                     " // 10 matches = 1 batch
+        + "              rows=100,                                  "
+        + "              fl=\"id,foo_i,_version_\",                 " // foo_i & version should be ignored
+        + "              sort=\"foo_i desc\"))                      "
+        ;
+      
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y, // NOTE: Y route
+                                                   params("qt", "/stream",
+                                                          "expr", expr));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(1, tuples.size());
+      assertEquals(10L, tuples.get(0).get("batchIndexed"));
+      assertEquals(10L, tuples.get(0).get("totalIndexed"));
+
+    }
+
+    assertEquals(42L - 1L - 10L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+      
+    { // WRITE_X user should be able to delete ids from X via a (search) stream from Y (routed via X)...
+      final String expr
+        = "delete("+COLLECTION_X+", batchSize=5,                    " // note batch size
+        + "       search("+COLLECTION_Y+",                          "
+        + "              q=\"foo_i:[30 TO *]\",                     " // 13 matches = 3 batches
+        + "              rows=100,                                  "
+        + "              fl=\"id,foo_i\",                           " // foo_i should be ignored
+        + "              sort=\"foo_i desc\"))                      "
+        ;
+      
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, // NOTE: X route
+                                                   params("qt", "/stream",
+                                                          "expr", expr));
+      solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+      final List<Tuple> tuples = getTuples(solrStream);
+      assertEquals(3, tuples.size());
+      
+      assertEquals( 5L, tuples.get(0).get("batchIndexed"));
+      assertEquals( 5L, tuples.get(0).get("totalIndexed"));
+      
+      assertEquals( 5L, tuples.get(1).get("batchIndexed"));
+      assertEquals(10L, tuples.get(1).get("totalIndexed"));
+      
+      assertEquals( 3L, tuples.get(2).get("batchIndexed"));
+      assertEquals(13L, tuples.get(2).get("totalIndexed"));
+    }
+
+    assertEquals(42L - 1L - 10L - (13L - 1L), // '42' in last 13 deletes was already deleted from X
+                 commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    assertEquals(42L, commitAndCountDocsInCollection(COLLECTION_Y, WRITE_Y_USER));
+    
+  }
+
+  public void testIndirectDeleteStreamInsufficientCredentials() throws Exception {
+    assertEquals(0,
+                 (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+                  .add(sdoc("id", "42"))
+                  .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+    
+    // regardless of how it's routed, WRITE_Y should NOT have authz to delete from X...
+    for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+      final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
+                                                   params("qt", "/stream", "expr",
+                                                          "delete("+COLLECTION_X+",batchSize=1," +
+                                                          "tuple(id=42))"));
+      solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
+    
+      // NOTE: Can't make any assertions about Exception: SOLR-14226
+      expectThrows(Exception.class, () -> {
+          final List<Tuple> ignored = getTuples(solrStream);
+        });
+    }
+
+    assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+  }
+
   /**
    * Helper method that uses the specified user to (first commit, and then) count the total 
    * number of documents in the collection
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 75474a4..52aa378 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -56,8 +57,10 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.junit.Assume;
@@ -2696,7 +2699,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
     try {
       //Copy all docs to destinationCollection
-      expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
+      // confirm update() stream defaults to ignoring _version_ field in tuples
+      expression = StreamExpressionParser.parse("update(destinationCollection, batchSize=5, search(collection1, q=*:*, fl=\"id,_version_,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\"))");
       stream = new UpdateStream(expression, factory);
       stream.setStreamContext(streamContext);
       List<Tuple> tuples = getTuples(stream);
@@ -4267,6 +4271,155 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
     }
   }
 
+  public void testDeleteStream() throws Exception {
+    final String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
+    final SolrClient client = cluster.getSolrClient();
+    
+    { final UpdateRequest req = new UpdateRequest();
+      for (int i = 0; i < 20; i++) {
+        req.add(id, "doc_"+i, "deletable_s", "yup");
+      }
+      assertEquals(0, req.commit(cluster.getSolrClient(), COLLECTIONORALIAS).getStatus());
+    }
+
+    // fetch the _version_ value assigned to each doc to test optimistic concurrency later...
+    final Map<String,Long> versions = new HashMap<>();  
+    { final QueryResponse allDocs = client.query(COLLECTIONORALIAS, params("q","deletable_s:yup",
+                                                                           "rows","100"));
+      assertEquals(20L, allDocs.getResults().getNumFound());
+      for (SolrDocument doc : allDocs.getResults()) {
+        versions.put(doc.getFirstValue("id").toString(), (Long) doc.getFirstValue("_version_"));
+      }
+    }
+                 
+    { // trivially delete 1 doc
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true,     "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=10,   "
+        + "              tuple(id=doc_2)))                     "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+      
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(1L, tuples.get(0).get("totalIndexed"));
+      
+      assertEquals(20L - 1L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // delete 5 docs, spread across 3 batches (2 + 2 + 1)
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true,          "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=2,list(    " // NOTE: batch size
+        + "               tuple(id=doc_3),                          "
+        + "               tuple(id=doc_11),                         "
+        + "               tuple(id=doc_7),                          "
+        + "               tuple(id=doc_17),                         "
+        + "               tuple(id=doc_15),                         "
+        + "              ) ) )                                      "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+      
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(3, tuples.size());
+      assertEquals(2L, tuples.get(0).get("totalIndexed"));
+      assertEquals(4L, tuples.get(1).get("totalIndexed"));
+      assertEquals(5L, tuples.get(2).get("totalIndexed"));
+      
+      assertEquals(20L - 1L - 5L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // attempt to delete 2 docs, one with correct version, one with "stale" version that should fail
+      // but config uses TolerantUpdateProcessorFactory so batch should still be ok...
+      //
+      // It would be nice if there were a more explicit, targeted option for update() and delete() to
+      // ensure that even if one "batch" fails it continues with other batches.
+      // See TODO in UpdateStream
+
+      final long v13_ok = versions.get("doc_13").longValue();
+      final long v10_bad = versions.get("doc_10").longValue() - 42L;
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true,            "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=10,list(     "
+        + "               tuple(id=doc_10,_version_="+v10_bad+"),     "
+        + "               tuple(id=doc_13,_version_="+v13_ok+"),      "
+        + "              ) ) )                                        "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+      
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(2L, tuples.get(0).get("totalIndexed"));
+
+      // should still be in the index due to version conflict...
+      assertEquals(1L, client.query(COLLECTIONORALIAS,
+                                    params("q","id:doc_10")).getResults().getNumFound());
+      // should not be in the index due to successful delete...
+      assertEquals(0L, client.query(COLLECTIONORALIAS,
+                                    params("q","id:doc_13")).getResults().getNumFound());
+      
+      assertEquals(20L - 1L - 5L - 1L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // by using pruneVersionField=true we should be able to ignore optimistic concurrency constraints,
+      // and delete docs even if the stream we are wrapping returns _version_ values that are no
+      // longer valid...
+      final long v10_bad = versions.get("doc_10").longValue() - 42L;
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true,            "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=10,          "
+        + "              pruneVersionField=true, list(                "
+        + "               tuple(id=doc_10,_version_="+v10_bad+"),     "
+        + "              ) ) )                                        "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+      
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(1L, tuples.get(0).get("totalIndexed"));
+
+      // _version_ should have been ignored and doc deleted anyway...
+      assertEquals(0L, client.query(COLLECTIONORALIAS,
+                                    params("q","id:doc_10")).getResults().getNumFound());
+      
+      assertEquals(20L - 1L - 5L - 1L - 1L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+    }
+
+    { // now test a "realistic" DBQ type situation, confirm all (remaining) matching docs deleted...
+      final String expr
+        = "commit("+COLLECTIONORALIAS+",waitSearcher=true,                "
+        + "       delete("+COLLECTIONORALIAS+",batchSize=99,              "
+        + "              search("+COLLECTIONORALIAS+",qt=\"/export\",     "
+        + "                     q=\"deletable_s:yup\",                    "
+        + "                     sort=\"id asc\",fl=\"id,_version_\"       "
+        + "              ) ) )                                            "
+        ;
+      final SolrStream stream = new SolrStream(url, params("qt", "/stream", "expr", expr));
+      
+      final List<Tuple> tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+      assertEquals(20L - 1L - 5L - 1L - 1L,
+                   tuples.get(0).get("totalIndexed"));
+
+      // shouldn't be anything left...
+      assertEquals(0L,
+                   client.query(COLLECTIONORALIAS,
+                                params("q","deletable_s:yup")).getResults().getNumFound());
+      
+    }
+    
+  }
+
+
+  
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
     List<Tuple> tuples = new ArrayList<Tuple>();