You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2019/04/17 23:51:44 UTC

[lucene-solr] branch branch_7_7 updated: SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements

This is an automated email from the ASF dual-hosted git repository.

erick pushed a commit to branch branch_7_7
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_7_7 by this push:
     new ac38f23  SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements
ac38f23 is described below

commit ac38f23db2133fcd16e2c81c44ca1695de2718b3
Author: erick <er...@gmail.com>
AuthorDate: Wed Apr 17 16:51:36 2019 -0700

    SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements
---
 solr/CHANGES.txt                                   |   2 +
 .../org/apache/solr/handler/StreamHandler.java     |  60 ++--
 .../solr/handler/admin/DaemonStreamApiTest.java    | 319 +++++++++++++++++++++
 .../solr/client/solrj/io/stream/DaemonStream.java  |   7 +-
 .../solr/client/solrj/io/stream/StreamingTest.java |  91 +++---
 5 files changed, 415 insertions(+), 64 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d8fd618..a754c1b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -115,6 +115,8 @@ Bug Fixes
   this issue reverts the default replica placement policy to the 'legacy' assignment policy that was the default until
   Solr 7.4. (Gus Heck, Andrzej Bialecki, Bram Van Dam, shalin)
 
+* SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements (Erick Erickson)
+
 ==================  7.7.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index a447093..b670123 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -192,33 +193,46 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
   }
 
   private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
-    String action = params.get("action");
-    if ("stop".equalsIgnoreCase(action)) {
-      String id = params.get(ID);
-      DaemonStream d = daemons.get(id);
-      if (d != null) {
+    String action = params.get("action").toLowerCase(Locale.ROOT).trim();
+
+    if ("list".equals(action)) {
+      Collection<DaemonStream> vals = daemons.values();
+      rsp.add("result-set", new DaemonCollectionStream(vals));
+      return;
+    }
+
+    String id = params.get(ID);
+    DaemonStream d = daemons.get(id);
+    if (d == null) {
+      rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
+      return;
+    }
+
+    switch (action) {
+      case "stop":
         d.close();
         rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " stopped on " + coreName));
-      } else {
-        rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
-      }
-    } else {
-      if ("start".equalsIgnoreCase(action)) {
-        String id = params.get(ID);
-        DaemonStream d = daemons.get(id);
-        d.open();
-        rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " started on " + coreName));
-      } else if ("list".equalsIgnoreCase(action)) {
-        Collection<DaemonStream> vals = daemons.values();
-        rsp.add("result-set", new DaemonCollectionStream(vals));
-      } else if ("kill".equalsIgnoreCase(action)) {
-        String id = params.get("id");
-        DaemonStream d = daemons.remove(id);
-        if (d != null) {
-          d.close();
+        break;
+
+      case "start":
+        try {
+          d.open();
+        } catch (IOException e) {
+          rsp.add("result-set", new DaemonResponseStream("Daemon: " + id + " error: " + e.getMessage()));
         }
+        rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " started on " + coreName));
+        break;
+
+      case "kill":
+        daemons.remove(id);
+        d.close(); // we already found it in the daemons list, so we don't need to verify we removed it.
         rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
-      }
+        break;
+
+       default:
+         rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " action '"
+             + action + "' not recognized on " + coreName));
+         break;
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
new file mode 100644
index 0000000..b04b749
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.handler.admin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.SolrStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.handler.TestSQLHandler;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DaemonStreamApiTest extends SolrTestCaseJ4 {
+
+  private MiniSolrCloudCluster cluster;
+
+  private static final String SOURCE_COLL = "sourceColl";
+  private static final String TARGET_COLL = "targetColl";
+  private static final String CHECKPOINT_COLL = "checkpointColl";
+
+  private static final String DAEMON_ROOT = "daemon";
+  private static final String CONF_NAME = "conf";
+
+  private static final String DAEMON_OP = "DaemonOp";
+
+  // We want 2-5 daemons. Choose one of them to start/stop/kill to catch any off-by-one or other bookeeping errors.
+  final int numDaemons = random().nextInt(3) + 2;
+  String daemonOfInterest;
+
+  List<String> daemonNames = new ArrayList<>();
+
+  private String url;
+
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    cluster = new MiniSolrCloudCluster(1, createTempDir(), buildJettyConfig("/solr"));
+
+    url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + CHECKPOINT_COLL;
+
+    cluster.uploadConfigSet(configset("cloud-minimal"), CONF_NAME);
+    // create a single shard, single replica collection. This is necessary until SOLR-13245 since the commands
+    // don't look in all replicas.
+    CollectionAdminRequest.createCollection(SOURCE_COLL, CONF_NAME, 1, 1)
+        .setMaxShardsPerNode(1)
+        .process(cluster.getSolrClient());
+
+    CollectionAdminRequest.createCollection(TARGET_COLL, CONF_NAME, 1, 1)
+        .setMaxShardsPerNode(1)
+        .process(cluster.getSolrClient());
+
+    CollectionAdminRequest.createCollection(CHECKPOINT_COLL, CONF_NAME, 1, 1)
+        .setMaxShardsPerNode(1)
+        .process(cluster.getSolrClient());
+
+    for (int idx = 0; idx < numDaemons; ++idx) {
+      String name = DAEMON_ROOT + idx;
+      daemonNames.add(name);
+    }
+    daemonOfInterest = daemonNames.get(random().nextInt(numDaemons));
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    cluster.shutdown();
+    super.tearDown();
+  }
+
+  @Test
+  public void testAPIs() throws IOException, SolrServerException, InterruptedException {
+
+    checkCmdsNoDaemon(daemonOfInterest); // test no daemon defined
+
+    // Now create all our daemons.
+    for (String name : daemonNames) {
+      createDaemon(DAEMON_DEF.replace("DAEMON_NAME", name), name);
+    }
+
+    List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
+    assertEquals("Should have all daemons listed", numDaemons, tuples.size());
+
+    for (int idx = 0; idx < numDaemons; ++idx) {
+      assertEquals("Daemon should be running ", tuples.get(idx).getString("id"), daemonNames.get(idx));
+    }
+
+    // Are all the daemons in a good state?
+    for (String daemon : daemonNames) {
+      checkAlive(daemon);
+    }
+
+    // We shouldn't be able to open a daemon twice without closing., leads to thread leeks.
+    Tuple tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest)
+        , DAEMON_OP);
+    assertTrue("Should not open twice without closing",
+        tupleOfInterest.getString(DAEMON_OP).contains("There is already an open daemon named"));
+
+    // Try stopping and check return.
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonOfInterest),
+        DAEMON_OP);
+    assertTrue("Should have been able to stop the daemon",
+        tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " stopped"));
+    checkStopped();
+
+    // Are all the daemons alive? NOTE: a stopped daemon is still there, but in a TERMINATED state
+    for (String daemon : daemonNames) {
+      if (daemon.equals(daemonOfInterest) == false) {
+        checkAlive(daemon);
+      }
+    }
+
+    // Try starting and check return.
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest),
+        DAEMON_OP);
+    assertTrue("Should have been able to start the daemon",
+        tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " started"));
+
+    // Are all the daemons alive?
+    for (String daemon : daemonNames) {
+      checkAlive(daemon);
+    }
+
+    // Try killing a daemon, it should be removed from lists.
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonOfInterest),
+        DAEMON_OP);
+    assertTrue("Daemon should have been killed",
+        tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " killed"));
+
+    // Loop for a bit, waiting for the daemon to be removed from the list of possible entries.
+    checkDaemonKilled(daemonOfInterest);
+
+    // Should not be able to start a killed daemon
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest),
+        DAEMON_OP);
+    assertTrue("Daemon should not be found",
+        tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
+
+    // Should not be able to sop a killed daemon
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonOfInterest),
+        DAEMON_OP);
+    assertTrue("Daemon should not be found",
+        tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
+
+    // Should not be able to kill a killed daemon
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonOfInterest),
+        DAEMON_OP);
+    assertTrue("Daemon should not be found",
+        tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
+
+
+    // Let's bring the killed daemon back and see if it returns in our lists. Use the method that loops a bit to check
+    // in case there's a delay.
+    createDaemon(DAEMON_DEF.replace("DAEMON_NAME", daemonOfInterest), daemonOfInterest);
+    checkAlive(daemonOfInterest);
+
+    // Now kill them all so the threads disappear.
+    for (String daemon : daemonNames) {
+        getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemon));
+        checkDaemonKilled(daemon);
+    }
+  }
+
+  // There can be some delay while threads stabilize, so we need to loop;
+  private void checkAlive(String daemonName) throws InterruptedException, IOException {
+    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+    while (timeout.hasTimedOut() == false) {
+      Tuple tuple = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "list"),
+          daemonName);
+      String state = tuple.getString("state");
+      if (state.equals("RUNNABLE") || state.equals("WAITING") || state.equals("TIMED_WAITING")) {
+        return;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+    }
+    fail("State for daemon '" + daemonName + "' did not become RUNNABLE, WAITING or TIMED_WAITING in 10 seconds");
+  }
+  // There can be some delay while threads stabilize, so we need to loop. Evenutally, the statu of a stopped
+  // thread should be "TERMINATED"
+  private void checkStopped() throws InterruptedException, IOException {
+    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+    while (timeout.hasTimedOut() == false) {
+      Tuple tuple = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "list"),
+          daemonOfInterest);
+      if (tuple.getString("state").equals("TERMINATED")) {
+        return;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+    }
+    fail("State for daemon '" + daemonOfInterest + "' did not become TERMINATED in 10 seconds");
+  }
+
+  private void checkDaemonKilled(String daemon) throws IOException, InterruptedException {
+    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+    while (timeout.hasTimedOut() == false) {
+      List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
+      Boolean foundIt = false;
+      for (Tuple tuple : tuples) {
+        if (tuple.get("id").equals(daemon)) {
+          foundIt = true;
+        }
+      }
+      if (foundIt == false) return;
+      TimeUnit.MILLISECONDS.sleep(100);
+    }
+    fail("'" + daemonOfInterest + "' did not disappear in 10 seconds");
+  }
+  private void createDaemon(String daemonDef, String errMsg) throws IOException, SolrServerException {
+    SolrClient client = cluster.getSolrClient();
+    // create a daemon
+    QueryResponse resp = client.query(CHECKPOINT_COLL, TestSQLHandler.mapParams("expr", daemonDef, "qt", "/stream"));
+    assertEquals(errMsg, 0, resp.getStatus());
+
+    // This should close and replace the current daemon and NOT leak threads.
+    resp = client.query(CHECKPOINT_COLL, TestSQLHandler.mapParams("expr", daemonDef, "qt", "/stream"));
+    assertEquals(errMsg, 0, resp.getStatus());
+  }
+
+  private void checkCmdsNoDaemon(String daemonName) throws IOException {
+
+    List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
+    assertEquals("List should be empty", 0, tuples.size());
+
+    Tuple tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonName),
+        "DaemonOp");
+    assertTrue("Start for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
+
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonName),
+        "DaemonOp");
+    assertTrue("Stop for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
+
+    tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonName),
+        "DaemonOp");
+
+    assertTrue("Kill for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
+  }
+
+  // It's _really_ useful to have the tuples sorted....
+  private List<Tuple> getTuples(final SolrParams params) throws IOException {
+    return getTuples(params, null);
+  }
+
+  private List<Tuple> getTuples(final SolrParams params, String ofInterest) throws IOException {
+    //log.info("Tuples from params: {}", params);
+    TupleStream tupleStream = new SolrStream(url, params);
+
+    tupleStream.open();
+    List<Tuple> tuples = new ArrayList<>();
+    for (; ; ) {
+      Tuple t = tupleStream.read();
+      //log.info(" ... {}", t.fields);
+      if (t.EOF) {
+        break;
+      } else if (ofInterest == null || t.getString("id").equals(ofInterest) || t.getString(ofInterest).equals("null") == false) {
+        // a failed return is a bit different, the onlyh key is DaemonOp
+        tuples.add(t);
+      }
+    }
+    tupleStream.close();
+    Collections.sort(tuples, (o1, o2) -> (o1.getString("id").compareTo(o2.getString("id"))));
+    return tuples;
+  }
+
+  private Tuple getTupleOfInterest(final SolrParams params, String ofInterest) throws IOException {
+    List<Tuple> tuples = getTuples(params, ofInterest);
+    if (tuples.size() != 1) {
+      fail("Should have found a tuple for tuple of interest: " + ofInterest);
+    }
+    return tuples.get(0);
+  }
+
+  private static String DAEMON_DEF =
+      "  daemon(id=\"DAEMON_NAME\"," +
+          "    runInterval=\"1000\"," +
+          "    terminate=\"false\"," +
+          "    update(targetColl," +
+          "      batchSize=100," +
+          "      topic(checkpointColl," +
+          "        sourceColl," +
+          "        q=\"*:*\"," +
+          "        fl=\"id\"," +
+          "        id=\"topic1\"," +
+          "        initialCheckpoint=0)" +
+          "))";
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 9d02ec2..b1aafff 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -194,7 +194,12 @@ public class DaemonStream extends TupleStream implements Expressible {
     return id;
   }
 
-  public void open() {
+  public void open() throws IOException {
+    if (this.streamRunner != null && this.closed == false) {
+      log.error("There is already a running daemon named '{}', no action taken", id);
+      throw new IOException("There is already an open daemon named '" + id + "', no action taken.");
+    }
+    this.closed = false;
     this.streamRunner = new StreamRunner(runInterval, id);
     this.streamRunner.start();
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 003f01b..50739bd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -1739,58 +1739,69 @@ public void testParallelRankStream() throws Exception {
       daemonStream.setStreamContext(context);
 
       daemonStream.open();
+      CheckDaemonStream(context, daemonStream);
 
-      // Wait for the checkpoint
-      JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
-
+      // We should get an error if we try to open an already-open stream.
+      final IOException ex = expectThrows(IOException.class, () -> {
+        daemonStream.open();
+      });
+      assertEquals("Should have an intelligible exception message", ex.getMessage(), "There is already an open daemon named 'daemon1', no action taken.");
+      daemonStream.close();
 
-      SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
-      int count = 0;
-      while (count == 0) {
-        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
-        solrStream.setStreamContext(context);
-        List<Tuple> tuples = getTuples(solrStream);
-        count = tuples.size();
-        if (count > 0) {
-          Tuple t = tuples.get(0);
-          assertTrue(t.getLong("id") == 50000000);
-        } else {
-          System.out.println("###### Waiting for checkpoint #######:" + count);
-        }
-      }
+      // We should be able to close then re-open the stream, then close it again, see SOLR-13408
+      daemonStream.open();
+      CheckDaemonStream(context, daemonStream);
+      daemonStream.close();
+    } finally {
+      cache.close();
+    }
+  }
 
-      new UpdateRequest()
-          .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
-          .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
-          .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
-          .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
-          .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
-          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+  private void CheckDaemonStream(StreamContext context, DaemonStream daemonStream) throws IOException, SolrServerException {
+    // Wait for the checkpoint
+    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
 
-      for (int i = 0; i < 5; i++) {
-        daemonStream.read();
+    SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
+    int count = 0;
+    while (count == 0) {
+      SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
+      solrStream.setStreamContext(context);
+      List<Tuple> tuples = getTuples(solrStream);
+      count = tuples.size();
+      if (count > 0) {
+        Tuple t = tuples.get(0);
+        assertTrue(t.getLong("id") == 50000000);
+      } else {
+        System.out.println("###### Waiting for checkpoint #######:" + count);
       }
+    }
 
-      new UpdateRequest()
-          .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
-          .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
-          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
-      for (int i = 0; i < 2; i++) {
-        daemonStream.read();
-      }
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-      daemonStream.shutdown();
+    for (int i = 0; i < 5; i++) {
+      daemonStream.read();
+    }
 
-      Tuple tuple = daemonStream.read();
+    new UpdateRequest()
+        .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+        .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-      assertTrue(tuple.EOF);
-      daemonStream.close();
-    } finally {
-      cache.close();
+    for (int i = 0; i < 2; i++) {
+      daemonStream.read();
     }
 
+    daemonStream.shutdown();
+
+    Tuple tuple = daemonStream.read();
 
+    assertTrue(tuple.EOF);
   }
 
   @Test