You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2015/05/20 04:20:14 UTC

svn commit: r1680441 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/handler/loader/ solr/core/src/java/org/apache/solr/update/ solr/core/src/test/org/apache/solr/handler/loader/ solr/solrj/ solr/solrj/src/ja...

Author: thelabdude
Date: Wed May 20 02:20:13 2015
New Revision: 1680441

URL: http://svn.apache.org/r1680441
Log:
SOLR-7333: Make the poll queue time configurable and use knowledge that a batch is being processed to poll efficiently

Added:
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/handler/loader/
      - copied from r1680436, lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/
Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Wed May 20 02:20:13 2015
@@ -246,6 +246,10 @@ Optimizations
 * SOLR-7547: Short circuit SolrDisptachFilter for static content request. Right now it creates
   a new HttpSolrCall object and tries to process it. (Anshum Gupta)
 
+* SOLR-7333: Make the poll queue time a leader uses when distributing updates to replicas 
+  configurable and use knowledge that a batch is being processed to poll efficiently.
+  (Timothy Potter)
+
 Other Changes
 ----------------------
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java Wed May 20 02:20:13 2015
@@ -31,6 +31,8 @@ import org.apache.solr.update.processor.
  */
 public abstract class ContentStreamLoader {
 
+  protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
+
   /**
    * This should be called once for each RequestHandler
    */

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Wed May 20 02:20:13 2015
@@ -49,7 +49,7 @@ import java.util.Set;
  */
 public class JavabinLoader extends ContentStreamLoader {
   public static Logger log = LoggerFactory.getLogger(JavabinLoader.class);
-  
+
   @Override
   public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
     InputStream is = null;
@@ -91,7 +91,12 @@ public class JavabinLoader extends Conte
         if (overwrite != null) {
           addCmd.overwrite = overwrite;
         }
-        
+
+        if (updateRequest.isLastDocInBatch()) {
+          // this is a hint to downstream code that indicates we've sent the last doc in a batch
+          addCmd.isLastDocInBatch = true;
+        }
+
         try {
           processor.processAdd(addCmd);
           addCmd.clear();
@@ -115,7 +120,9 @@ public class JavabinLoader extends Conte
 
   private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
     AddUpdateCommand addCmd = new AddUpdateCommand(req);
-
+    // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
+    // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
+    addCmd.pollQueueTime = pollQueueTime;
     addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
     addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
     return addCmd;

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed May 20 02:20:13 2015
@@ -49,6 +49,10 @@ public class AddUpdateCommand extends Up
    public Term updateTerm;
 
    public int commitWithin = -1;
+
+   public boolean isLastDocInBatch = false;
+
+   public int pollQueueTime = 0;
    
    public AddUpdateCommand(SolrQueryRequest req) {
      super(req);
@@ -64,6 +68,7 @@ public class AddUpdateCommand extends Up
      solrDoc = null;
      indexedId = null;
      updateTerm = null;
+     isLastDocInBatch = false;
      version = 0;
    }
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed May 20 02:20:13 2015
@@ -196,12 +196,14 @@ public class SolrCmdDistributor {
   }
   
   public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {  
-
+    String cmdStr = cmd.toString();
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
+      if (cmd.isLastDocInBatch)
+        uReq.lastDocInBatch();
       uReq.setParams(params);
       uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
+      submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
     }
     
   }
@@ -310,17 +312,19 @@ public class SolrCmdDistributor {
     public boolean synchronous;
     public String cmdString;
     public RequestReplicationTracker rfTracker;
+    public int pollQueueTime;
 
     public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
-      this(cmdString, node, uReq, synchronous, null);
+      this(cmdString, node, uReq, synchronous, null, 0);
     }
     
-    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
+    public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
       this.node = node;
       this.uReq = uReq;
       this.synchronous = synchronous;
       this.cmdString = cmdString;
       this.rfTracker = rfTracker;
+      this.pollQueueTime = pollQueueTime;
     }
     
     public String toString() {

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java Wed May 20 02:20:13 2015
@@ -87,7 +87,7 @@ public class StreamingSolrClients {
       };
       client.setParser(new BinaryResponseParser());
       client.setRequestWriter(new BinaryRequestWriter());
-      client.setPollQueueTime(0);
+      client.setPollQueueTime(req.pollQueueTime);
       Set<String> queryParams = new HashSet<>(2);
       queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
       queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java Wed May 20 02:20:13 2015
@@ -156,7 +156,7 @@ public class ConcurrentUpdateSolrClient
 
       log.debug("starting runner: {}", this);
       HttpPost method = null;
-      HttpResponse response = null;            
+      HttpResponse response = null;
       try {
         while (!queue.isEmpty()) {
           try {
@@ -207,7 +207,14 @@ public class ConcurrentUpdateSolrClient
                       }
                     }
                     out.flush();
-                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+
+                    if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
+                      // no need to wait to see another doc in the queue if we've hit the last doc in a batch
+                      req = queue.poll(0, TimeUnit.MILLISECONDS);
+                    } else {
+                      req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+                    }
+
                   }
                   
                   if (isXml) {

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Wed May 20 02:20:13 2015
@@ -126,7 +126,6 @@ public class JavaBinUpdateRequestCodec {
 
       @Override
       public List readIterator(DataInputInputStream fis) throws IOException {
-
         // default behavior for reading any regular Iterator in the stream
         if (seenOuterMostDocIterator) return super.readIterator(fis);
 
@@ -142,9 +141,16 @@ public class JavaBinUpdateRequestCodec {
         if (handler == null) return super.readIterator(fis);
         Integer commitWithin = null;
         Boolean overwrite = null;
+        Object o = null;
         while (true) {
-          Object o = readVal(fis);
-          if (o == END_OBJ) break;
+          if (o == null) {
+            o = readVal(fis);
+          }
+
+          if (o == END_OBJ) {
+            break;
+          }
+
           SolrInputDocument sdoc = null;
           if (o instanceof List) {
             sdoc = listToSolrInputDocument((List<NamedList>) o);
@@ -160,9 +166,16 @@ public class JavaBinUpdateRequestCodec {
               overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
             }
           } else  {
-          
             sdoc = (SolrInputDocument) o;
           }
+
+          // peek at the next object to see if we're at the end
+          o = readVal(fis);
+          if (o == END_OBJ) {
+            // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
+            updateRequest.lastDocInBatch();
+          }
+
           handler.update(sdoc, updateRequest, commitWithin, overwrite);
         }
         return Collections.EMPTY_LIST;

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Wed May 20 02:20:13 2015
@@ -57,7 +57,9 @@ public class UpdateRequest extends Abstr
   private Iterator<SolrInputDocument> docIterator = null;
   private Map<String,Map<String,Object>> deleteById = null;
   private List<String> deleteQuery = null;
-  
+
+  private boolean isLastDocInBatch = false;
+
   public UpdateRequest() {
     super(METHOD.POST, "/update");
   }
@@ -460,4 +462,11 @@ public class UpdateRequest extends Abstr
     return deleteQuery;
   }
   
+  public boolean isLastDocInBatch() {
+    return isLastDocInBatch;
+  }
+  
+  public void lastDocInBatch() {
+    isLastDocInBatch = true;
+  }
 }

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java Wed May 20 02:20:13 2015
@@ -555,7 +555,7 @@ public class JavaBinCodec {
 
       @Override
       public String toString() {
-        return "MapEntry[" + key.toString() + ":" + value.toString() + "]";
+        return "MapEntry[" + key + ":" + value + "]";
       }
 
       @Override