You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2016/04/13 17:54:04 UTC
lucene-solr:master: SOLR-7729: ConcurrentUpdateSolrClient ignores the
collection parameter in some methods.
Repository: lucene-solr
Updated Branches:
refs/heads/master 79195a8e5 -> 0a5f7f8b5
SOLR-7729: ConcurrentUpdateSolrClient ignores the collection parameter in some methods.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0a5f7f8b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0a5f7f8b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0a5f7f8b
Branch: refs/heads/master
Commit: 0a5f7f8b5e35a053031cc89b40e7c315cfcef82d
Parents: 79195a8
Author: markrmiller <ma...@apache.org>
Authored: Wed Apr 13 10:55:21 2016 -0400
Committer: markrmiller <ma...@apache.org>
Committed: Wed Apr 13 10:55:21 2016 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../solrj/impl/ConcurrentUpdateSolrClient.java | 65 +++++++++++++---
.../impl/ConcurrentUpdateSolrClientTest.java | 78 +++++++++++++++++++-
3 files changed, 132 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0a5f7f8b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index fbe4698..fc50468 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -111,6 +111,9 @@ Bug Fixes
* SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
sarowe, Erick Erickson, Mark Miller, shalin)
+* SOLR-7729: ConcurrentUpdateSolrClient ignores the collection parameter in some methods.
+ (Nicolas Gavalda, Jorge Luis Betancourt Gonzalez via Mark Miller)
+
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0a5f7f8b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index 2551957..f209672 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -77,7 +77,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private HttpSolrClient client;
- final BlockingQueue<UpdateRequest> queue;
+ final BlockingQueue<Update> queue;
final ExecutorService scheduler;
final Queue<Runner> runners;
volatile CountDownLatch lock = null; // used to block everything
@@ -224,15 +224,15 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
InputStream rspBody = null;
try {
- final UpdateRequest updateRequest =
+ final Update update =
queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
- if (updateRequest == null)
+ if (update == null)
break;
String contentType = client.requestWriter.getUpdateContentType();
final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
- final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+ final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
EntityTemplate template = new EntityTemplate(new ContentProducer() {
@@ -242,11 +242,12 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
if (isXml) {
out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
}
- UpdateRequest req = updateRequest;
- while (req != null) {
+ Update upd = update;
+ while (upd != null) {
+ UpdateRequest req = upd.getRequest();
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
- queue.add(req); // params are different, push back to queue
+ queue.add(upd); // params are different, push back to queue
break;
}
@@ -274,9 +275,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
// no need to wait to see another doc in the queue if we've hit the last doc in a batch
- req = queue.poll(0, TimeUnit.MILLISECONDS);
+ upd = queue.poll(0, TimeUnit.MILLISECONDS);
} else {
- req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
}
@@ -298,7 +299,11 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
requestParams.set(CommonParams.WT, client.parser.getWriterType());
requestParams.set(CommonParams.VERSION, client.parser.getVersion());
- method = new HttpPost(client.getBaseURL() + "/update"
+ String basePath = client.getBaseURL();
+ if (update.getCollection() != null)
+ basePath += "/" + update.getCollection();
+
+ method = new HttpPost(basePath + "/update"
+ requestParams.toQueryString());
org.apache.http.client.config.RequestConfig.Builder requestConfigBuilder = HttpClientUtil.createDefaultRequestConfigBuilder();
@@ -373,6 +378,41 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
}
+ /**
+ * Class representing an UpdateRequest and an optional collection.
+ */
+ class Update {
+ UpdateRequest request;
+ String collection;
+ /**
+ *
+ * @param request the update request.
+ * @param collection The collection, can be null.
+ */
+ public Update(UpdateRequest request, String collection) {
+ this.request = request;
+ this.collection = collection;
+ }
+ /**
+ * @return the update request.
+ */
+ public UpdateRequest getRequest() {
+ return request;
+ }
+ public void setRequest(UpdateRequest request) {
+ this.request = request;
+ }
+ /**
+ * @return the collection, can be null.
+ */
+ public String getCollection() {
+ return collection;
+ }
+ public void setCollection(String collection) {
+ this.collection = collection;
+ }
+ }
+
@Override
public NamedList<Object> request(final SolrRequest request, String collection)
throws SolrServerException, IOException {
@@ -415,7 +455,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
tmpLock.await();
}
- boolean success = queue.offer(req);
+ Update update = new Update(req, collection);
+ boolean success = queue.offer(update);
for (;;) {
synchronized (runners) {
@@ -448,7 +489,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
// start more runners.
//
if (!success) {
- success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
+ success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
}
}
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0a5f7f8b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
index 5e2baeb..de728bd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
@@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.impl;
import org.apache.http.HttpResponse;
import org.apache.solr.SolrJettyTestBase;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -39,7 +41,6 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -187,16 +188,86 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
TestServlet.numDocsRcvd.get() == expectedDocs);
}
+ @Test
+ public void testCollectionParameters() throws IOException, SolrServerException {
+
+ int cussThreadCount = 2;
+ int cussQueueSize = 10;
+
+ try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString(), cussQueueSize, cussThreadCount)) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "collection");
+ concurrentClient.add("collection1", doc);
+ concurrentClient.commit("collection1");
+
+ assertEquals(1, concurrentClient.query("collection1", new SolrQuery("id:collection")).getResults().getNumFound());
+ }
+
+ try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString() + "/collection1", cussQueueSize, cussThreadCount)) {
+ assertEquals(1, concurrentClient.query(new SolrQuery("id:collection")).getResults().getNumFound());
+ }
+
+ }
+
+ @Test
+ public void testConcurrentCollectionUpdate() throws Exception {
+
+ int cussThreadCount = 2;
+ int cussQueueSize = 100;
+ int numDocs = 100;
+ int numRunnables = 5;
+ int expected = numDocs * numRunnables;
+
+ try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString(), cussQueueSize, cussThreadCount)) {
+ concurrentClient.setPollQueueTime(0);
+
+ // ensure it doesn't block where there's nothing to do yet
+ concurrentClient.blockUntilFinished();
+
+ // Delete all existing documents.
+ concurrentClient.deleteByQuery("collection1", "*:*");
+
+ int poolSize = 5;
+ ExecutorService threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrjNamedThreadFactory("testCUSS"));
+
+ for (int r=0; r < numRunnables; r++)
+ threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient, "collection1"));
+
+ // ensure all docs are sent
+ threadPool.awaitTermination(5, TimeUnit.SECONDS);
+ threadPool.shutdown();
+
+ concurrentClient.commit("collection1");
+
+ assertEquals(expected, concurrentClient.query("collection1", new SolrQuery("*:*")).getResults().getNumFound());
+
+ // wait until all requests are processed by CUSS
+ concurrentClient.blockUntilFinished();
+ concurrentClient.shutdownNow();
+ }
+
+ try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString() + "/collection1", cussQueueSize, cussThreadCount)) {
+ assertEquals(expected, concurrentClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+ }
+
+ }
+
class SendDocsRunnable implements Runnable {
private String id;
private int numDocs;
private ConcurrentUpdateSolrClient cuss;
+ private String collection;
SendDocsRunnable(String id, int numDocs, ConcurrentUpdateSolrClient cuss) {
+ this(id, numDocs, cuss, null);
+ }
+
+ SendDocsRunnable(String id, int numDocs, ConcurrentUpdateSolrClient cuss, String collection) {
this.id = id;
this.numDocs = numDocs;
this.cuss = cuss;
+ this.collection = collection;
}
@Override
@@ -208,7 +279,10 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
UpdateRequest req = new UpdateRequest();
req.add(doc);
try {
- cuss.request(req);
+ if (this.collection == null)
+ cuss.request(req);
+ else
+ cuss.request(req, this.collection);
} catch (Throwable t) {
t.printStackTrace();
}