You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/09/11 06:03:04 UTC
svn commit: r1521726 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/loader/
solr/core/src/java/org/apache/solr/update/
solr/core/src/test/org/apache/solr/cloud/ solr/solrj/
solr/solrj/src/java/org/ap...
Author: markrmiller
Date: Wed Sep 11 04:03:04 2013
New Revision: 1521726
URL: http://svn.apache.org/r1521726
Log:
SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node update forwarding.
SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin.
SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default.
Removed:
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
lucene/dev/branches/branch_4x/solr/test-framework/ (props changed)
lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Wed Sep 11 04:03:04 2013
@@ -56,6 +56,15 @@ Upgrading from Solr 4.4.0
init param option is now deprecated and should be replaced with the more standard
<arr name="fieldName">. See SOLR-4249 for more details.
+* UpdateRequestExt has been removed as part of SOLR-4816. You should use UpdateRequest
+ instead.
+
+* CloudSolrServer can now use multiple threads to add documents by default. This is a
+ small change in runtime semantics when using the bulk add method - you will still
+ end up with the same exception on a failure, but some documents beyond the one that
+ failed may have made it in. To get the old, single threaded behavior, set parallel updates
+ to false on the CloudSolrServer instance.
+
Detailed Change List
----------------------
@@ -107,7 +116,11 @@ New Features
can specify facet.threads to parallelize loading the uninverted fields. In at least
one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta,
Gun Akkor via Erick Erickson)
+
+* SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
+ update forwarding. (Joel Bernstein, Mark Miller)
+* SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller)
Bug Fixes
----------------------
@@ -193,6 +206,9 @@ Optimizations
* SOLR-5057: QueryResultCache should not related with the order of fq's list (Feihong Huang via Erick Erickson)
+* SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default.
+ (Joel Bernstein via Mark Miller)
+
Other Changes
----------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Wed Sep 11 04:03:04 2013
@@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* Update handler which uses the JavaBin format
@@ -97,7 +100,7 @@ public class JavabinLoader extends Conte
} catch (EOFException e) {
break; // this is expected
}
- if (update.getDeleteById() != null || update.getDeleteQuery() != null) {
+ if (update.getDeleteByIdMap() != null || update.getDeleteQuery() != null) {
delete(req, update, processor);
}
}
@@ -118,9 +121,17 @@ public class JavabinLoader extends Conte
delcmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
}
- if(update.getDeleteById() != null) {
- for (String s : update.getDeleteById()) {
- delcmd.id = s;
+ if(update.getDeleteByIdMap() != null) {
+ Set<Entry<String,Map<String,Object>>> entries = update.getDeleteByIdMap().entrySet();
+ for (Entry<String,Map<String,Object>> e : entries) {
+ delcmd.id = e.getKey();
+ Map<String,Object> map = e.getValue();
+ if (map != null) {
+ Long version = (Long) map.get("ver");
+ if (version != null) {
+ delcmd.setVersion(version);
+ }
+ }
processor.processDelete(delcmd);
delcmd.clear();
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed Sep 11 04:03:04 2013
@@ -17,6 +17,10 @@
package org.apache.solr.update;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;
@@ -27,11 +31,6 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
/**
*
*/
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed Sep 11 04:03:04 2013
@@ -34,7 +34,7 @@ import java.util.concurrent.RejectedExec
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -152,8 +152,9 @@ public class SolrCmdDistributor {
// finish with the pending requests
checkResponses(false);
- UpdateRequestExt ureq = new UpdateRequestExt();
+ UpdateRequest ureq = new UpdateRequest();
ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+
ureq.setParams(params);
syncRequest(node, ureq);
}
@@ -173,7 +174,7 @@ public class SolrCmdDistributor {
deleteRequest.cmd = clonedCmd;
deleteRequest.params = params;
- UpdateRequestExt ureq = new UpdateRequestExt();
+ UpdateRequest ureq = new UpdateRequest();
if (cmd.isDeleteById()) {
ureq.deleteById(cmd.getId(), cmd.getVersion());
} else {
@@ -185,7 +186,7 @@ public class SolrCmdDistributor {
}
}
- private void syncRequest(Node node, UpdateRequestExt ureq) {
+ private void syncRequest(Node node, UpdateRequest ureq) {
Request sreq = new Request();
sreq.node = node;
sreq.ureq = ureq;
@@ -224,7 +225,7 @@ public class SolrCmdDistributor {
// currently, we dont try to piggy back on outstanding adds or deletes
- UpdateRequestExt ureq = new UpdateRequestExt();
+ UpdateRequest ureq = new UpdateRequest();
ureq.setParams(params);
addCommit(ureq, cmd);
@@ -265,7 +266,7 @@ public class SolrCmdDistributor {
flushDeletes(maxBufferedDeletesPerServer);
}
- void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+ void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
if (cmd == null) return;
ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
@@ -281,14 +282,13 @@ public class SolrCmdDistributor {
List<AddRequest> alist = adds.get(node);
if (alist == null || alist.size() < limit) continue;
- UpdateRequestExt ureq = new UpdateRequestExt();
+ UpdateRequest ureq = new UpdateRequest();
ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-
+
for (AddRequest aReq : alist) {
AddUpdateCommand cmd = aReq.cmd;
combinedParams.add(aReq.params);
-
ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
}
@@ -315,7 +315,7 @@ public class SolrCmdDistributor {
for (Node node : nodes) {
List<DeleteRequest> dlist = deletes.get(node);
if (dlist == null || dlist.size() < limit) continue;
- UpdateRequestExt ureq = new UpdateRequestExt();
+ UpdateRequest ureq = new UpdateRequest();
ModifiableSolrParams combinedParams = new ModifiableSolrParams();
@@ -354,14 +354,14 @@ public class SolrCmdDistributor {
public static class Request {
public Node node;
- UpdateRequestExt ureq;
+ UpdateRequest ureq;
NamedList<Object> ursp;
int rspCode;
public Exception exception;
int retries;
}
- void submit(UpdateRequestExt ureq, Node node) {
+ void submit(UpdateRequest ureq, Node node) {
Request sreq = new Request();
sreq.node = node;
sreq.ureq = ureq;
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Wed Sep 11 04:03:04 2013
@@ -17,7 +17,6 @@
package org.apache.solr.update;
-import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.request.SolrQueryRequest;
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java Wed Sep 11 04:03:04 2013
@@ -153,6 +153,7 @@ public class AliasIntegrationTest extend
// search with new cloud client
CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+ cloudSolrServer.setParallelUpdates(random().nextBoolean());
query = new SolrQuery("*:*");
query.set("collection", "testalias");
res = cloudSolrServer.query(query);
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Wed Sep 11 04:03:04 2013
@@ -1104,6 +1104,7 @@ public class BasicDistributedZkTest exte
synchronized(this) {
try {
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+ commondCloudSolrServer.setParallelUpdates(random().nextBoolean());
commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
commondCloudSolrServer.getLbServer().setConnectionTimeout(15000);
commondCloudSolrServer.getLbServer().setSoTimeout(30000);
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java Wed Sep 11 04:03:04 2013
@@ -41,7 +41,7 @@ public class BinaryRequestWriter extends
if (req instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) req;
if (isNull(updateRequest.getDocuments()) &&
- isNull(updateRequest.getDeleteById()) &&
+ isNull(updateRequest.getDeleteByIdMap()) &&
isNull(updateRequest.getDeleteQuery())
&& (updateRequest.getDocIterator() == null) ) {
return null;
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Sep 11 04:03:04 2013
@@ -30,18 +30,28 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -49,7 +59,9 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
@@ -58,6 +70,10 @@ import org.apache.zookeeper.KeeperExcept
* Instances of this class communicate with Zookeeper to discover
* Solr endpoints for SolrCloud collections, and then use the
* {@link LBHttpSolrServer} to issue requests.
+ *
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
*/
public class CloudSolrServer extends SolrServer {
private volatile ZkStateReader zkStateReader;
@@ -65,7 +81,7 @@ public class CloudSolrServer extends Sol
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
private volatile String defaultCollection;
- private LBHttpSolrServer lbServer;
+ private final LBHttpSolrServer lbServer;
private HttpClient myClient;
Random rand = new Random();
@@ -79,8 +95,31 @@ public class CloudSolrServer extends Sol
private volatile int lastClusterStateHashCode;
private final boolean updatesToLeaders;
+ private boolean parallelUpdates = true;
+ private ExecutorService threadPool = Executors
+ .newCachedThreadPool(new SolrjNamedThreadFactory(
+ "CloudSolrServer ThreadPool"));
+ private String idField = "id";
+ private final Set<String> NON_ROUTABLE_PARAMS;
+ {
+ NON_ROUTABLE_PARAMS = new HashSet<String>();
+ NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+
+ NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+
+ // Not supported via SolrCloud
+ // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+ }
+
+
-
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
* in the form HOST:PORT.
@@ -92,12 +131,13 @@ public class CloudSolrServer extends Sol
this.updatesToLeaders = true;
}
- public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException {
+ public CloudSolrServer(String zkHost, boolean updatesToLeaders)
+ throws MalformedURLException {
this.zkHost = zkHost;
this.myClient = HttpClientUtil.createClient(null);
this.lbServer = new LBHttpSolrServer(myClient);
this.updatesToLeaders = updatesToLeaders;
-}
+ }
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -121,10 +161,40 @@ public class CloudSolrServer extends Sol
this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders;
}
+
+ public ResponseParser getParser() {
+ return lbServer.getParser();
+ }
+
+ /**
+ * Note: This setter method is <b>not thread-safe</b>.
+ *
+ * @param processor
+ * Default Response Parser chosen to parse the response if the parser
+ * were not specified as part of the request.
+ * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+ */
+ public void setParser(ResponseParser processor) {
+ lbServer.setParser(processor);
+ }
public ZkStateReader getZkStateReader() {
return zkStateReader;
}
+
+ /**
+ * @param idField the field to route documents on.
+ */
+ public void setIdField(String idField) {
+ this.idField = idField;
+ }
+
+ /**
+ * @return the field that updates are routed on.
+ */
+ public String getIdField() {
+ return idField;
+ }
/** Sets the default collection for request */
public void setDefaultCollection(String collection) {
@@ -179,18 +249,293 @@ public class CloudSolrServer extends Sol
}
}
+ public void setParallelUpdates(boolean parallelUpdates) {
+ this.parallelUpdates = parallelUpdates;
+ }
+
+ private NamedList directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
+ UpdateRequest updateRequest = (UpdateRequest) request;
+ ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+ ModifiableSolrParams routableParams = new ModifiableSolrParams();
+ ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+ if(params != null) {
+ nonRoutableParams.add(params);
+ routableParams.add(params);
+ for(String param : NON_ROUTABLE_PARAMS) {
+ routableParams.remove(param);
+ }
+ }
+ if (params == null) {
+ return null;
+ }
+
+ String collection = params.get("collection", defaultCollection);
+ if (collection == null) {
+ throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+ }
+
+
+ //Check to see if the collection is an alias.
+ Aliases aliases = zkStateReader.getAliases();
+ if(aliases != null) {
+ Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
+ if(collectionAliases != null && collectionAliases.containsKey(collection)) {
+ collection = collectionAliases.get(collection);
+ }
+ }
+
+ DocCollection col = clusterState.getCollection(collection);
+
+ DocRouter router = col.getRouter();
+
+ if (router instanceof ImplicitDocRouter) {
+ // short circuit as optimization
+ return null;
+ }
+
+ //Create the URL map, which is keyed on slice name.
+ //The value is a list of URLs for each replica in the slice.
+ //The first value in the list is the leader for the slice.
+ Map<String,List<String>> urlMap = buildUrlMap(col);
+
+ NamedList exceptions = new NamedList();
+ NamedList shardResponses = new NamedList();
+
+ Map<String, LBHttpSolrServer.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+ if (routes == null) {
+ return null;
+ }
+
+ Iterator<Map.Entry<String, LBHttpSolrServer.Req>> it = routes.entrySet().iterator();
+
+ long start = System.nanoTime();
+ if(this.parallelUpdates) {
+ ArrayBlockingQueue<RequestTask> finishedTasks = new ArrayBlockingQueue<RequestTask>(routes.size());
+
+ while (it.hasNext()) {
+ Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
+ String url = entry.getKey();
+ LBHttpSolrServer.Req lbRequest = entry.getValue();
+ threadPool.execute(new RequestTask(url, lbRequest, finishedTasks));
+ }
+
+ while ((shardResponses.size() + exceptions.size()) != routes.size()) {
+ RequestTask requestTask = null;
+ try {
+ requestTask = finishedTasks.take();
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+
+ Exception e = requestTask.getException();
+ if (e != null) {
+ exceptions.add(requestTask.getLeader(), e);
+ } else {
+ shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse());
+ }
+ }
+
+ if(exceptions.size() > 0) {
+ throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+ }
+ } else {
+ while (it.hasNext()) {
+ Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
+ String url = entry.getKey();
+ LBHttpSolrServer.Req lbRequest = entry.getValue();
+ try{
+ NamedList rsp = lbServer.request(lbRequest).getResponse();
+ shardResponses.add(url, rsp);
+ } catch(Exception e) {
+ throw new SolrServerException(e);
+ }
+ }
+ }
+
+ UpdateRequest nonRoutableRequest = null;
+ List<String> deleteQuery = updateRequest.getDeleteQuery();
+ if (deleteQuery != null && deleteQuery.size() > 0) {
+ UpdateRequest deleteQueryRequest = new UpdateRequest();
+ deleteQueryRequest.setDeleteQuery(deleteQuery);
+ nonRoutableRequest = deleteQueryRequest;
+ }
+
+ Set<String> paramNames = nonRoutableParams.getParameterNames();
+
+ Set<String> intersection = new HashSet<String>(paramNames);
+ intersection.retainAll(NON_ROUTABLE_PARAMS);
+
+ if (nonRoutableRequest != null || intersection.size() > 0) {
+ if (nonRoutableRequest == null) {
+ nonRoutableRequest = new UpdateRequest();
+ }
+ nonRoutableRequest.setParams(nonRoutableParams);
+ List<String> urlList = new ArrayList<String>();
+ urlList.addAll(routes.keySet());
+ Collections.shuffle(urlList, rand);
+ LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(nonRoutableRequest, urlList);
+ try {
+ LBHttpSolrServer.Rsp rsp = lbServer.request(req);
+ shardResponses.add(urlList.get(0), rsp.getResponse());
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+ }
+ }
+
+ long end = System.nanoTime();
+
+ RouteResponse rr = condenseResponse(shardResponses, (long)((end - start)/1000000));
+ rr.setRouteResponses(shardResponses);
+ rr.setRoutes(routes);
+ return rr;
+ }
+
+ private Map<String,List<String>> buildUrlMap(DocCollection col) {
+ Map<String, List<String>> urlMap = new HashMap<String, List<String>>();
+ Collection<Slice> slices = col.getActiveSlices();
+ Iterator<Slice> sliceIterator = slices.iterator();
+ while (sliceIterator.hasNext()) {
+ Slice slice = sliceIterator.next();
+ String name = slice.getName();
+ List<String> urls = new ArrayList<String>();
+ Replica leader = slice.getLeader();
+ ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+ String url = zkProps.getBaseUrl() + "/" + col.getName();
+ urls.add(url);
+ Collection<Replica> replicas = slice.getReplicas();
+ Iterator<Replica> replicaIterator = replicas.iterator();
+ while (replicaIterator.hasNext()) {
+ Replica replica = replicaIterator.next();
+ if (!replica.getNodeName().equals(leader.getNodeName()) &&
+ !replica.getName().equals(leader.getName())) {
+ ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+ String url1 = zkProps1.getBaseUrl() + "/" + col.getName();
+ urls.add(url1);
+ }
+ }
+ urlMap.put(name, urls);
+ }
+ return urlMap;
+ }
+
+ public RouteResponse condenseResponse(NamedList response, long timeMillis) {
+ RouteResponse condensed = new RouteResponse();
+ int status = 0;
+ for(int i=0; i<response.size(); i++) {
+ NamedList shardResponse = (NamedList)response.getVal(i);
+ NamedList header = (NamedList)shardResponse.get("responseHeader");
+ Integer shardStatus = (Integer)header.get("status");
+ int s = shardStatus.intValue();
+ if(s > 0) {
+ status = s;
+ }
+ }
+
+ NamedList cheader = new NamedList();
+ cheader.add("status", status);
+ cheader.add("QTime", timeMillis);
+ condensed.add("responseHeader", cheader);
+ return condensed;
+ }
+
+ class RequestTask implements Runnable {
+
+ private LBHttpSolrServer.Req req;
+ private ArrayBlockingQueue<RequestTask> tasks;
+ private LBHttpSolrServer.Rsp rsp;
+ private String leader;
+ private Exception e;
+
+ public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue<RequestTask> tasks) {
+ this.req = req;
+ this.tasks = tasks;
+ this.leader = leader;
+ }
+
+ public void run() {
+ try {
+ LBHttpSolrServer lb = new LBHttpSolrServer(myClient);
+ this.rsp = lb.request(req);
+ this.tasks.add(this);
+ } catch (Exception e) {
+ this.e = e;
+ this.tasks.add(this);
+ }
+ }
+
+ public Exception getException() {
+ return e;
+ }
+
+ public String getLeader() {
+ return this.leader;
+ }
+
+ public LBHttpSolrServer.Rsp getRsp() {
+ return rsp;
+ }
+ }
+
+ class RouteResponse extends NamedList {
+ private NamedList routeResponses;
+ private Map<String, LBHttpSolrServer.Req> routes;
+
+ public void setRouteResponses(NamedList routeResponses) {
+ this.routeResponses = routeResponses;
+ }
+
+ public NamedList getRouteResponses() {
+ return routeResponses;
+ }
+
+ public void setRoutes(Map<String, LBHttpSolrServer.Req> routes) {
+ this.routes = routes;
+ }
+
+ public Map<String, LBHttpSolrServer.Req> getRoutes() {
+ return routes;
+ }
+
+ }
+
+ class RouteException extends SolrException {
+
+ private NamedList exceptions;
+ private Map<String, LBHttpSolrServer.Req> routes;
+
+ public RouteException(ErrorCode errorCode, NamedList exceptions, Map<String, LBHttpSolrServer.Req> routes){
+ super(errorCode, ((Exception)exceptions.getVal(0)).getMessage(), (Exception)exceptions.getVal(0));
+ this.exceptions = exceptions;
+ this.routes = routes;
+ }
+
+ public NamedList getExceptions() {
+ return exceptions;
+ }
+
+ public Map<String, LBHttpSolrServer.Req> getRoutes() {
+ return this.routes;
+ }
+ }
+
@Override
public NamedList<Object> request(SolrRequest request)
throws SolrServerException, IOException {
connect();
- // TODO: if you can hash here, you could favor the shard leader
-
ClusterState clusterState = zkStateReader.getClusterState();
+
boolean sendToLeaders = false;
List<String> replicas = null;
- if (request instanceof IsUpdateRequest && updatesToLeaders) {
+ if (request instanceof IsUpdateRequest) {
+ if(request instanceof UpdateRequest) {
+ NamedList response = directUpdate((AbstractUpdateRequest)request,clusterState);
+ if(response != null) {
+ return response;
+ }
+ }
sendToLeaders = true;
replicas = new ArrayList<String>();
}
@@ -355,6 +700,10 @@ public class CloudSolrServer extends Sol
if (myClient!=null) {
myClient.getConnectionManager().shutdown();
}
+
+ if(this.threadPool != null && !this.threadPool.isShutdown()) {
+ this.threadPool.shutdown();
+ }
}
public LBHttpSolrServer getLbServer() {
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Wed Sep 11 04:03:04 2013
@@ -98,14 +98,14 @@ public class HttpSolrServer extends Solr
*
* @see org.apache.solr.client.solrj.impl.BinaryResponseParser
*/
- protected ResponseParser parser;
+ protected volatile ResponseParser parser;
/**
* The RequestWriter used to write all requests to Solr
*
* @see org.apache.solr.client.solrj.request.RequestWriter
*/
- protected RequestWriter requestWriter = new RequestWriter();
+ protected volatile RequestWriter requestWriter = new RequestWriter();
private final HttpClient httpClient;
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Wed Sep 11 04:03:04 2013
@@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.imp
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.*;
+import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
@@ -93,7 +94,8 @@ public class LBHttpSolrServer extends So
private final AtomicInteger counter = new AtomicInteger(-1);
private static final SolrQuery solrQuery = new SolrQuery("*:*");
- private final ResponseParser parser;
+ private volatile ResponseParser parser;
+ private volatile RequestWriter requestWriter;
static {
solrQuery.setRows(0);
@@ -219,11 +221,13 @@ public class LBHttpSolrServer extends So
}
protected HttpSolrServer makeServer(String server) throws MalformedURLException {
- return new HttpSolrServer(server, httpClient, parser);
+ HttpSolrServer s = new HttpSolrServer(server, httpClient, parser);
+ if (requestWriter != null) {
+ s.setRequestWriter(requestWriter);
+ }
+ return s;
}
-
-
/**
* Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
* If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
@@ -590,6 +594,18 @@ public class LBHttpSolrServer extends So
return httpClient;
}
+ /**
+  * Returns the response parser currently held by this load balancer, i.e. the
+  * one {@link #makeServer(String)} passes to each HttpSolrServer it creates.
+  *
+  * @return the configured parser
+  */
+ public ResponseParser getParser() {
+ return parser;
+ }
+
+ /**
+  * Replaces the response parser that {@link #makeServer(String)} passes to the
+  * HttpSolrServer instances it creates.
+  * NOTE(review): this only assigns the field; servers that already exist do not
+  * appear to be touched here -- confirm before relying on it.
+  *
+  * @param parser the parser to hand to servers created after this call
+  */
+ public void setParser(ResponseParser parser) {
+ this.parser = parser;
+ }
+
+ /**
+  * Sets the request writer that {@link #makeServer(String)} applies to each
+  * HttpSolrServer it creates. While this is null, makeServer leaves the new
+  * server's own request writer in place.
+  *
+  * @param requestWriter the writer to apply to servers created after this call
+  */
+ public void setRequestWriter(RequestWriter requestWriter) {
+ this.requestWriter = requestWriter;
+ }
+
@Override
protected void finalize() throws Throwable {
try {
@@ -603,4 +619,5 @@ public class LBHttpSolrServer extends So
// defaults
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
+
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Wed Sep 11 04:03:04 2013
@@ -23,6 +23,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -63,11 +66,14 @@ public class JavaBinUpdateRequestCodec {
if(updateRequest.getDocIterator() != null){
docIter = updateRequest.getDocIterator();
}
+
+ Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
nl.add("params", params);// 0: params
- nl.add("delById", updateRequest.getDeleteById());
+ nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
nl.add("delByQ", updateRequest.getDeleteQuery());
nl.add("docs", docIter);
+ nl.add("docsMap", docMap);
JavaBinCodec codec = new JavaBinCodec();
codec.marshal(nl, os);
}
@@ -86,7 +92,9 @@ public class JavaBinUpdateRequestCodec {
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
final UpdateRequest updateRequest = new UpdateRequest();
List<List<NamedList>> doclist;
+ Map<SolrInputDocument,Map<String,Object>> docMap;
List<String> delById;
+ Map<String,Long> delByIdMap;
List<String> delByQ;
final NamedList[] namedList = new NamedList[1];
JavaBinCodec codec = new JavaBinCodec() {
@@ -158,9 +166,11 @@ public class JavaBinUpdateRequestCodec {
}
}
delById = (List<String>) namedList[0].get("delById");
+ delByIdMap = (Map<String,Long>) namedList[0].get("delByIdMap");
delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List) namedList[0].get("docs");
-
+ docMap = (Map<SolrInputDocument,Map<String,Object>>) namedList[0].get("docsMap");
+
if (doclist != null && !doclist.isEmpty()) {
List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
for (Object o : doclist) {
@@ -172,11 +182,29 @@ public class JavaBinUpdateRequestCodec {
}
updateRequest.add(solrInputDocs);
}
+ if (docMap != null && !docMap.isEmpty()) {
+ Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docMap.entrySet();
+ for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+ Map<String,Object> map = entry.getValue();
+ Boolean overwrite = null;
+ Integer commitWithin = null;
+ if (map != null) {
+ overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE);
+ commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN);
+ }
+ updateRequest.add(entry.getKey(), commitWithin, overwrite);
+ }
+ }
if (delById != null) {
for (String s : delById) {
updateRequest.deleteById(s);
}
}
+ if (delByIdMap != null) {
+ for (Map.Entry<String,Long> entry : delByIdMap.entrySet()) {
+ updateRequest.deleteById(entry.getKey(), entry.getValue());
+ }
+ }
if (delByQ != null) {
for (String s : delByQ) {
updateRequest.deleteByQuery(s);
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java Wed Sep 11 04:03:04 2013
@@ -26,6 +26,7 @@ import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.nio.charset.Charset;
/**
@@ -52,7 +53,7 @@ public class RequestWriter {
private boolean isEmpty(UpdateRequest updateRequest) {
return isNull(updateRequest.getDocuments()) &&
- isNull(updateRequest.getDeleteById()) &&
+ isNull(updateRequest.getDeleteByIdMap()) &&
isNull(updateRequest.getDeleteQuery()) &&
updateRequest.getDocIterator() == null;
}
@@ -137,4 +138,8 @@ public class RequestWriter {
protected boolean isNull(List l) {
return l == null || l.isEmpty();
}
+
+ /**
+  * Tells whether the given map carries no entries at all.
+  *
+  * @param l the map to inspect, may be null
+  * @return true when the map is null or empty
+  */
+ protected boolean isNull(Map l) {
+   if (l == null) {
+     return true;
+   }
+   return l.isEmpty();
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Wed Sep 11 04:03:04 2013
@@ -22,203 +22,411 @@ import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.XML;
/**
*
- *
+ *
* @since solr 1.3
*/
public class UpdateRequest extends AbstractUpdateRequest {
- private List<SolrInputDocument> documents = null;
+ public static final String OVERWRITE = "ow";
+ public static final String COMMIT_WITHIN = "cw";
+ private Map<SolrInputDocument,Map<String,Object>> documents = null;
private Iterator<SolrInputDocument> docIterator = null;
- private List<String> deleteById = null;
+ private Map<String,Map<String,Object>> deleteById = null;
private List<String> deleteQuery = null;
-
- public UpdateRequest()
- {
- super( METHOD.POST, "/update" );
+
+ public UpdateRequest() {
+ super(METHOD.POST, "/update");
}
-
+
public UpdateRequest(String url) {
super(METHOD.POST, url);
}
-
- //---------------------------------------------------------------------------
- //---------------------------------------------------------------------------
+
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
/**
* clear the pending documents and delete commands
*/
- public void clear()
- {
- if( documents != null ) {
+ public void clear() {
+ if (documents != null) {
documents.clear();
}
- if( deleteById != null ) {
+ if (deleteById != null) {
deleteById.clear();
}
- if( deleteQuery != null ) {
+ if (deleteQuery != null) {
deleteQuery.clear();
}
}
- //---------------------------------------------------------------------------
- //---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------
+
+ /**
+  * Queues a document without any per-document settings; the document is stored
+  * with a null settings map.
+  *
+  * @param doc the document to add
+  * @return this request, for chaining
+  */
+ public UpdateRequest add(final SolrInputDocument doc) {
+   if (null == documents) {
+     documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+   }
+   final Map<String,Object> noSettings = null;
+   documents.put(doc, noSettings);
+   return this;
+ }
+
+ /**
+  * Queues a document with an explicit overwrite setting and no commitWithin.
+  *
+  * @param doc the document to add
+  * @param overwrite the overwrite setting recorded for this document, may be null
+  * @return this request, for chaining
+  */
+ public UpdateRequest add(final SolrInputDocument doc, Boolean overwrite) {
+   final Integer noCommitWithin = null;
+   return add(doc, noCommitWithin, overwrite);
+ }
+
+ /**
+  * Queues a document with an explicit commitWithin setting and no overwrite
+  * setting.
+  *
+  * @param doc the document to add
+  * @param commitWithin the commitWithin value recorded for this document, may be null
+  * @return this request, for chaining
+  */
+ public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin) {
+   final Boolean noOverwrite = null;
+   return add(doc, commitWithin, noOverwrite);
+ }
- public UpdateRequest add( final SolrInputDocument doc )
- {
- if( documents == null ) {
- documents = new ArrayList<SolrInputDocument>( 2 );
+ public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin,
+ Boolean overwrite) {
+ if (documents == null) {
+ documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
}
- documents.add( doc );
+ Map<String,Object> params = new HashMap<String,Object>(2);
+ if (commitWithin != null) params.put(COMMIT_WITHIN, commitWithin);
+ if (overwrite != null) params.put(OVERWRITE, overwrite);
+
+ documents.put(doc, params);
+
return this;
}
- public UpdateRequest add( final Collection<SolrInputDocument> docs )
- {
- if( documents == null ) {
- documents = new ArrayList<SolrInputDocument>( docs.size()+1 );
+ public UpdateRequest add(final Collection<SolrInputDocument> docs) {
+ if (documents == null) {
+ documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+ }
+ for (SolrInputDocument doc : docs) {
+ documents.put(doc, null);
}
- documents.addAll( docs );
return this;
}
- public UpdateRequest deleteById( String id )
- {
- if( deleteById == null ) {
- deleteById = new ArrayList<String>();
+ public UpdateRequest deleteById(String id) {
+ if (deleteById == null) {
+ deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
- deleteById.add( id );
+ deleteById.put(id, null);
return this;
}
- public UpdateRequest deleteById( List<String> ids )
- {
- if( deleteById == null ) {
- deleteById = new ArrayList<String>(ids);
- } else {
- deleteById.addAll(ids);
+
+ public UpdateRequest deleteById(List<String> ids) {
+ if (deleteById == null) {
+ deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
+
+ for (String id : ids) {
+ deleteById.put(id, null);
+ }
+
return this;
}
- public UpdateRequest deleteByQuery( String q )
- {
- if( deleteQuery == null ) {
+ /**
+  * Queues a delete-by-id command together with the version recorded for it.
+  * The version is stored under the "ver" key of the settings map kept for the id.
+  *
+  * @param id the id of the document to delete
+  * @param version the version to record for this delete, may be null
+  * @return this request, for chaining
+  */
+ public UpdateRequest deleteById(String id, Long version) {
+   final Map<String,Object> versionInfo = new HashMap<String,Object>(1);
+   versionInfo.put("ver", version);
+   if (null == deleteById) {
+     deleteById = new LinkedHashMap<String,Map<String,Object>>();
+   }
+   deleteById.put(id, versionInfo);
+   return this;
+ }
+
+ public UpdateRequest deleteByQuery(String q) {
+ if (deleteQuery == null) {
deleteQuery = new ArrayList<String>();
}
- deleteQuery.add( q );
+ deleteQuery.add(q);
return this;
}
+
+ /**
+  * Splits the pending documents and delete-by-id commands of this request into
+  * one request per target, keyed by the first URL listed for the slice that
+  * each id routes to. Per-document overwrite/commitWithin settings travel with
+  * the document into the routed request.
+  *
+  * @param router to route updates with
+  * @param col DocCollection for the updates
+  * @param urlMap of the cluster: slice name to the URLs serving that slice; the
+  *        first URL of each list is used as the route key
+  * @param params params to use
+  * @param idField the id field
+  * @return a Map of urls to requests, or <code>null</code> when there is
+  *         nothing to route or when some document or id cannot be routed
+  *         (missing id value, no target slice, no URL known for the slice)
+  */
+ public Map<String,LBHttpSolrServer.Req> getRoutes(DocRouter router,
+     DocCollection col, Map<String,List<String>> urlMap,
+     ModifiableSolrParams params, String idField) {
+
+   if ((documents == null || documents.size() == 0)
+       && (deleteById == null || deleteById.size() == 0)) {
+     return null;
+   }
+
+   Map<String,LBHttpSolrServer.Req> routes = new HashMap<String,LBHttpSolrServer.Req>();
+   if (documents != null) {
+     for (Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
+       SolrInputDocument doc = entry.getKey();
+       Object id = doc.getFieldValue(idField);
+       if (id == null) {
+         return null;
+       }
+       Slice slice = router.getTargetSlice(id.toString(), doc, null, col);
+       if (slice == null) {
+         return null;
+       }
+       List<String> urls = urlMap.get(slice.getName());
+       if (urls == null || urls.isEmpty()) {
+         // No URL known for the target slice: report "cannot route", the same
+         // way the other unroutable cases do, instead of failing on urls.get(0).
+         return null;
+       }
+       UpdateRequest urequest = routedRequestFor(routes, urls, params);
+       Map<String,Object> docSettings = entry.getValue();
+       if (docSettings == null) {
+         urequest.add(doc);
+       } else {
+         // Keep the per-document settings; a plain add(doc) would drop them.
+         urequest.add(doc, (Integer) docSettings.get(COMMIT_WITHIN),
+             (Boolean) docSettings.get(OVERWRITE));
+       }
+     }
+   }
+
+   // Route the deleteById's
+
+   if (deleteById != null) {
+     for (Map.Entry<String,Map<String,Object>> entry : deleteById.entrySet()) {
+       String deleteId = entry.getKey();
+       Map<String,Object> map = entry.getValue();
+       Long version = null;
+       if (map != null) {
+         version = (Long) map.get("ver");
+       }
+       Slice slice = router.getTargetSlice(deleteId, null, null, col);
+       if (slice == null) {
+         return null;
+       }
+       List<String> urls = urlMap.get(slice.getName());
+       if (urls == null || urls.isEmpty()) {
+         return null;
+       }
+       // Requests created for deletes get the same method, commitWithin, params
+       // and path as the ones created for documents.
+       routedRequestFor(routes, urls, params).deleteById(deleteId, version);
+     }
+   }
+   return routes;
+ }
+
+ /**
+  * Returns the routed request registered under the first URL of the given
+  * list, creating and registering it (with this request's method, commitWithin
+  * and path plus the given params) when none exists yet.
+  */
+ private UpdateRequest routedRequestFor(Map<String,LBHttpSolrServer.Req> routes,
+     List<String> urls, ModifiableSolrParams params) {
+   String leaderUrl = urls.get(0);
+   LBHttpSolrServer.Req request = routes.get(leaderUrl);
+   if (request == null) {
+     UpdateRequest updateRequest = new UpdateRequest();
+     updateRequest.setMethod(getMethod());
+     updateRequest.setCommitWithin(getCommitWithin());
+     updateRequest.setParams(params);
+     updateRequest.setPath(getPath());
+     request = new LBHttpSolrServer.Req(updateRequest, urls);
+     routes.put(leaderUrl, request);
+   }
+   return (UpdateRequest) request.getRequest();
+ }
+
public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
this.docIterator = docIterator;
}
-
- //--------------------------------------------------------------------------
- //--------------------------------------------------------------------------
-
+
+ /**
+  * Replaces the delete-by-query strings held by this request with the given
+  * list. The list is stored by reference, not copied.
+  *
+  * @param deleteQuery the delete queries to use, may be null
+  */
+ public void setDeleteQuery(List<String> deleteQuery) {
+ this.deleteQuery = deleteQuery;
+ }
+
+ // --------------------------------------------------------------------------
+ // --------------------------------------------------------------------------
+
@Override
public Collection<ContentStream> getContentStreams() throws IOException {
- return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML );
+ return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
}
-
+
public String getXML() throws IOException {
StringWriter writer = new StringWriter();
- writeXML( writer );
+ writeXML(writer);
writer.flush();
-
+
// If action is COMMIT or OPTIMIZE, it is sent with params
String xml = writer.toString();
- //System.out.println( "SEND:"+xml );
+ // System.out.println( "SEND:"+xml );
return (xml.length() > 0) ? xml : null;
}
+ /**
+  * Groups the pending documents, in insertion order, into consecutive batches
+  * whose members share the same overwrite and commitWithin settings. A new
+  * batch starts whenever either setting differs from the previous document's.
+  * If a document iterator is set, it is drained into one further batch whose
+  * documents carry no settings.
+  *
+  * @param documents not read; the method works on the <code>documents</code>
+  *        field (parameter kept so the signature stays unchanged)
+  * @return the batches, empty when there is nothing pending
+  */
+ private List<Map<SolrInputDocument,Map<String,Object>>> getDocLists(Map<SolrInputDocument,Map<String,Object>> documents) {
+   List<Map<SolrInputDocument,Map<String,Object>>> docLists = new ArrayList<Map<SolrInputDocument,Map<String,Object>>>();
+   Map<SolrInputDocument,Map<String,Object>> docList = null;
+   if (this.documents != null) {
+
+     Boolean lastOverwrite = true;
+     Integer lastCommitWithin = -1;
+
+     for (Entry<SolrInputDocument,Map<String,Object>> entry : this.documents.entrySet()) {
+       Map<String,Object> map = entry.getValue();
+       Boolean overwrite = null;
+       Integer commitWithin = null;
+       if (map != null) {
+         overwrite = (Boolean) map.get(OVERWRITE);
+         commitWithin = (Integer) map.get(COMMIT_WITHIN);
+       }
+       // Compare by value: != on Boolean/Integer compares references, so two
+       // equal settings held in distinct boxed objects would look different
+       // and split the batch for no reason.
+       boolean sameOverwrite = (overwrite == null)
+           ? lastOverwrite == null : overwrite.equals(lastOverwrite);
+       boolean sameCommitWithin = (commitWithin == null)
+           ? lastCommitWithin == null : commitWithin.equals(lastCommitWithin);
+       if (!sameOverwrite || !sameCommitWithin || docLists.size() == 0) {
+         docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+         docLists.add(docList);
+       }
+       docList.put(entry.getKey(), map);
+       lastCommitWithin = commitWithin;
+       lastOverwrite = overwrite;
+     }
+   }
+
+   if (docIterator != null) {
+     docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+     docLists.add(docList);
+     while (docIterator.hasNext()) {
+       SolrInputDocument doc = docIterator.next();
+       if (doc != null) {
+         docList.put(doc, null);
+       }
+     }
+
+   }
+
+   return docLists;
+ }
+
/**
* @since solr 1.4
*/
- public void writeXML( Writer writer ) throws IOException {
- if( (documents != null && documents.size() > 0) || docIterator != null) {
- if( commitWithin > 0 ) {
- writer.write("<add commitWithin=\""+commitWithin+"\">");
- }
- else {
- writer.write("<add>");
- }
- if(documents != null) {
- for (SolrInputDocument doc : documents) {
- if (doc != null) {
- ClientUtils.writeXML(doc, writer);
- }
+ public void writeXML(Writer writer) throws IOException {
+ List<Map<SolrInputDocument,Map<String,Object>>> getDocLists = getDocLists(documents);
+
+ for (Map<SolrInputDocument,Map<String,Object>> docs : getDocLists) {
+
+ if ((docs != null && docs.size() > 0)) {
+ Entry<SolrInputDocument,Map<String,Object>> firstDoc = docs.entrySet()
+ .iterator().next();
+ Map<String,Object> map = firstDoc.getValue();
+ Integer cw = null;
+ Boolean ow = null;
+ if (map != null) {
+ cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN);
+ ow = (Boolean) firstDoc.getValue().get(OVERWRITE);
}
- }
- if (docIterator != null) {
- while (docIterator.hasNext()) {
- SolrInputDocument doc = docIterator.next();
- if (doc != null) {
- ClientUtils.writeXML(doc, writer);
- }
+ if (ow == null) ow = true;
+ int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin;
+ boolean overwrite = ow;
+ if (commitWithin > -1 || overwrite != true) {
+ writer.write("<add commitWithin=\"" + commitWithin + "\" "
+ + "overwrite=\"" + overwrite + "\">");
+ } else {
+ writer.write("<add>");
+ }
+
+ Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docs
+ .entrySet();
+ for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+ ClientUtils.writeXML(entry.getKey(), writer);
}
+
+ writer.write("</add>");
}
- writer.write("</add>");
}
// Add the delete commands
boolean deleteI = deleteById != null && deleteById.size() > 0;
boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
- if( deleteI || deleteQ ) {
- if(commitWithin>0) {
- writer.append( "<delete commitWithin=\"" + commitWithin + "\">" );
+ if (deleteI || deleteQ) {
+ if (commitWithin > 0) {
+ writer.append("<delete commitWithin=\"" + commitWithin + "\">");
} else {
- writer.append( "<delete>" );
+ writer.append("<delete>");
}
- if( deleteI ) {
- for( String id : deleteById ) {
- writer.append( "<id>" );
- XML.escapeCharData( id, writer );
- writer.append( "</id>" );
+ if (deleteI) {
+ for (Map.Entry<String,Map<String,Object>> entry : deleteById.entrySet()) {
+ writer.append("<id");
+ Map<String,Object> map = entry.getValue();
+ if (map != null) {
+ Long version = (Long) map.get("ver");
+ if (version != null) {
+ writer.append(" version=\"" + version + "\"");
+ }
+ }
+ writer.append(">");
+
+ XML.escapeCharData(entry.getKey(), writer);
+ writer.append("</id>");
}
}
- if( deleteQ ) {
- for( String q : deleteQuery ) {
- writer.append( "<query>" );
- XML.escapeCharData( q, writer );
- writer.append( "</query>" );
+ if (deleteQ) {
+ for (String q : deleteQuery) {
+ writer.append("<query>");
+ XML.escapeCharData(q, writer);
+ writer.append("</query>");
}
}
- writer.append( "</delete>" );
+ writer.append("</delete>");
}
}
-
-
- //--------------------------------------------------------------------------
- //--------------------------------------------------------------------------
-
- //--------------------------------------------------------------------------
- //
- //--------------------------------------------------------------------------
-
+
+ // --------------------------------------------------------------------------
+ // --------------------------------------------------------------------------
+
+ // --------------------------------------------------------------------------
+ //
+ // --------------------------------------------------------------------------
+
public List<SolrInputDocument> getDocuments() {
+ if (documents == null) return null;
+ List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(documents.size());
+ docs.addAll(documents.keySet());
+ return docs;
+ }
+
+ public Map<SolrInputDocument,Map<String,Object>> getDocumentsMap() {
return documents;
}
-
+
public Iterator<SolrInputDocument> getDocIterator() {
return docIterator;
}
-
+
public List<String> getDeleteById() {
+ if (deleteById == null) return null;
+ List<String> deletes = new ArrayList<String>(deleteById.keySet());
+ return deletes;
+ }
+
+ public Map<String,Map<String,Object>> getDeleteByIdMap() {
return deleteById;
}
-
+
public List<String> getDeleteQuery() {
return deleteQuery;
}
-
+
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java Wed Sep 11 04:03:04 2013
@@ -19,13 +19,22 @@ package org.apache.solr.client.solrj.imp
import java.io.File;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.ExternalPaths;
import org.junit.After;
import org.junit.AfterClass;
@@ -100,6 +109,92 @@ public class CloudSolrServerTest extends
del("*:*");
+ commit();
+
+ SolrInputDocument doc1 = new SolrInputDocument();
+ doc1.addField(id, "0");
+ doc1.addField("a_t", "hello1");
+ SolrInputDocument doc2 = new SolrInputDocument();
+ doc2.addField(id, "2");
+ doc2.addField("a_t", "hello2");
+
+ UpdateRequest request = new UpdateRequest();
+ request.add(doc1);
+ request.add(doc2);
+ request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
+
+ // Test single threaded routed updates for UpdateRequest
+ NamedList response = cloudClient.request(request);
+ CloudSolrServer.RouteResponse rr = (CloudSolrServer.RouteResponse) response;
+ Map<String,LBHttpSolrServer.Req> routes = rr.getRoutes();
+ Iterator<Map.Entry<String,LBHttpSolrServer.Req>> it = routes.entrySet()
+ .iterator();
+ while (it.hasNext()) {
+ Map.Entry<String,LBHttpSolrServer.Req> entry = it.next();
+ String url = entry.getKey();
+ UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
+ .getRequest();
+ SolrInputDocument doc = updateRequest.getDocuments().get(0);
+ String id = doc.getField("id").getValue().toString();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("q", "id:" + id);
+ params.add("distrib", "false");
+ QueryRequest queryRequest = new QueryRequest(params);
+ HttpSolrServer solrServer = new HttpSolrServer(url);
+ QueryResponse queryResponse = queryRequest.process(solrServer);
+ SolrDocumentList docList = queryResponse.getResults();
+ assertTrue(docList.getNumFound() == 1);
+ }
+
+ // Test the deleteById routing for UpdateRequest
+
+ UpdateRequest delRequest = new UpdateRequest();
+ delRequest.deleteById("0");
+ delRequest.deleteById("2");
+ delRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
+ cloudClient.request(delRequest);
+ ModifiableSolrParams qParams = new ModifiableSolrParams();
+ qParams.add("q", "*:*");
+ QueryRequest qRequest = new QueryRequest(qParams);
+ QueryResponse qResponse = qRequest.process(cloudClient);
+ SolrDocumentList docs = qResponse.getResults();
+ assertTrue(docs.getNumFound() == 0);
+
+ // Test Multi-Threaded routed updates for UpdateRequest
+
+ CloudSolrServer threadedClient = null;
+ try {
+ threadedClient = new CloudSolrServer(zkServer.getZkAddress());
+ threadedClient.setParallelUpdates(true);
+ threadedClient.setDefaultCollection("collection1");
+ response = threadedClient.request(request);
+ rr = (CloudSolrServer.RouteResponse) response;
+ routes = rr.getRoutes();
+ it = routes.entrySet()
+ .iterator();
+ while (it.hasNext()) {
+ Map.Entry<String,LBHttpSolrServer.Req> entry = it.next();
+ String url = entry.getKey();
+ UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
+ .getRequest();
+ SolrInputDocument doc = updateRequest.getDocuments().get(0);
+ String id = doc.getField("id").getValue().toString();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("q", "id:" + id);
+ params.add("distrib", "false");
+ QueryRequest queryRequest = new QueryRequest(params);
+ HttpSolrServer solrServer = new HttpSolrServer(url);
+ QueryResponse queryResponse = queryRequest.process(solrServer);
+ SolrDocumentList docList = queryResponse.getResults();
+ assertTrue(docList.getNumFound() == 1);
+ }
+ } finally {
+ threadedClient.shutdown();
+ }
+
+ del("*:*");
+ commit();
+
indexr(id, 0, "a_t", "to come to the aid of their country.");
CloudJettyRunner shard1Leader = shardToLeaderJetty.get("shard1");
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java Wed Sep 11 04:03:04 2013
@@ -16,23 +16,22 @@
*/
package org.apache.solr.client.solrj.request;
-import junit.framework.Assert;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
-import org.apache.solr.common.util.FastInputStream;
-import org.junit.Test;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.ArrayList;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.junit.Test;
/**
* Test for UpdateRequestCodec
@@ -93,7 +92,7 @@ public class TestUpdateRequestCodec exte
};
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
- Assert.assertNull(updateUnmarshalled.getDocuments());
+
for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document);
}
@@ -144,7 +143,7 @@ public class TestUpdateRequestCodec exte
};
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
- Assert.assertNull(updateUnmarshalled.getDocuments());
+
for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document);
}
Modified: lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Wed Sep 11 04:03:04 2013
@@ -246,6 +246,7 @@ public abstract class AbstractFullDistri
protected CloudSolrServer createCloudClient(String defaultCollection)
throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+ server.setParallelUpdates(random().nextBoolean());
if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
server.getLbServer().getHttpClient().getParams()
.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
@@ -1696,6 +1697,7 @@ public abstract class AbstractFullDistri
synchronized(this) {
try {
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+ commondCloudSolrServer.setParallelUpdates(random().nextBoolean());
commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
commondCloudSolrServer.connect();
} catch (MalformedURLException e) {