You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2019/04/17 23:51:44 UTC
[lucene-solr] branch branch_7_7 updated: SOLR-13408: Cannot
start/stop DaemonStream repeatedly, other API improvements
This is an automated email from the ASF dual-hosted git repository.
erick pushed a commit to branch branch_7_7
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_7_7 by this push:
new ac38f23 SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements
ac38f23 is described below
commit ac38f23db2133fcd16e2c81c44ca1695de2718b3
Author: erick <er...@gmail.com>
AuthorDate: Wed Apr 17 16:51:36 2019 -0700
SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements
---
solr/CHANGES.txt | 2 +
.../org/apache/solr/handler/StreamHandler.java | 60 ++--
.../solr/handler/admin/DaemonStreamApiTest.java | 319 +++++++++++++++++++++
.../solr/client/solrj/io/stream/DaemonStream.java | 7 +-
.../solr/client/solrj/io/stream/StreamingTest.java | 91 +++---
5 files changed, 415 insertions(+), 64 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d8fd618..a754c1b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -115,6 +115,8 @@ Bug Fixes
this issue reverts the default replica placement policy to the 'legacy' assignment policy that was the default until
Solr 7.4. (Gus Heck, Andrzej Bialecki, Bram Van Dam, shalin)
+* SOLR-13408: Cannot start/stop DaemonStream repeatedly, other API improvements (Erick Erickson)
+
================== 7.7.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index a447093..b670123 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -192,33 +193,46 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}
private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
- String action = params.get("action");
- if ("stop".equalsIgnoreCase(action)) {
- String id = params.get(ID);
- DaemonStream d = daemons.get(id);
- if (d != null) {
+ String action = params.get("action").toLowerCase(Locale.ROOT).trim();
+
+ if ("list".equals(action)) {
+ Collection<DaemonStream> vals = daemons.values();
+ rsp.add("result-set", new DaemonCollectionStream(vals));
+ return;
+ }
+
+ String id = params.get(ID);
+ DaemonStream d = daemons.get(id);
+ if (d == null) {
+ rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
+ return;
+ }
+
+ switch (action) {
+ case "stop":
d.close();
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " stopped on " + coreName));
- } else {
- rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
- }
- } else {
- if ("start".equalsIgnoreCase(action)) {
- String id = params.get(ID);
- DaemonStream d = daemons.get(id);
- d.open();
- rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " started on " + coreName));
- } else if ("list".equalsIgnoreCase(action)) {
- Collection<DaemonStream> vals = daemons.values();
- rsp.add("result-set", new DaemonCollectionStream(vals));
- } else if ("kill".equalsIgnoreCase(action)) {
- String id = params.get("id");
- DaemonStream d = daemons.remove(id);
- if (d != null) {
- d.close();
+ break;
+
+ case "start":
+ try {
+ d.open();
+ } catch (IOException e) {
+ rsp.add("result-set", new DaemonResponseStream("Daemon: " + id + " error: " + e.getMessage()));
}
+ rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " started on " + coreName));
+ break;
+
+ case "kill":
+ daemons.remove(id);
+ d.close(); // we already found it in the daemons list, so we don't need to verify we removed it.
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
- }
+ break;
+
+ default:
+ rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " action '"
+ + action + "' not recognized on " + coreName));
+ break;
}
}
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
new file mode 100644
index 0000000..b04b749
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.handler.admin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.SolrStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.handler.TestSQLHandler;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DaemonStreamApiTest extends SolrTestCaseJ4 {
+
+ private MiniSolrCloudCluster cluster;
+
+ private static final String SOURCE_COLL = "sourceColl";
+ private static final String TARGET_COLL = "targetColl";
+ private static final String CHECKPOINT_COLL = "checkpointColl";
+
+ private static final String DAEMON_ROOT = "daemon";
+ private static final String CONF_NAME = "conf";
+
+ private static final String DAEMON_OP = "DaemonOp";
+
+ // We want 2-5 daemons. Choose one of them to start/stop/kill to catch any off-by-one or other bookkeeping errors.
+ final int numDaemons = random().nextInt(3) + 2;
+ String daemonOfInterest;
+
+ List<String> daemonNames = new ArrayList<>();
+
+ private String url;
+
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ cluster = new MiniSolrCloudCluster(1, createTempDir(), buildJettyConfig("/solr"));
+
+ url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + CHECKPOINT_COLL;
+
+ cluster.uploadConfigSet(configset("cloud-minimal"), CONF_NAME);
+ // create a single shard, single replica collection. This is necessary until SOLR-13245 since the commands
+ // don't look in all replicas.
+ CollectionAdminRequest.createCollection(SOURCE_COLL, CONF_NAME, 1, 1)
+ .setMaxShardsPerNode(1)
+ .process(cluster.getSolrClient());
+
+ CollectionAdminRequest.createCollection(TARGET_COLL, CONF_NAME, 1, 1)
+ .setMaxShardsPerNode(1)
+ .process(cluster.getSolrClient());
+
+ CollectionAdminRequest.createCollection(CHECKPOINT_COLL, CONF_NAME, 1, 1)
+ .setMaxShardsPerNode(1)
+ .process(cluster.getSolrClient());
+
+ for (int idx = 0; idx < numDaemons; ++idx) {
+ String name = DAEMON_ROOT + idx;
+ daemonNames.add(name);
+ }
+ daemonOfInterest = daemonNames.get(random().nextInt(numDaemons));
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ cluster.shutdown();
+ super.tearDown();
+ }
+
+ @Test
+ public void testAPIs() throws IOException, SolrServerException, InterruptedException {
+
+ checkCmdsNoDaemon(daemonOfInterest); // test no daemon defined
+
+ // Now create all our daemons.
+ for (String name : daemonNames) {
+ createDaemon(DAEMON_DEF.replace("DAEMON_NAME", name), name);
+ }
+
+ List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
+ assertEquals("Should have all daemons listed", numDaemons, tuples.size());
+
+ for (int idx = 0; idx < numDaemons; ++idx) {
+ assertEquals("Daemon should be running ", tuples.get(idx).getString("id"), daemonNames.get(idx));
+ }
+
+ // Are all the daemons in a good state?
+ for (String daemon : daemonNames) {
+ checkAlive(daemon);
+ }
+
+ // We shouldn't be able to open a daemon twice without closing; it leads to thread leaks.
+ Tuple tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest)
+ , DAEMON_OP);
+ assertTrue("Should not open twice without closing",
+ tupleOfInterest.getString(DAEMON_OP).contains("There is already an open daemon named"));
+
+ // Try stopping and check return.
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonOfInterest),
+ DAEMON_OP);
+ assertTrue("Should have been able to stop the daemon",
+ tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " stopped"));
+ checkStopped();
+
+ // Are all the daemons alive? NOTE: a stopped daemon is still there, but in a TERMINATED state
+ for (String daemon : daemonNames) {
+ if (daemon.equals(daemonOfInterest) == false) {
+ checkAlive(daemon);
+ }
+ }
+
+ // Try starting and check return.
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest),
+ DAEMON_OP);
+ assertTrue("Should have been able to start the daemon",
+ tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " started"));
+
+ // Are all the daemons alive?
+ for (String daemon : daemonNames) {
+ checkAlive(daemon);
+ }
+
+ // Try killing a daemon, it should be removed from lists.
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonOfInterest),
+ DAEMON_OP);
+ assertTrue("Daemon should have been killed",
+ tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " killed"));
+
+ // Loop for a bit, waiting for the daemon to be removed from the list of possible entries.
+ checkDaemonKilled(daemonOfInterest);
+
+ // Should not be able to start a killed daemon
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonOfInterest),
+ DAEMON_OP);
+ assertTrue("Daemon should not be found",
+ tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
+
+ // Should not be able to stop a killed daemon
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonOfInterest),
+ DAEMON_OP);
+ assertTrue("Daemon should not be found",
+ tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
+
+ // Should not be able to kill a killed daemon
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonOfInterest),
+ DAEMON_OP);
+ assertTrue("Daemon should not be found",
+ tupleOfInterest.getString(DAEMON_OP).contains(daemonOfInterest + " not found"));
+
+
+ // Let's bring the killed daemon back and see if it returns in our lists. Use the method that loops a bit to check
+ // in case there's a delay.
+ createDaemon(DAEMON_DEF.replace("DAEMON_NAME", daemonOfInterest), daemonOfInterest);
+ checkAlive(daemonOfInterest);
+
+ // Now kill them all so the threads disappear.
+ for (String daemon : daemonNames) {
+ getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemon));
+ checkDaemonKilled(daemon);
+ }
+ }
+
+ // There can be some delay while threads stabilize, so we need to loop;
+ private void checkAlive(String daemonName) throws InterruptedException, IOException {
+ TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+ while (timeout.hasTimedOut() == false) {
+ Tuple tuple = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "list"),
+ daemonName);
+ String state = tuple.getString("state");
+ if (state.equals("RUNNABLE") || state.equals("WAITING") || state.equals("TIMED_WAITING")) {
+ return;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("State for daemon '" + daemonName + "' did not become RUNNABLE, WAITING or TIMED_WAITING in 10 seconds");
+ }
+ // There can be some delay while threads stabilize, so we need to loop. Eventually, the status of a stopped
+ // thread should be "TERMINATED"
+ private void checkStopped() throws InterruptedException, IOException {
+ TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+ while (timeout.hasTimedOut() == false) {
+ Tuple tuple = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "list"),
+ daemonOfInterest);
+ if (tuple.getString("state").equals("TERMINATED")) {
+ return;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("State for daemon '" + daemonOfInterest + "' did not become TERMINATED in 10 seconds");
+ }
+
+ private void checkDaemonKilled(String daemon) throws IOException, InterruptedException {
+ TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+ while (timeout.hasTimedOut() == false) {
+ List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
+ Boolean foundIt = false;
+ for (Tuple tuple : tuples) {
+ if (tuple.get("id").equals(daemon)) {
+ foundIt = true;
+ }
+ }
+ if (foundIt == false) return;
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("'" + daemonOfInterest + "' did not disappear in 10 seconds");
+ }
+ private void createDaemon(String daemonDef, String errMsg) throws IOException, SolrServerException {
+ SolrClient client = cluster.getSolrClient();
+ // create a daemon
+ QueryResponse resp = client.query(CHECKPOINT_COLL, TestSQLHandler.mapParams("expr", daemonDef, "qt", "/stream"));
+ assertEquals(errMsg, 0, resp.getStatus());
+
+ // This should close and replace the current daemon and NOT leak threads.
+ resp = client.query(CHECKPOINT_COLL, TestSQLHandler.mapParams("expr", daemonDef, "qt", "/stream"));
+ assertEquals(errMsg, 0, resp.getStatus());
+ }
+
+ private void checkCmdsNoDaemon(String daemonName) throws IOException {
+
+ List<Tuple> tuples = getTuples(TestSQLHandler.mapParams("qt", "/stream", "action", "list"));
+ assertEquals("List should be empty", 0, tuples.size());
+
+ Tuple tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "start", "id", daemonName),
+ "DaemonOp");
+ assertTrue("Start for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
+
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "stop", "id", daemonName),
+ "DaemonOp");
+ assertTrue("Stop for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
+
+ tupleOfInterest = getTupleOfInterest(TestSQLHandler.mapParams("qt", "/stream", "action", "kill", "id", daemonName),
+ "DaemonOp");
+
+ assertTrue("Kill for daemon should not be found", tupleOfInterest.getString("DaemonOp").contains("not found on"));
+ }
+
+ // It's _really_ useful to have the tuples sorted....
+ private List<Tuple> getTuples(final SolrParams params) throws IOException {
+ return getTuples(params, null);
+ }
+
+ private List<Tuple> getTuples(final SolrParams params, String ofInterest) throws IOException {
+ //log.info("Tuples from params: {}", params);
+ TupleStream tupleStream = new SolrStream(url, params);
+
+ tupleStream.open();
+ List<Tuple> tuples = new ArrayList<>();
+ for (; ; ) {
+ Tuple t = tupleStream.read();
+ //log.info(" ... {}", t.fields);
+ if (t.EOF) {
+ break;
+ } else if (ofInterest == null || t.getString("id").equals(ofInterest) || t.getString(ofInterest).equals("null") == false) {
+ // a failed return is a bit different, the only key is DaemonOp
+ tuples.add(t);
+ }
+ }
+ tupleStream.close();
+ Collections.sort(tuples, (o1, o2) -> (o1.getString("id").compareTo(o2.getString("id"))));
+ return tuples;
+ }
+
+ private Tuple getTupleOfInterest(final SolrParams params, String ofInterest) throws IOException {
+ List<Tuple> tuples = getTuples(params, ofInterest);
+ if (tuples.size() != 1) {
+ fail("Should have found a tuple for tuple of interest: " + ofInterest);
+ }
+ return tuples.get(0);
+ }
+
+ private static String DAEMON_DEF =
+ " daemon(id=\"DAEMON_NAME\"," +
+ " runInterval=\"1000\"," +
+ " terminate=\"false\"," +
+ " update(targetColl," +
+ " batchSize=100," +
+ " topic(checkpointColl," +
+ " sourceColl," +
+ " q=\"*:*\"," +
+ " fl=\"id\"," +
+ " id=\"topic1\"," +
+ " initialCheckpoint=0)" +
+ "))";
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 9d02ec2..b1aafff 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -194,7 +194,12 @@ public class DaemonStream extends TupleStream implements Expressible {
return id;
}
- public void open() {
+ public void open() throws IOException {
+ if (this.streamRunner != null && this.closed == false) {
+ log.error("There is already a running daemon named '{}', no action taken", id);
+ throw new IOException("There is already an open daemon named '" + id + "', no action taken.");
+ }
+ this.closed = false;
this.streamRunner = new StreamRunner(runInterval, id);
this.streamRunner.start();
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 003f01b..50739bd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -1739,58 +1739,69 @@ public void testParallelRankStream() throws Exception {
daemonStream.setStreamContext(context);
daemonStream.open();
+ CheckDaemonStream(context, daemonStream);
- // Wait for the checkpoint
- JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
-
+ // We should get an error if we try to open an already-open stream.
+ final IOException ex = expectThrows(IOException.class, () -> {
+ daemonStream.open();
+ });
+ assertEquals("Should have an intelligible exception message", ex.getMessage(), "There is already an open daemon named 'daemon1', no action taken.");
+ daemonStream.close();
- SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
- int count = 0;
- while (count == 0) {
- SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
- solrStream.setStreamContext(context);
- List<Tuple> tuples = getTuples(solrStream);
- count = tuples.size();
- if (count > 0) {
- Tuple t = tuples.get(0);
- assertTrue(t.getLong("id") == 50000000);
- } else {
- System.out.println("###### Waiting for checkpoint #######:" + count);
- }
- }
+ // We should be able to close then re-open the stream, then close it again, see SOLR-13408
+ daemonStream.open();
+ CheckDaemonStream(context, daemonStream);
+ daemonStream.close();
+ } finally {
+ cache.close();
+ }
+ }
- new UpdateRequest()
- .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
- .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
- .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
- .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ private void CheckDaemonStream(StreamContext context, DaemonStream daemonStream) throws IOException, SolrServerException {
+ // Wait for the checkpoint
+ JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
- for (int i = 0; i < 5; i++) {
- daemonStream.read();
+ SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
+ int count = 0;
+ while (count == 0) {
+ SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
+ solrStream.setStreamContext(context);
+ List<Tuple> tuples = getTuples(solrStream);
+ count = tuples.size();
+ if (count > 0) {
+ Tuple t = tuples.get(0);
+ assertTrue(t.getLong("id") == 50000000);
+ } else {
+ System.out.println("###### Waiting for checkpoint #######:" + count);
}
+ }
- new UpdateRequest()
- .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
- .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
-
- for (int i = 0; i < 2; i++) {
- daemonStream.read();
- }
+ new UpdateRequest()
+ .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+ .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+ .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
+ .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- daemonStream.shutdown();
+ for (int i = 0; i < 5; i++) {
+ daemonStream.read();
+ }
- Tuple tuple = daemonStream.read();
+ new UpdateRequest()
+ .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- assertTrue(tuple.EOF);
- daemonStream.close();
- } finally {
- cache.close();
+ for (int i = 0; i < 2; i++) {
+ daemonStream.read();
}
+ daemonStream.shutdown();
+
+ Tuple tuple = daemonStream.read();
+ assertTrue(tuple.EOF);
}
@Test