You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2015/05/20 03:03:24 UTC
svn commit: r1680436 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/loader/
core/src/java/org/apache/solr/update/
core/src/test/org/apache/solr/handler/loader/
solrj/src/java/org/apache/solr/client/solrj/impl/ solrj/src/java/org/apache/solr/client/solrj/request/ solrj/src/java/org/apache/solr/common/util/
Author: thelabdude
Date: Wed May 20 01:03:23 2015
New Revision: 1680436
URL: http://svn.apache.org/r1680436
Log:
SOLR-7333: Make the poll queue time configurable and use knowledge that a batch is being processed to poll efficiently
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed May 20 01:03:23 2015
@@ -303,6 +303,10 @@ Optimizations
* SOLR-7547: Short circuit SolrDisptachFilter for static content request. Right now it creates
a new HttpSolrCall object and tries to process it. (Anshum Gupta)
+* SOLR-7333: Make the poll queue time a leader uses when distributing updates to replicas
+ configurable and use knowledge that a batch is being processed to poll efficiently.
+ (Timothy Potter)
+
Other Changes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java Wed May 20 01:03:23 2015
@@ -31,6 +31,8 @@ import org.apache.solr.update.processor.
*/
public abstract class ContentStreamLoader {
+ protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
+
/**
* This should be called once for each RequestHandler
*/
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Wed May 20 01:03:23 2015
@@ -49,7 +49,7 @@ import java.util.Set;
*/
public class JavabinLoader extends ContentStreamLoader {
public static Logger log = LoggerFactory.getLogger(JavabinLoader.class);
-
+
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
InputStream is = null;
@@ -91,7 +91,12 @@ public class JavabinLoader extends Conte
if (overwrite != null) {
addCmd.overwrite = overwrite;
}
-
+
+ if (updateRequest.isLastDocInBatch()) {
+ // this is a hint to downstream code that indicates we've sent the last doc in a batch
+ addCmd.isLastDocInBatch = true;
+ }
+
try {
processor.processAdd(addCmd);
addCmd.clear();
@@ -115,7 +120,9 @@ public class JavabinLoader extends Conte
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
-
+ // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
+ // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
+ addCmd.pollQueueTime = pollQueueTime;
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
return addCmd;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed May 20 01:03:23 2015
@@ -50,6 +50,10 @@ public class AddUpdateCommand extends Up
public Term updateTerm;
public int commitWithin = -1;
+
+ public boolean isLastDocInBatch = false;
+
+ public int pollQueueTime = 0;
public AddUpdateCommand(SolrQueryRequest req) {
super(req);
@@ -65,6 +69,7 @@ public class AddUpdateCommand extends Up
solrDoc = null;
indexedId = null;
updateTerm = null;
+ isLastDocInBatch = false;
version = 0;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed May 20 01:03:23 2015
@@ -196,12 +196,14 @@ public class SolrCmdDistributor {
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {
-
+ String cmdStr = cmd.toString();
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
+ if (cmd.isLastDocInBatch)
+ uReq.lastDocInBatch();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
+ submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
}
}
@@ -310,17 +312,19 @@ public class SolrCmdDistributor {
public boolean synchronous;
public String cmdString;
public RequestReplicationTracker rfTracker;
+ public int pollQueueTime;
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
- this(cmdString, node, uReq, synchronous, null);
+ this(cmdString, node, uReq, synchronous, null, 0);
}
- public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
+ public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmdString = cmdString;
this.rfTracker = rfTracker;
+ this.pollQueueTime = pollQueueTime;
}
public String toString() {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java Wed May 20 01:03:23 2015
@@ -87,7 +87,7 @@ public class StreamingSolrClients {
};
client.setParser(new BinaryResponseParser());
client.setRequestWriter(new BinaryRequestWriter());
- client.setPollQueueTime(0);
+ client.setPollQueueTime(req.pollQueueTime);
Set<String> queryParams = new HashSet<>(2);
queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java?rev=1680436&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java Wed May 20 01:03:23 2015
@@ -0,0 +1,92 @@
+package org.apache.solr.handler.loader;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.processor.BufferingRequestProcessor;
+import org.junit.BeforeClass;
+
+public class JavabinLoaderTest extends SolrTestCaseJ4 {
+ @BeforeClass
+ public static void beforeTests() throws Exception {
+ initCore("solrconfig.xml","schema.xml");
+ }
+
+ /**
+ * Verifies the isLastDocInBatch flag gets set correctly for a batch of docs and for a request with a single doc.
+ */
+ public void testLastDocInBatchFlag() throws Exception {
+ doTestLastDocInBatchFlag(1); // single doc
+ doTestLastDocInBatchFlag(2); // multiple docs
+ }
+
+ protected void doTestLastDocInBatchFlag(int numDocsInBatch) throws Exception {
+ List<SolrInputDocument> batch = new ArrayList<>(numDocsInBatch);
+ for (int d=0; d < numDocsInBatch; d++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.setField("id", String.valueOf(d));
+ batch.add(doc);
+ }
+
+ UpdateRequest updateRequest = new UpdateRequest();
+ if (batch.size() > 1) {
+ updateRequest.add(batch);
+ } else {
+ updateRequest.add(batch.get(0));
+ }
+
+ // client-side SolrJ would do this ...
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ (new JavaBinUpdateRequestCodec()).marshal(updateRequest, os);
+
+ // need to override the processAdd method b/c JavabinLoader calls
+ // clear on the addCmd after it is passed on to the handler ... a simple clone will suffice for this test
+ BufferingRequestProcessor mockUpdateProcessor = new BufferingRequestProcessor(null) {
+ @Override
+ public void processAdd(AddUpdateCommand cmd) throws IOException {
+ addCommands.add((AddUpdateCommand)cmd.clone());
+ }
+ };
+
+ SolrQueryRequest req = req();
+ (new JavabinLoader()).load(req,
+ new SolrQueryResponse(),
+ new ContentStreamBase.ByteArrayStream(os.toByteArray(), "test"),
+ mockUpdateProcessor);
+ req.close();
+
+ assertTrue(mockUpdateProcessor.addCommands.size() == numDocsInBatch);
+ for (int i=0; i < numDocsInBatch-1; i++)
+ assertFalse(mockUpdateProcessor.addCommands.get(i).isLastDocInBatch); // not last doc in batch
+
+ // last doc should have the flag set
+ assertTrue(mockUpdateProcessor.addCommands.get(batch.size()-1).isLastDocInBatch);
+ }
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java Wed May 20 01:03:23 2015
@@ -156,7 +156,7 @@ public class ConcurrentUpdateSolrClient
log.debug("starting runner: {}", this);
HttpPost method = null;
- HttpResponse response = null;
+ HttpResponse response = null;
try {
while (!queue.isEmpty()) {
try {
@@ -207,7 +207,14 @@ public class ConcurrentUpdateSolrClient
}
}
out.flush();
- req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+
+ if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
+ // no need to wait to see another doc in the queue if we've hit the last doc in a batch
+ req = queue.poll(0, TimeUnit.MILLISECONDS);
+ } else {
+ req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ }
+
}
if (isXml) {
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Wed May 20 01:03:23 2015
@@ -126,7 +126,6 @@ public class JavaBinUpdateRequestCodec {
@Override
public List readIterator(DataInputInputStream fis) throws IOException {
-
// default behavior for reading any regular Iterator in the stream
if (seenOuterMostDocIterator) return super.readIterator(fis);
@@ -142,9 +141,16 @@ public class JavaBinUpdateRequestCodec {
if (handler == null) return super.readIterator(fis);
Integer commitWithin = null;
Boolean overwrite = null;
+ Object o = null;
while (true) {
- Object o = readVal(fis);
- if (o == END_OBJ) break;
+ if (o == null) {
+ o = readVal(fis);
+ }
+
+ if (o == END_OBJ) {
+ break;
+ }
+
SolrInputDocument sdoc = null;
if (o instanceof List) {
sdoc = listToSolrInputDocument((List<NamedList>) o);
@@ -160,9 +166,16 @@ public class JavaBinUpdateRequestCodec {
overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
}
} else {
-
sdoc = (SolrInputDocument) o;
}
+
+ // peek at the next object to see if we're at the end
+ o = readVal(fis);
+ if (o == END_OBJ) {
+ // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
+ updateRequest.lastDocInBatch();
+ }
+
handler.update(sdoc, updateRequest, commitWithin, overwrite);
}
return Collections.EMPTY_LIST;
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Wed May 20 01:03:23 2015
@@ -57,7 +57,9 @@ public class UpdateRequest extends Abstr
private Iterator<SolrInputDocument> docIterator = null;
private Map<String,Map<String,Object>> deleteById = null;
private List<String> deleteQuery = null;
-
+
+ private boolean isLastDocInBatch = false;
+
public UpdateRequest() {
super(METHOD.POST, "/update");
}
@@ -460,4 +462,11 @@ public class UpdateRequest extends Abstr
return deleteQuery;
}
+ public boolean isLastDocInBatch() {
+ return isLastDocInBatch;
+ }
+
+ public void lastDocInBatch() {
+ isLastDocInBatch = true;
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java?rev=1680436&r1=1680435&r2=1680436&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java Wed May 20 01:03:23 2015
@@ -555,7 +555,7 @@ public class JavaBinCodec {
@Override
public String toString() {
- return "MapEntry[" + key.toString() + ":" + value.toString() + "]";
+ return "MapEntry[" + key + ":" + value + "]";
}
@Override