You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2015/05/20 03:03:24 UTC

svn commit: r1680436 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/loader/ core/src/java/org/apache/solr/update/ core/src/test/org/apache/solr/handler/loader/ solrj/src/java/org/apache/solr/client/solrj/impl/ solrj/src/java/org/...

Author: thelabdude
Date: Wed May 20 01:03:23 2015
New Revision: 1680436

URL: http://svn.apache.org/r1680436
Log:
SOLR-7333: Make the poll queue time configurable and use knowledge that a batch is being processed to poll efficiently

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed May 20 01:03:23 2015
@@ -303,6 +303,10 @@ Optimizations
 * SOLR-7547: Short circuit SolrDisptachFilter for static content request. Right now it creates
   a new HttpSolrCall object and tries to process it. (Anshum Gupta)
 
+* SOLR-7333: Make the poll queue time a leader uses when distributing updates to replicas 
+  configurable and use knowledge that a batch is being processed to poll efficiently.
+  (Timothy Potter)
+
 Other Changes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java Wed May 20 01:03:23 2015
@@ -31,6 +31,8 @@ import org.apache.solr.update.processor.
  */
 public abstract class ContentStreamLoader {
 
+  protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
+
   /**
    * This should be called once for each RequestHandler
    */

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Wed May 20 01:03:23 2015
@@ -49,7 +49,7 @@ import java.util.Set;
  */
 public class JavabinLoader extends ContentStreamLoader {
   public static Logger log = LoggerFactory.getLogger(JavabinLoader.class);
-  
+
   @Override
   public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
     InputStream is = null;
@@ -91,7 +91,12 @@ public class JavabinLoader extends Conte
         if (overwrite != null) {
           addCmd.overwrite = overwrite;
         }
-        
+
+        if (updateRequest.isLastDocInBatch()) {
+          // this is a hint to downstream code that indicates we've sent the last doc in a batch
+          addCmd.isLastDocInBatch = true;
+        }
+
         try {
           processor.processAdd(addCmd);
           addCmd.clear();
@@ -115,7 +120,9 @@ public class JavabinLoader extends Conte
 
   private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
     AddUpdateCommand addCmd = new AddUpdateCommand(req);
-
+    // Because the leader gets a hint when the last doc of a batch is being processed, it is OK to use a
+    // pollQueueTime larger than the default of 0: the wait for more docs can be skipped once the batch has ended.
+    addCmd.pollQueueTime = pollQueueTime;
     addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
     addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
     return addCmd;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed May 20 01:03:23 2015
@@ -50,6 +50,10 @@ public class AddUpdateCommand extends Up
    public Term updateTerm;
 
    public int commitWithin = -1;
+
+   public boolean isLastDocInBatch = false;
+
+   public int pollQueueTime = 0;
    
    public AddUpdateCommand(SolrQueryRequest req) {
      super(req);
@@ -65,6 +69,7 @@ public class AddUpdateCommand extends Up
      solrDoc = null;
      indexedId = null;
      updateTerm = null;
+     isLastDocInBatch = false;
      version = 0;
    }
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed May 20 01:03:23 2015
@@ -196,12 +196,14 @@ public class SolrCmdDistributor {
   }
   
   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {  
-
+    String cmdStr = cmd.toString();
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
+      if (cmd.isLastDocInBatch)
+        uReq.lastDocInBatch();
       uReq.setParams(params);
       uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
+      submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
     }
     
   }
@@ -310,17 +312,19 @@ public class SolrCmdDistributor {
     public boolean synchronous;
     public String cmdString;
     public RequestReplicationTracker rfTracker;
+    public int pollQueueTime;
 
     public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
-      this(cmdString, node, uReq, synchronous, null);
+      this(cmdString, node, uReq, synchronous, null, 0);
     }
     
-    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
+    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
       this.node = node;
       this.uReq = uReq;
       this.synchronous = synchronous;
       this.cmdString = cmdString;
       this.rfTracker = rfTracker;
+      this.pollQueueTime = pollQueueTime;
     }
     
     public String toString() {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java Wed May 20 01:03:23 2015
@@ -87,7 +87,7 @@ public class StreamingSolrClients {
       };
       client.setParser(new BinaryResponseParser());
       client.setRequestWriter(new BinaryRequestWriter());
-      client.setPollQueueTime(0);
+      client.setPollQueueTime(req.pollQueueTime);
       Set<String> queryParams = new HashSet<>(2);
       queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
       queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java?rev=1680436&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java Wed May 20 01:03:23 2015
@@ -0,0 +1,92 @@
+package org.apache.solr.handler.loader;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.processor.BufferingRequestProcessor;
+import org.junit.BeforeClass;
+
+public class JavabinLoaderTest extends SolrTestCaseJ4 {
+  @BeforeClass
+  public static void beforeTests() throws Exception {
+    initCore("solrconfig.xml","schema.xml");
+  }
+
+  /**
+   * Verifies the isLastDocInBatch flag gets set correctly for a batch of docs and for a request with a single doc.
+   */
+  public void testLastDocInBatchFlag() throws Exception {
+    doTestLastDocInBatchFlag(1); // single doc
+    doTestLastDocInBatchFlag(2); // multiple docs
+  }
+
+  protected void doTestLastDocInBatchFlag(int numDocsInBatch) throws Exception {
+    List<SolrInputDocument> batch = new ArrayList<>(numDocsInBatch);
+    for (int d=0; d < numDocsInBatch; d++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.setField("id", String.valueOf(d));
+      batch.add(doc);
+    }
+
+    UpdateRequest updateRequest = new UpdateRequest();
+    if (batch.size() > 1) {
+      updateRequest.add(batch);
+    } else {
+      updateRequest.add(batch.get(0));
+    }
+
+    // client-side SolrJ would do this ...
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    (new JavaBinUpdateRequestCodec()).marshal(updateRequest, os);
+
+    // Override processAdd because JavabinLoader calls clear() on the addCmd right after passing it to the
+    // processor, which resets isLastDocInBatch; buffering a clone keeps the flag intact for the assertions below.
+    BufferingRequestProcessor mockUpdateProcessor = new BufferingRequestProcessor(null) {
+      @Override
+      public void processAdd(AddUpdateCommand cmd) throws IOException {
+        addCommands.add((AddUpdateCommand)cmd.clone());
+      }
+    };
+
+    SolrQueryRequest req = req();
+    (new JavabinLoader()).load(req,
+        new SolrQueryResponse(),
+        new ContentStreamBase.ByteArrayStream(os.toByteArray(), "test"),
+        mockUpdateProcessor);
+    req.close();
+
+    assertTrue(mockUpdateProcessor.addCommands.size() == numDocsInBatch);
+    for (int i=0; i < numDocsInBatch-1; i++)
+      assertFalse(mockUpdateProcessor.addCommands.get(i).isLastDocInBatch); // not last doc in batch
+
+    // last doc should have the flag set
+    assertTrue(mockUpdateProcessor.addCommands.get(batch.size()-1).isLastDocInBatch);
+  }
+}

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java Wed May 20 01:03:23 2015
@@ -156,7 +156,7 @@ public class ConcurrentUpdateSolrClient
 
       log.debug("starting runner: {}", this);
       HttpPost method = null;
-      HttpResponse response = null;            
+      HttpResponse response = null;
       try {
         while (!queue.isEmpty()) {
           try {
@@ -207,7 +207,14 @@ public class ConcurrentUpdateSolrClient
                       }
                     }
                     out.flush();
-                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+
+                    if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
+                      // no need to wait to see another doc in the queue if we've hit the last doc in a batch
+                      req = queue.poll(0, TimeUnit.MILLISECONDS);
+                    } else {
+                      req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+                    }
+
                   }
                   
                   if (isXml) {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Wed May 20 01:03:23 2015
@@ -126,7 +126,6 @@ public class JavaBinUpdateRequestCodec {
 
       @Override
       public List readIterator(DataInputInputStream fis) throws IOException {
-
         // default behavior for reading any regular Iterator in the stream
         if (seenOuterMostDocIterator) return super.readIterator(fis);
 
@@ -142,9 +141,16 @@ public class JavaBinUpdateRequestCodec {
         if (handler == null) return super.readIterator(fis);
         Integer commitWithin = null;
         Boolean overwrite = null;
+        Object o = null;
         while (true) {
-          Object o = readVal(fis);
-          if (o == END_OBJ) break;
+          if (o == null) {
+            o = readVal(fis);
+          }
+
+          if (o == END_OBJ) {
+            break;
+          }
+
           SolrInputDocument sdoc = null;
           if (o instanceof List) {
             sdoc = listToSolrInputDocument((List<NamedList>) o);
@@ -160,9 +166,16 @@ public class JavaBinUpdateRequestCodec {
               overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
             }
           } else  {
-          
             sdoc = (SolrInputDocument) o;
           }
+
+          // peek at the next object to see if we're at the end
+          o = readVal(fis);
+          if (o == END_OBJ) {
+            // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
+            updateRequest.lastDocInBatch();
+          }
+
           handler.update(sdoc, updateRequest, commitWithin, overwrite);
         }
         return Collections.EMPTY_LIST;

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Wed May 20 01:03:23 2015
@@ -57,7 +57,9 @@ public class UpdateRequest extends Abstr
   private Iterator<SolrInputDocument> docIterator = null;
   private Map<String,Map<String,Object>> deleteById = null;
   private List<String> deleteQuery = null;
-  
+
+  private boolean isLastDocInBatch = false;
+
   public UpdateRequest() {
     super(METHOD.POST, "/update");
   }
@@ -460,4 +462,11 @@ public class UpdateRequest extends Abstr
     return deleteQuery;
   }
   
+  public boolean isLastDocInBatch() {
+    return isLastDocInBatch;
+  }
+  
+  public void lastDocInBatch() {
+    isLastDocInBatch = true;
+  }
 }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java Wed May 20 01:03:23 2015
@@ -555,7 +555,7 @@ public class JavaBinCodec {
 
       @Override
       public String toString() {
-        return "MapEntry[" + key.toString() + ":" + value.toString() + "]";
+        return "MapEntry[" + key + ":" + value + "]";
       }
 
       @Override