You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2016/07/07 12:44:50 UTC
lucene-solr:master: SOLR-9090: Add directUpdatesToLeadersOnly flag to
solrj CloudSolrClient. (Marvin Justice, Christine Poerschke)
Repository: lucene-solr
Updated Branches:
refs/heads/master f61a5f27d -> 976079a8e
SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. (Marvin Justice, Christine Poerschke)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/976079a8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/976079a8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/976079a8
Branch: refs/heads/master
Commit: 976079a8ee8a2cff1c8df01ae9f2856b3ddcdac3
Parents: f61a5f2
Author: Christine Poerschke <cp...@apache.org>
Authored: Tue Jul 5 17:14:47 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Thu Jul 7 10:03:21 2016 +0100
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../solr/client/solrj/impl/CloudSolrClient.java | 129 ++++++++++++++++---
.../client/solrj/request/UpdateRequest.java | 6 +
.../solrj/impl/CloudSolrClientBuilderTest.java | 10 ++
.../client/solrj/impl/CloudSolrClientTest.java | 24 +++-
.../java/org/apache/solr/SolrTestCaseJ4.java | 56 +++++++-
6 files changed, 203 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 95fa796..40add1d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,9 @@ New Features
* SOLR-9243: Add terms.list parameter to the TermsComponent to fetch the docFreq for a list of terms
(Joel Bernstein)
+* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient.
+ (Marvin Justice, Christine Poerschke)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index f5e18b3..876f7f8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
@@ -117,6 +118,7 @@ public class CloudSolrClient extends SolrClient {
Random rand = new Random();
private final boolean updatesToLeaders;
+ private final boolean directUpdatesToLeadersOnly;
private boolean parallelUpdates = true;
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@@ -206,6 +208,7 @@ public class CloudSolrClient extends SolrClient {
this.lbClient.setRequestWriter(new BinaryRequestWriter());
this.lbClient.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
+ this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
lbClient.addQueryParams(STATE_VERSION);
}
@@ -242,6 +245,7 @@ public class CloudSolrClient extends SolrClient {
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = createLBHttpSolrClient(myClient);
this.updatesToLeaders = true;
+ this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
lbClient.addQueryParams(STATE_VERSION);
}
@@ -299,6 +303,7 @@ public class CloudSolrClient extends SolrClient {
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
this.lbClient = createLBHttpSolrClient(myClient);
this.updatesToLeaders = true;
+ this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
}
@@ -329,8 +334,38 @@ public class CloudSolrClient extends SolrClient {
*/
@Deprecated
public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) {
+ this(zkHosts, chroot, httpClient, lbSolrClient, updatesToLeaders, false);
+ }
+
+ /**
+ * Create a new client object that connects to Zookeeper and is always aware
+ * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+ * SolrCloud has enough replicas for every shard in a collection, there is no
+ * single point of failure. Updates will be sent to shard leaders by default.
+ *
+ * @param zkHosts
+ * A Java Collection (List, Set, etc) of HOST:PORT strings, one for
+ * each host in the zookeeper ensemble. Note that with certain
+ * Collection types like HashSet, the order of hosts in the final
+ * connect string may not be in the same order you added them.
+ * @param chroot
+ * A chroot value for zookeeper, starting with a forward slash. If no
+ * chroot is required, use null.
+ * @param httpClient
+ * the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
+ * multi-threaded connection manager. If null, a default HttpClient will be used.
+ * @param lbSolrClient
+ * LBHttpSolrServer instance for requests. If null, a default HttpClient will be used.
+ * @param updatesToLeaders
+ * If true, sends updates to shard leaders.
+ * @param directUpdatesToLeadersOnly
+ * If true, sends direct updates to shard leaders only.
+ */
+ private CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient,
+ boolean updatesToLeaders, boolean directUpdatesToLeadersOnly) {
this.zkHost = buildZkHostString(zkHosts, chroot);
this.updatesToLeaders = updatesToLeaders;
+ this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.clientIsInternal = httpClient == null;
this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
@@ -374,6 +409,7 @@ public class CloudSolrClient extends SolrClient {
this.lbClient.setRequestWriter(new BinaryRequestWriter());
this.lbClient.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
+ this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = true;
lbClient.addQueryParams(STATE_VERSION);
}
@@ -414,6 +450,7 @@ public class CloudSolrClient extends SolrClient {
this.zkHost = zkHost;
this.lbClient = lbClient;
this.updatesToLeaders = updatesToLeaders;
+ this.directUpdatesToLeadersOnly = false;
shutdownLBHttpSolrServer = false;
this.clientIsInternal = false;
lbClient.addQueryParams(STATE_VERSION);
@@ -648,15 +685,18 @@ public class CloudSolrClient extends SolrClient {
//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
- Map<String,List<String>> urlMap = buildUrlMap(col);
- if (urlMap == null) {
- // we could not find a leader yet - use unoptimized general path
- return null;
- }
-
- Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+ final Map<String,List<String>> urlMap = buildUrlMap(col);
+ final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
if (routes == null) {
- return null;
+ if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
+ // we have info (documents with ids and/or ids to delete) with
+ // which to find the leaders but we could not find (all of) them
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "directUpdatesToLeadersOnly==true but could not find leader(s)");
+ } else {
+ // we could not find a leader or routes yet - use unoptimized general path
+ return null;
+ }
}
final NamedList<Throwable> exceptions = new NamedList<>();
@@ -764,21 +804,23 @@ public class CloudSolrClient extends SolrClient {
List<String> urls = new ArrayList<>();
Replica leader = slice.getLeader();
if (leader == null) {
+ if (directUpdatesToLeadersOnly) {
+ continue;
+ }
// take unoptimized general path - we cannot find a leader yet
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);
- Collection<Replica> replicas = slice.getReplicas();
- Iterator<Replica> replicaIterator = replicas.iterator();
- while (replicaIterator.hasNext()) {
- Replica replica = replicaIterator.next();
- if (!replica.getNodeName().equals(leader.getNodeName()) &&
- !replica.getName().equals(leader.getName())) {
- ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
- String url1 = zkProps1.getCoreUrl();
- urls.add(url1);
+ if (!directUpdatesToLeadersOnly) {
+ for (Replica replica : slice.getReplicas()) {
+ if (!replica.getNodeName().equals(leader.getNodeName()) &&
+ !replica.getName().equals(leader.getName())) {
+ ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+ String url1 = zkProps1.getCoreUrl();
+ urls.add(url1);
+ }
}
}
urlMap.put(name, urls);
@@ -1284,6 +1326,13 @@ public class CloudSolrClient extends SolrClient {
return updatesToLeaders;
}
+ /**
+ * @return true if direct updates are sent to shard leaders only
+ */
+ public boolean isDirectUpdatesToLeadersOnly() {
+ return directUpdatesToLeadersOnly;
+ }
+
/**If caches are expired they are refreshed after acquiring a lock.
* use this to set the number of locks
*/
@@ -1417,6 +1466,31 @@ public class CloudSolrClient extends SolrClient {
this.lbClient.setSoTimeout(timeout);
}
+ private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
+ final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
+ final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
+
+ final boolean hasNoDocuments = (documents == null || documents.isEmpty());
+ final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
+ if (hasNoDocuments && hasNoDeleteById) {
+ // no documents and no delete-by-id, so no info to find leader(s)
+ return false;
+ }
+
+ if (documents != null) {
+ for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
+ final SolrInputDocument doc = entry.getKey();
+ final Object fieldValue = doc.getFieldValue(idField);
+ if (fieldValue == null) {
+ // a document with no id field value, so can't find leader for it
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
private static LBHttpSolrClient createLBHttpSolrClient(HttpClient httpClient) {
final LBHttpSolrClient lbClient = new LBHttpSolrClient.Builder()
.withHttpClient(httpClient)
@@ -1466,6 +1540,7 @@ public class CloudSolrClient extends SolrClient {
private String zkChroot;
private LBHttpSolrClient loadBalancedSolrClient;
private boolean shardLeadersOnly;
+ private boolean directUpdatesToLeadersOnly;
public Builder() {
this.zkHosts = new ArrayList();
@@ -1543,10 +1618,28 @@ public class CloudSolrClient extends SolrClient {
}
/**
+ * Tells {@link Builder} that created clients should send direct updates to shard leaders only.
+ */
+ public Builder sendDirectUpdatesToShardLeadersOnly() {
+ directUpdatesToLeadersOnly = true;
+ return this;
+ }
+
+ /**
+ * Tells {@link Builder} that created clients can send updates
+ * to any shard replica (shard leaders and non-leaders).
+ */
+ public Builder sendDirectUpdatesToAnyShardReplica() {
+ directUpdatesToLeadersOnly = false;
+ return this;
+ }
+
+ /**
* Create a {@link CloudSolrClient} based on the provided configuration.
*/
public CloudSolrClient build() {
- return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, shardLeadersOnly);
+ return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient,
+ shardLeadersOnly, directUpdatesToLeadersOnly);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
index f93a197..aec6e22 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
@@ -257,6 +257,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
return null;
}
List<String> urls = urlMap.get(slice.getName());
+ if (urls == null) {
+ return null;
+ }
String leaderUrl = urls.get(0);
LBHttpSolrClient.Req request = (LBHttpSolrClient.Req) routes
.get(leaderUrl);
@@ -305,6 +308,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
return null;
}
List<String> urls = urlMap.get(slice.getName());
+ if (urls == null) {
+ return null;
+ }
String leaderUrl = urls.get(0);
LBHttpSolrClient.Req request = routes.get(leaderUrl);
if (request != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
index 57692c7..5f1c5c2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
@@ -87,4 +87,14 @@ public class CloudSolrClientBuilderTest extends LuceneTestCase {
assertTrue(createdClient.isUpdatesToLeaders() == true);
}
}
+
+ @Test
+ public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
+ try(CloudSolrClient createdClient = new Builder()
+ .withZkHost(ANY_ZK_HOST)
+ .withZkChroot(ANY_CHROOT)
+ .build()) {
+ assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 616ddc4..cf12036 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -149,6 +149,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
// Test single threaded routed updates for UpdateRequest
NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
+ if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
+ checkSingleServer(response);
+ }
CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
@@ -173,10 +176,13 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
// Test the deleteById routing for UpdateRequest
- new UpdateRequest()
+ final UpdateResponse uResponse = new UpdateRequest()
.deleteById("0")
.deleteById("2")
.commit(cluster.getSolrClient(), COLLECTION);
+ if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
+ checkSingleServer(uResponse.getResponse());
+ }
QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
SolrDocumentList docs = qResponse.getResults();
@@ -187,6 +193,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
threadedClient.setParallelUpdates(true);
threadedClient.setDefaultCollection(COLLECTION);
response = threadedClient.request(request);
+ if (threadedClient.isDirectUpdatesToLeadersOnly()) {
+ checkSingleServer(response);
+ }
rr = (CloudSolrClient.RouteResponse) response;
routes = rr.getRoutes();
it = routes.entrySet()
@@ -540,4 +549,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
HttpClientUtil.close(client);
}
}
+
+ private static void checkSingleServer(NamedList<Object> response) {
+ final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
+ final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
+ final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
+ routes.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
+ assertEquals("wrong number of servers: "+entry.getValue().getServers(),
+ 1, entry.getValue().getServers().size());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 7a41454..ea70805 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -2041,11 +2041,55 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
return (0 == TestUtil.nextInt(random(), 0, 9)) ? unlikely : likely;
}
+ public static class CloudSolrClientBuilder extends CloudSolrClient.Builder {
+
+ private boolean configuredDUTflag = false;
+
+ public CloudSolrClientBuilder() {
+ super();
+ }
+
+ @Override
+ public CloudSolrClient.Builder sendDirectUpdatesToShardLeadersOnly() {
+ configuredDUTflag = true;
+ return super.sendDirectUpdatesToShardLeadersOnly();
+ }
+
+ @Override
+ public CloudSolrClient.Builder sendDirectUpdatesToAnyShardReplica() {
+ configuredDUTflag = true;
+ return super.sendDirectUpdatesToAnyShardReplica();
+ }
+
+ private void randomlyChooseDirectUpdatesToLeadersOnly() {
+ if (random().nextBoolean()) {
+ sendDirectUpdatesToShardLeadersOnly();
+ } else {
+ sendDirectUpdatesToAnyShardReplica();
+ }
+ }
+
+ @Override
+ public CloudSolrClient build() {
+ if (configuredDUTflag == false) {
+ // flag value not explicity configured
+ if (random().nextBoolean()) {
+ // so randomly choose a value
+ randomlyChooseDirectUpdatesToLeadersOnly();
+ } else {
+ // or go with whatever the default value is
+ configuredDUTflag = true;
+ }
+ }
+ return super.build();
+ }
+ }
+
public static CloudSolrClient getCloudSolrClient(String zkHost) {
if (random().nextBoolean()) {
return new CloudSolrClient(zkHost);
}
- return new CloudSolrClient.Builder()
+ return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.build();
}
@@ -2054,7 +2098,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
if (random().nextBoolean()) {
return new CloudSolrClient(zkHost, httpClient);
}
- return new CloudSolrClient.Builder()
+ return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.withHttpClient(httpClient)
.build();
@@ -2066,12 +2110,12 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
if (shardLeadersOnly) {
- return new CloudSolrClient.Builder()
+ return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.sendUpdatesOnlyToShardLeaders()
.build();
}
- return new CloudSolrClient.Builder()
+ return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.sendUpdatesToAllReplicasInShard()
.build();
@@ -2083,13 +2127,13 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
if (shardLeadersOnly) {
- return new CloudSolrClient.Builder()
+ return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.withHttpClient(httpClient)
.sendUpdatesOnlyToShardLeaders()
.build();
}
- return new CloudSolrClient.Builder()
+ return new CloudSolrClientBuilder()
.withZkHost(zkHost)
.withHttpClient(httpClient)
.sendUpdatesToAllReplicasInShard()