You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2016/04/13 22:12:27 UTC

lucene-solr:branch_6x: SOLR-7729: ConcurrentUpdateSolrClient ignores the collection parameter in some methods.

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 097e063af -> bf984af6f


SOLR-7729: ConcurrentUpdateSolrClient ignores the collection parameter in some methods.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/bf984af6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/bf984af6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/bf984af6

Branch: refs/heads/branch_6x
Commit: bf984af6f0824ea00eddf8732e8d4cf8323da022
Parents: 097e063
Author: markrmiller <ma...@apache.org>
Authored: Wed Apr 13 10:55:21 2016 -0400
Committer: markrmiller <ma...@apache.org>
Committed: Wed Apr 13 16:12:18 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  3 +
 .../solrj/impl/ConcurrentUpdateSolrClient.java  | 65 +++++++++++++---
 .../impl/ConcurrentUpdateSolrClientTest.java    | 78 +++++++++++++++++++-
 3 files changed, 132 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bf984af6/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7f6a573..c8271b7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -80,6 +80,9 @@ Bug Fixes
 * SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
   sarowe, Erick Erickson, Mark Miller, shalin)
 
+* SOLR-7729: ConcurrentUpdateSolrClient ignores the collection parameter in some methods.
+  (Nicolas Gavalda, Jorge Luis Betancourt Gonzalez via Mark Miller)
+  
 Optimizations
 ----------------------
 * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bf984af6/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index c58baad..0cae1ef 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -77,7 +77,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   private static final long serialVersionUID = 1L;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private HttpSolrClient client;
-  final BlockingQueue<UpdateRequest> queue;
+  final BlockingQueue<Update> queue;
   final ExecutorService scheduler;
   final Queue<Runner> runners;
   volatile CountDownLatch lock = null; // used to block everything
@@ -222,15 +222,15 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
 
         InputStream rspBody = null;
         try {
-          final UpdateRequest updateRequest =
+          final Update update = 
               queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
-          if (updateRequest == null)
+          if (update == null)
             break;
 
           String contentType = client.requestWriter.getUpdateContentType();
           final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
 
-          final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+          final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
 
           EntityTemplate template = new EntityTemplate(new ContentProducer() {
 
@@ -240,11 +240,12 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
                 if (isXml) {
                   out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
                 }
-                UpdateRequest req = updateRequest;
-                while (req != null) {
+                Update upd = update;
+                while (upd != null) {
+                  UpdateRequest req = upd.getRequest();
                   SolrParams currentParams = new ModifiableSolrParams(req.getParams());
                   if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
-                    queue.add(req); // params are different, push back to queue
+                    queue.add(upd); // params are different, push back to queue
                     break;
                   }
 
@@ -272,9 +273,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
 
                   if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
                     // no need to wait to see another doc in the queue if we've hit the last doc in a batch
-                    req = queue.poll(0, TimeUnit.MILLISECONDS);
+                    upd = queue.poll(0, TimeUnit.MILLISECONDS);
                   } else {
-                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+                    upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
                   }
 
                 }
@@ -296,7 +297,11 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           requestParams.set(CommonParams.WT, client.parser.getWriterType());
           requestParams.set(CommonParams.VERSION, client.parser.getVersion());
 
-          method = new HttpPost(client.getBaseURL() + "/update"
+          String basePath = client.getBaseURL();
+          if (update.getCollection() != null)
+            basePath += "/" + update.getCollection();
+
+          method = new HttpPost(basePath + "/update"
               + requestParams.toQueryString());
 
           method.setEntity(template);
@@ -361,6 +366,41 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
     }
   }
 
+  /**
+   * Class representing an UpdateRequest and an optional collection.
+   */
+  class Update {
+    UpdateRequest request;
+    String collection;
+    /**
+     * Creates an update that pairs a request with an optional target collection.
+     * @param request the update request.
+     * @param collection The collection, can be null.
+     */
+    public Update(UpdateRequest request, String collection) {
+      this.request = request;
+      this.collection = collection;
+    }
+    /**
+     * @return the update request.
+     */
+    public UpdateRequest getRequest() {
+      return request;
+    }
+    public void setRequest(UpdateRequest request) {
+      this.request = request;
+    }
+    /**
+     * @return the collection, can be null.
+     */
+    public String getCollection() {
+      return collection;
+    }
+    public void setCollection(String collection) {
+      this.collection = collection;
+    }
+  }
+
   @Override
   public NamedList<Object> request(final SolrRequest request, String collection)
       throws SolrServerException, IOException {
@@ -403,7 +443,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
         tmpLock.await();
       }
 
-      boolean success = queue.offer(req);
+      Update update = new Update(req, collection);
+      boolean success = queue.offer(update);
 
       for (;;) {
         synchronized (runners) {
@@ -436,7 +477,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
         // start more runners.
         //
         if (!success) {
-          success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
+          success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
         }
       }
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bf984af6/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
index 5e2baeb..de728bd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
@@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.impl;
 
 import org.apache.http.HttpResponse;
 import org.apache.solr.SolrJettyTestBase;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettyConfig;
 import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -39,7 +41,6 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -187,16 +188,86 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
         TestServlet.numDocsRcvd.get() == expectedDocs);
   }
   
+  @Test
+  public void testCollectionParameters() throws IOException, SolrServerException {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 10;
+
+    try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString(), cussQueueSize, cussThreadCount)) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "collection");
+      concurrentClient.add("collection1", doc);
+      concurrentClient.commit("collection1");
+
+      assertEquals(1, concurrentClient.query("collection1", new SolrQuery("id:collection")).getResults().getNumFound());
+    }
+
+    try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString() + "/collection1", cussQueueSize, cussThreadCount)) {
+      assertEquals(1, concurrentClient.query(new SolrQuery("id:collection")).getResults().getNumFound());
+    }
+
+  }
+
+  @Test
+  public void testConcurrentCollectionUpdate() throws Exception {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+    int numDocs = 100;
+    int numRunnables = 5;
+    int expected = numDocs * numRunnables;
+
+    try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString(), cussQueueSize, cussThreadCount)) {
+      concurrentClient.setPollQueueTime(0);
+
+      // ensure it doesn't block when there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      // Delete all existing documents.
+      concurrentClient.deleteByQuery("collection1", "*:*");
+
+      int poolSize = 5;
+      ExecutorService threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrjNamedThreadFactory("testCUSS"));
+
+      for (int r=0; r < numRunnables; r++)
+        threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient, "collection1"));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      concurrentClient.commit("collection1");
+
+      assertEquals(expected, concurrentClient.query("collection1", new SolrQuery("*:*")).getResults().getNumFound());
+
+      // wait until all requests are processed by CUSS 
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+    }
+
+    try (ConcurrentUpdateSolrClient concurrentClient = new ConcurrentUpdateSolrClient(jetty.getBaseUrl().toString() + "/collection1", cussQueueSize, cussThreadCount)) {
+      assertEquals(expected, concurrentClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+
+  }
+
   class SendDocsRunnable implements Runnable {
     
     private String id;
     private int numDocs;
     private ConcurrentUpdateSolrClient cuss;
+    private String collection;
     
     SendDocsRunnable(String id, int numDocs, ConcurrentUpdateSolrClient cuss) {
+      this(id, numDocs, cuss, null);
+    }
+    
+    SendDocsRunnable(String id, int numDocs, ConcurrentUpdateSolrClient cuss, String collection) {
       this.id = id;
       this.numDocs = numDocs;
       this.cuss = cuss;
+      this.collection = collection;
     }
 
     @Override
@@ -208,7 +279,10 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
         UpdateRequest req = new UpdateRequest();
         req.add(doc);        
         try {
-          cuss.request(req);
+          if (this.collection == null)
+            cuss.request(req);
+          else
+            cuss.request(req, this.collection);
         } catch (Throwable t) {
           t.printStackTrace();
         }