You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2015/05/20 04:20:14 UTC
svn commit: r1680441 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/loader/
solr/core/src/java/org/apache/solr/update/
solr/core/src/test/org/apache/solr/handler/loader/ solr/solrj/
solr/solrj/src/ja...
Author: thelabdude
Date: Wed May 20 02:20:13 2015
New Revision: 1680441
URL: http://svn.apache.org/r1680441
Log:
SOLR-7333: Make the poll queue time configurable and use knowledge that a batch is being processed to poll efficiently
Added:
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/handler/loader/
- copied from r1680436, lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/loader/
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Wed May 20 02:20:13 2015
@@ -246,6 +246,10 @@ Optimizations
* SOLR-7547: Short circuit SolrDisptachFilter for static content request. Right now it creates
a new HttpSolrCall object and tries to process it. (Anshum Gupta)
+* SOLR-7333: Make the poll queue time a leader uses when distributing updates to replicas
+ configurable and use knowledge that a batch is being processed to poll efficiently.
+ (Timothy Potter)
+
Other Changes
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java Wed May 20 02:20:13 2015
@@ -31,6 +31,8 @@ import org.apache.solr.update.processor.
*/
public abstract class ContentStreamLoader {
+ protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
+
/**
* This should be called once for each RequestHandler
*/
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Wed May 20 02:20:13 2015
@@ -49,7 +49,7 @@ import java.util.Set;
*/
public class JavabinLoader extends ContentStreamLoader {
public static Logger log = LoggerFactory.getLogger(JavabinLoader.class);
-
+
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
InputStream is = null;
@@ -91,7 +91,12 @@ public class JavabinLoader extends Conte
if (overwrite != null) {
addCmd.overwrite = overwrite;
}
-
+
+ if (updateRequest.isLastDocInBatch()) {
+ // this is a hint to downstream code that indicates we've sent the last doc in a batch
+ addCmd.isLastDocInBatch = true;
+ }
+
try {
processor.processAdd(addCmd);
addCmd.clear();
@@ -115,7 +120,9 @@ public class JavabinLoader extends Conte
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
-
+ // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
+ // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
+ addCmd.pollQueueTime = pollQueueTime;
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
return addCmd;
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed May 20 02:20:13 2015
@@ -49,6 +49,10 @@ public class AddUpdateCommand extends Up
public Term updateTerm;
public int commitWithin = -1;
+
+ public boolean isLastDocInBatch = false;
+
+ public int pollQueueTime = 0;
public AddUpdateCommand(SolrQueryRequest req) {
super(req);
@@ -64,6 +68,7 @@ public class AddUpdateCommand extends Up
solrDoc = null;
indexedId = null;
updateTerm = null;
+ isLastDocInBatch = false;
version = 0;
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed May 20 02:20:13 2015
@@ -196,12 +196,14 @@ public class SolrCmdDistributor {
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {
-
+ String cmdStr = cmd.toString();
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
+ if (cmd.isLastDocInBatch)
+ uReq.lastDocInBatch();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
+ submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
}
}
@@ -310,17 +312,19 @@ public class SolrCmdDistributor {
public boolean synchronous;
public String cmdString;
public RequestReplicationTracker rfTracker;
+ public int pollQueueTime;
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
- this(cmdString, node, uReq, synchronous, null);
+ this(cmdString, node, uReq, synchronous, null, 0);
}
- public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
+ public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmdString = cmdString;
this.rfTracker = rfTracker;
+ this.pollQueueTime = pollQueueTime;
}
public String toString() {
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java Wed May 20 02:20:13 2015
@@ -87,7 +87,7 @@ public class StreamingSolrClients {
};
client.setParser(new BinaryResponseParser());
client.setRequestWriter(new BinaryRequestWriter());
- client.setPollQueueTime(0);
+ client.setPollQueueTime(req.pollQueueTime);
Set<String> queryParams = new HashSet<>(2);
queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java Wed May 20 02:20:13 2015
@@ -156,7 +156,7 @@ public class ConcurrentUpdateSolrClient
log.debug("starting runner: {}", this);
HttpPost method = null;
- HttpResponse response = null;
+ HttpResponse response = null;
try {
while (!queue.isEmpty()) {
try {
@@ -207,7 +207,14 @@ public class ConcurrentUpdateSolrClient
}
}
out.flush();
- req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+
+ if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
+ // no need to wait to see another doc in the queue if we've hit the last doc in a batch
+ req = queue.poll(0, TimeUnit.MILLISECONDS);
+ } else {
+ req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ }
+
}
if (isXml) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Wed May 20 02:20:13 2015
@@ -126,7 +126,6 @@ public class JavaBinUpdateRequestCodec {
@Override
public List readIterator(DataInputInputStream fis) throws IOException {
-
// default behavior for reading any regular Iterator in the stream
if (seenOuterMostDocIterator) return super.readIterator(fis);
@@ -142,9 +141,16 @@ public class JavaBinUpdateRequestCodec {
if (handler == null) return super.readIterator(fis);
Integer commitWithin = null;
Boolean overwrite = null;
+ Object o = null;
while (true) {
- Object o = readVal(fis);
- if (o == END_OBJ) break;
+ if (o == null) {
+ o = readVal(fis);
+ }
+
+ if (o == END_OBJ) {
+ break;
+ }
+
SolrInputDocument sdoc = null;
if (o instanceof List) {
sdoc = listToSolrInputDocument((List<NamedList>) o);
@@ -160,9 +166,16 @@ public class JavaBinUpdateRequestCodec {
overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
}
} else {
-
sdoc = (SolrInputDocument) o;
}
+
+ // peek at the next object to see if we're at the end
+ o = readVal(fis);
+ if (o == END_OBJ) {
+ // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
+ updateRequest.lastDocInBatch();
+ }
+
handler.update(sdoc, updateRequest, commitWithin, overwrite);
}
return Collections.EMPTY_LIST;
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Wed May 20 02:20:13 2015
@@ -57,7 +57,9 @@ public class UpdateRequest extends Abstr
private Iterator<SolrInputDocument> docIterator = null;
private Map<String,Map<String,Object>> deleteById = null;
private List<String> deleteQuery = null;
-
+
+ private boolean isLastDocInBatch = false;
+
public UpdateRequest() {
super(METHOD.POST, "/update");
}
@@ -460,4 +462,11 @@ public class UpdateRequest extends Abstr
return deleteQuery;
}
+ public boolean isLastDocInBatch() {
+ return isLastDocInBatch;
+ }
+
+ public void lastDocInBatch() {
+ isLastDocInBatch = true;
+ }
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java?rev=1680441&r1=1680440&r2=1680441&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java Wed May 20 02:20:13 2015
@@ -555,7 +555,7 @@ public class JavaBinCodec {
@Override
public String toString() {
- return "MapEntry[" + key.toString() + ":" + value.toString() + "]";
+ return "MapEntry[" + key + ":" + value + "]";
}
@Override